diff --git a/apps/content/backend/content/__init__.py b/apps/content/backend/content/__init__.py
index 770644c..213ed92 100644
--- a/apps/content/backend/content/__init__.py
+++ b/apps/content/backend/content/__init__.py
@@ -1,4 +1,4 @@
-from backend.content.models import ContentFolderDoc
+from backend.content.models import ContentFolderDoc, DocumentDoc
 
 content_models = []
-content_models.extend([ContentFolderDoc])
+content_models.extend([ContentFolderDoc, DocumentDoc])
diff --git a/apps/content/backend/content/models.py b/apps/content/backend/content/models.py
index 2901717..d8ed78e 100644
--- a/apps/content/backend/content/models.py
+++ b/apps/content/backend/content/models.py
@@ -1,6 +1,7 @@
 from datetime import datetime
 from typing import List, Optional
 from .constants import ContentSource, ContentMediaType, ContentDataFormat
+from enum import IntEnum
 from common.constants.region import UserRegion
 from beanie import Document
 from pydantic import BaseModel
@@ -17,6 +18,18 @@ class ContentDirectory(BaseModel):
     content_document_id: Optional[str] = None
 
 
+class MediaType(IntEnum):
+    UNKNOWN = 0
+    PDF = 1
+    PNG = 2
+    TXT = 3
+
+
+class DataFormat(IntEnum):
+    RAW = 0
+    BASE64 = 1
+
+
 class ContentFolderDoc(Document):
     folder_name: str
     content_directories: List[ContentDirectory]
@@ -31,3 +44,20 @@ class ContentFolderDoc(Document):
             [("folder_name", 1), ("region", 1)],  # Compound index
             [("udpate_time", -1)],  # Descending index for udpate_time
         ]
+
+
+class DocumentDoc(Document):
+    document_id: Optional[str] = None
+    file_name: str
+    created_by: str
+    create_time: datetime
+    updated_by: str
+    update_time: datetime
+    location: Optional[str] = None
+    version_number: int
+    media_type: MediaType
+    data_format: DataFormat
+    is_deleted: Optional[bool] = False
+
+    class Settings:
+        name = "document"
diff --git a/apps/content/backend/document/document_manager.py b/apps/content/backend/document/document_manager.py
index 39864c8..aeb9a07 100644
--- a/apps/content/backend/document/document_manager.py
+++ b/apps/content/backend/document/document_manager.py
@@ -1,4 +1,6 @@
 from common.config.app_settings import app_settings
+from backend.content.models import DocumentDoc
 
+import logging
 import httpx
 
@@ -34,3 +35,24 @@ class DocumentManager:
             api_url, data={"associated_with": associated_with}, files=files
         )
         return response.json()["document_id"]
+
+    async def cleanup_document(self):
+        # Match documents whose created_by starts with "content-service-"
+        documents = await DocumentDoc.find(
+            {"created_by": {"$regex": "^content-service-"}}
+        ).to_list()
+
+        if documents:
+            logging.info(
+                f"Found {len(documents)} documents to delete: {[doc.id for doc in documents]}"
+            )
+        else:
+            logging.info("No documents found to delete.")
+
+        for document in documents:
+            logging.info(
+                f"Deleting document {document.id} - created_by: {document.created_by}"
+            )
+            await document.delete()
+
+        logging.info("Document cleanup completed.")
diff --git a/apps/content/scheduler/cleanup_document_job.py b/apps/content/scheduler/cleanup_document_job.py
new file mode 100644
index 0000000..9e185ff
--- /dev/null
+++ b/apps/content/scheduler/cleanup_document_job.py
@@ -0,0 +1,33 @@
+import logging
+from datetime import datetime
+from scheduler.constants import ScheduleJobLocker
+from scheduler.schedule_job_locker import acquire_lock, release_lock
+from backend.document.document_manager import DocumentManager
+
+
+async def cleanup_document_job():
+    if await acquire_lock(ScheduleJobLocker.CLEANUP_DOCUMENT_JOB_LOCKER, 3600):
+        try:
+            logging.info("Starting job to clean up documents.")
+
+            document_manager = DocumentManager()
+
+            # Record the start time so the job's duration can be reported
+            start_time = datetime.now()
+            logging.info(f"Job started at {start_time}")
+
+            await document_manager.cleanup_document()
+
+            elapsed = datetime.now() - start_time
+            logging.info(f"Successfully completed cleanup document job in {elapsed}.")
+        except Exception as e:
+            # Log any unexpected exceptions
+            logging.error(
+                f"Error occurred during cleanup_document_job: {e}", exc_info=True
+            )
+        finally:
+            # Ensure the lock is released even if an exception occurs
+            await release_lock(ScheduleJobLocker.CLEANUP_DOCUMENT_JOB_LOCKER)
+            logging.info("Lock released for cleanup_document_job.")
+    else:
+        logging.info("The cleanup document job is locked by another process.")
diff --git a/apps/content/scheduler/constants.py b/apps/content/scheduler/constants.py
index ede65f9..65fd201 100755
--- a/apps/content/scheduler/constants.py
+++ b/apps/content/scheduler/constants.py
@@ -3,3 +3,4 @@ from enum import Enum
 
 class ScheduleJobLocker(Enum):
     REFRESH_SHAREPOINT_CONTENT_JOB_LOCKER = "analyze_sharepoint_content_job"
+    CLEANUP_DOCUMENT_JOB_LOCKER = "cleanup_document_job"
diff --git a/apps/content/scheduler/scheduler_manager.py b/apps/content/scheduler/scheduler_manager.py
index 8829b75..1d7aab4 100755
--- a/apps/content/scheduler/scheduler_manager.py
+++ b/apps/content/scheduler/scheduler_manager.py
@@ -1,5 +1,8 @@
 from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from apscheduler.triggers.date import DateTrigger
+from datetime import datetime, timedelta
 from scheduler.refresh_sharepoint_content_job import refresh_sharepoint_content_job
+from scheduler.cleanup_document_job import cleanup_document_job
 from common.log.log_utils import log_entry_exit_async
 from scheduler.constants import ScheduleJobLocker
 from scheduler.schedule_job_locker import init_lock
@@ -16,3 +19,13 @@ async def create_scheduler() -> AsyncIOScheduler:
 async def register_job(scheduler):
     await init_lock(ScheduleJobLocker.REFRESH_SHAREPOINT_CONTENT_JOB_LOCKER)
     scheduler.add_job(refresh_sharepoint_content_job, "interval", seconds=(3600 + 3))
+    # Register cleanup_document_job as a one-time job
+    await init_lock(ScheduleJobLocker.CLEANUP_DOCUMENT_JOB_LOCKER)
+    execution_time = datetime.now() + timedelta(
+        seconds=60
+    )  # Schedule to run 60 seconds from now
+    scheduler.add_job(
+        cleanup_document_job,  # Job function
+        trigger=DateTrigger(run_date=execution_time),  # One-time trigger
+        id="cleanup_document_one_time",  # Stable job ID for later lookup or removal
+    )