Add one-time document clean-up job

Jet Li 2025-01-31 07:03:27 +00:00
parent c49ae956f1
commit 669a5303cd
6 changed files with 99 additions and 2 deletions

View File

@@ -1,4 +1,4 @@
-from backend.content.models import ContentFolderDoc
+from backend.content.models import ContentFolderDoc, DocumentDoc
 content_models = []
-content_models.extend([ContentFolderDoc])
+content_models.extend([ContentFolderDoc, DocumentDoc])

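Side note on this registration: content_models is presumably the list of Beanie Document classes handed to init_beanie during application startup, so adding DocumentDoc here is what binds the new "document" collection. A minimal sketch of that wiring, with a placeholder connection string and database name (neither is taken from this repository):

from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient

from backend.content.models import ContentFolderDoc, DocumentDoc

content_models = [ContentFolderDoc, DocumentDoc]


async def init_db():
    # Placeholder URI and database name, for illustration only
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    # init_beanie binds each registered Document subclass to its collection
    await init_beanie(database=client["content_db"], document_models=content_models)
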
View File

@@ -1,6 +1,7 @@
 from datetime import datetime
 from typing import List, Optional
 from .constants import ContentSource, ContentMediaType, ContentDataFormat
+from enum import IntEnum
 from common.constants.region import UserRegion
 from beanie import Document
 from pydantic import BaseModel
@@ -17,6 +18,18 @@ class ContentDirectory(BaseModel):
     content_document_id: Optional[str] = None
+
+
+class MediaType(IntEnum):
+    UNKNOWN = 0
+    PDF = 1
+    PNG = 2
+    TXT = 3
+
+
+class DataFormat(IntEnum):
+    RAW = 0
+    BASE64 = 1
+
+
 class ContentFolderDoc(Document):
     folder_name: str
     content_directories: List[ContentDirectory]
@@ -31,3 +44,20 @@ class ContentFolderDoc(Document):
             [("folder_name", 1), ("region", 1)],  # Compound index
             [("udpate_time", -1)],  # Descending index for udpate_time
         ]
+
+
+class DocumentDoc(Document):
+    document_id: Optional[str] = None
+    file_name: str
+    created_by: str
+    create_time: datetime
+    updated_by: str
+    update_time: datetime
+    location: Optional[str] = None
+    version_number: int
+    media_type: MediaType
+    data_format: DataFormat
+    is_deleted: Optional[bool] = False
+
+    class Settings:
+        name = "document"

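Outside the diff itself, a short usage sketch of the new DocumentDoc model with Beanie; the field values are invented for illustration and assume init_beanie has already registered the model:

from datetime import datetime

from backend.content.models import DataFormat, DocumentDoc, MediaType


async def create_and_fetch_example():
    doc = DocumentDoc(
        file_name="handbook.pdf",
        created_by="content-service-uploader",
        create_time=datetime.utcnow(),
        updated_by="content-service-uploader",
        update_time=datetime.utcnow(),
        version_number=1,
        media_type=MediaType.PDF,
        data_format=DataFormat.RAW,
    )
    await doc.insert()  # persists into the "document" collection

    fetched = await DocumentDoc.get(doc.id)  # look up by the Mongo _id Beanie assigned
    print(fetched.file_name, fetched.media_type)
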
View File

@@ -1,4 +1,5 @@
 from common.config.app_settings import app_settings
+from backend.content.models import DocumentDoc
 import httpx
@@ -34,3 +35,24 @@ class DocumentManager:
             api_url, data={"associated_with": associated_with}, files=files
         )
         return response.json()["document_id"]
+
+    async def cleanup_document(self):
+        # Find every document whose created_by starts with the content-service prefix
+        documents = await DocumentDoc.find(
+            {"created_by": {"$regex": "^content-service-"}}
+        ).to_list()
+        if documents:
+            print(
+                f"Found {len(documents)} documents to delete: {[doc.id for doc in documents]}"
+            )
+        else:
+            print("No documents found to delete.")
+
+        for document in documents:
+            print(
+                f"Deleting document {document.id} - created_by: {document.created_by}"
+            )
+            await document.delete()
+
+        print("Document cleanup completed.")

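The cleanup query uses MongoDB's $regex operator with a ^ anchor, so only documents whose created_by value starts with "content-service-" are matched; an unanchored pattern would also hit values that merely contain that substring. A hedged sketch of calling the new method from a one-off script, assuming the database connection and Beanie models are already initialised:

import asyncio

from backend.document.document_manager import DocumentManager


async def run_cleanup_once():
    manager = DocumentManager()
    # Deletes every DocumentDoc whose created_by starts with "content-service-"
    await manager.cleanup_document()


if __name__ == "__main__":
    asyncio.run(run_cleanup_once())
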
View File

@@ -0,0 +1,31 @@
+import logging
+from datetime import datetime
+
+from scheduler.constants import ScheduleJobLocker
+from scheduler.schedule_job_locker import acquire_lock, release_lock
+from backend.document.document_manager import DocumentManager
+
+
+async def cleanup_document_job():
+    if await acquire_lock(ScheduleJobLocker.CLEANUP_DOCUMENT_JOB_LOCKER, 3600):
+        try:
+            logging.info("Starting job to clean up documents.")
+            document_manager = DocumentManager()
+
+            # Record the start time so the job's duration can be monitored
+            start_time = datetime.now()
+            logging.info(f"Job started at {start_time}")
+
+            await document_manager.cleanup_document()
+            logging.info("Successfully completed cleanup document job.")
+        except Exception as e:
+            # Log any unexpected exceptions
+            logging.error(
+                f"Error occurred during cleanup_document_job: {e}", exc_info=True
+            )
+        finally:
+            # Ensure the lock is released even if an exception occurs
+            await release_lock(ScheduleJobLocker.CLEANUP_DOCUMENT_JOB_LOCKER)
+            logging.info("Lock released for cleanup_document_job.")
+    else:
+        logging.info("The cleanup document job is locked by another process.")

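The job wraps its work in acquire_lock / release_lock from scheduler/schedule_job_locker.py, whose implementation is not part of this commit. Purely as an illustration of the pattern (not the project's actual code), a lock of this shape can be thought of as a named entry with a time-to-live that only one worker may hold at a time:

import time

# Hypothetical single-process stand-in for the real lock store; the project
# presumably keeps the lock in a shared database so it works across processes.
_locks: dict = {}  # lock name -> expiry timestamp


async def acquire_lock(name, ttl_seconds):
    now = time.monotonic()
    expiry = _locks.get(name)
    if expiry is not None and expiry > now:
        return False  # lock is still held and has not expired
    _locks[name] = now + ttl_seconds
    return True


async def release_lock(name):
    _locks.pop(name, None)  # releasing an unheld lock is a no-op
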
View File

@@ -3,3 +3,4 @@ from enum import Enum
 class ScheduleJobLocker(Enum):
     REFRESH_SHAREPOINT_CONTENT_JOB_LOCKER = "analyze_sharepoint_content_job"
+    CLEANUP_DOCUMENT_JOB_LOCKER = "cleanup_document_job"

View File

@@ -1,5 +1,8 @@
 from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from apscheduler.triggers.date import DateTrigger
+from datetime import datetime, timedelta
 from scheduler.refresh_sharepoint_content_job import refresh_sharepoint_content_job
+from scheduler.cleanup_document_job import cleanup_document_job
 from common.log.log_utils import log_entry_exit_async
 from scheduler.constants import ScheduleJobLocker
 from scheduler.schedule_job_locker import init_lock
@@ -16,3 +19,13 @@ async def create_scheduler() -> AsyncIOScheduler:
 async def register_job(scheduler):
     await init_lock(ScheduleJobLocker.REFRESH_SHAREPOINT_CONTENT_JOB_LOCKER)
     scheduler.add_job(refresh_sharepoint_content_job, "interval", seconds=(3600 + 3))
+    # Register cleanup_document_job as a one-time job
+    await init_lock(ScheduleJobLocker.CLEANUP_DOCUMENT_JOB_LOCKER)
+    execution_time = datetime.now() + timedelta(
+        seconds=60
+    )  # Schedule to run 60 seconds from now
+    scheduler.add_job(
+        cleanup_document_job,  # Job function
+        trigger=DateTrigger(run_date=execution_time),  # One-time trigger
+        id="cleanup_document_one_time",  # Optional: give the job an ID
+    )
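
For reference, APScheduler's DateTrigger fires a job exactly once at run_date and then discards it, which is what makes this clean-up a one-shot task rather than a recurring one. A self-contained sketch of the same mechanism outside this codebase:

import asyncio
from datetime import datetime, timedelta

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.date import DateTrigger


async def one_time_job():
    print(f"one-time job fired at {datetime.now()}")


async def main():
    scheduler = AsyncIOScheduler()
    scheduler.add_job(
        one_time_job,
        trigger=DateTrigger(run_date=datetime.now() + timedelta(seconds=2)),
        id="demo_one_time",
    )
    scheduler.start()
    await asyncio.sleep(5)  # keep the event loop alive long enough for the job to fire
    scheduler.shutdown()


if __name__ == "__main__":
    asyncio.run(main())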