Create one-off photo_id backfill job

Jet Li 2025-02-07 08:13:59 +00:00
parent da369dc61b
commit 6028b0a6f0
7 changed files with 142 additions and 2 deletions

View File

@ -0,0 +1,4 @@
from backend.document.models import BasicProfileDoc

document_models = []
document_models.extend([BasicProfileDoc])

View File

@ -1,6 +1,8 @@
from common.config.app_settings import app_settings
from backend.content.models import DocumentDoc
from backend.document.models import BasicProfileDoc
import httpx
import base64


class DocumentManager:
@ -70,3 +72,34 @@ class DocumentManager:
            await document.delete()
        print("Modification and deletion completed.")
    async def backfill_photo_id(self):
        profiles = await BasicProfileDoc.find().to_list()
        if profiles:
            print(
                f"Found {len(profiles)} documents to backfill: {[profile.id for profile in profiles]}"
            )
        else:
            print("No documents found to backfill.")
        for profile in profiles:
            print(f"Updating document {profile.id}")
            # Only backfill profiles that have inline photo data but no photo_id yet
            if not profile.photo_id and profile.photo.base64:
                # Strip the metadata prefix (e.g. 'data:image/png;base64,');
                # split(",", 1)[-1] also works when no prefix is present
                encoded_data = profile.photo.base64.split(",", 1)[-1]
                # Decode the Base64 string into bytes
                blob_data = base64.b64decode(encoded_data)
                photo_id = await self.save_document_file(
                    profile.user_id,
                    profile.photo.filename,
                    blob_data,
                )
                print(f"Saved photo for user {profile.user_id}, photo_id: {photo_id}")
                profile.photo_id = photo_id
                await profile.save()
        print("User photo id backfill completed.")

View File

@ -0,0 +1,74 @@
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel, EmailStr
from beanie import Document, Indexed
from enum import IntEnum


class UserRegion(IntEnum):
    OTHER = 0
    ZH_CN = 1


class Tags(BaseModel):
    skill: List[str]


class SelfIntro(BaseModel):
    summary: str = ""
    content_html: str = ""
    tags: Tags


class Photo(BaseModel):
    url: Optional[str]
    base64: str
    filename: str


class Email(BaseModel):
    address: Optional[EmailStr]
    verified: bool = False


class Mobile(BaseModel):
    number: Optional[str]
    verified: bool


class FLID(BaseModel):
    identity: str
    set_by: str
    create_time: datetime
    update_time: datetime


class Password(BaseModel):
    set_up: bool
    update_time: datetime
    expiry: datetime


class BasicProfileDoc(Document):
    user_id: str
    first_name: Indexed(str) = ""  # type: ignore  # Index for faster search
    last_name: Indexed(str) = ""  # type: ignore  # Index for faster search
    spoken_language: List[str] = []
    self_intro: SelfIntro
    photo: Photo
    photo_id: Optional[str] = None
    email: Email
    mobile: Mobile
    FLID: FLID
    password: Password
    region: int = UserRegion.OTHER
    time_zone: Optional[str] = None

    class Settings:
        name = "basic_profile"
        indexes = [
            "user_id",  # Index for fast querying by user_id
            "email.address",  # Index on the 'email.address' field
            # Compound text index for fuzzy search across multiple fields
            [("first_name", "text"), ("last_name", "text"), ("email.address", "text")],
        ]
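
For context, a hypothetical instance that the backfill would pick up (all field values invented; building a Beanie Document like this needs no database connection, though EmailStr requires the email-validator package):

from datetime import datetime, timedelta

now = datetime.utcnow()
profile = BasicProfileDoc(
    user_id="u-123",
    self_intro=SelfIntro(tags=Tags(skill=["python"])),
    photo=Photo(url=None, base64="data:image/png;base64,iVBORw0KGgo=", filename="avatar.png"),
    email=Email(address="jet@example.com"),
    mobile=Mobile(number=None, verified=False),
    FLID=FLID(identity="flid-1", set_by="system", create_time=now, update_time=now),
    password=Password(set_up=False, update_time=now, expiry=now + timedelta(days=90)),
)
assert profile.photo_id is None  # unset, so backfill_photo_id() would process it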

View File

@ -1,4 +1,6 @@
from backend.content import content_models
from backend.document import document_models

backend_models = []
backend_models.extend(content_models)
backend_models.extend(document_models)
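
Presumably these aggregated lists are handed to Beanie at startup; a hedged sketch of that wiring (connection string and database name are invented, not from the commit):

import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
from beanie import init_beanie

async def init_db():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    # Register every collected Document model with Beanie in one call
    await init_beanie(database=client["app_db"], document_models=backend_models)

asyncio.run(init_db())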

View File

@ -0,0 +1,17 @@
import logging

from scheduler.constants import ScheduleJobLocker
from scheduler.schedule_job_locker import acquire_lock, release_lock
from backend.document.document_manager import DocumentManager


async def backfill_photo_id_job():
    if await acquire_lock(ScheduleJobLocker.BACKFILL_PHOTO_ID_JOB_LOCKER, 3600):
        try:
            logging.info("Starting photo_id backfill job.")
            document_manager = DocumentManager()
            await document_manager.backfill_photo_id()
            logging.info("Finished photo_id backfill job.")
        finally:
            await release_lock(ScheduleJobLocker.BACKFILL_PHOTO_ID_JOB_LOCKER)
    else:
        logging.info("The job has been locked by another process.")

View File

@ -4,3 +4,4 @@ from enum import Enum
class ScheduleJobLocker(Enum):
    REFRESH_SHAREPOINT_CONTENT_JOB_LOCKER = "analyze_sharepoint_content_job"
    CLEANUP_DOCUMENT_JOB_LOCKER = "cleanup_document_job"
    BACKFILL_PHOTO_ID_JOB_LOCKER = "backfill_photo_id_job"

View File

@ -1,7 +1,10 @@
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.date import DateTrigger
from datetime import datetime, timedelta
from scheduler.refresh_sharepoint_content_job import (
    refresh_sharepoint_content_job,
)
from scheduler.backfill_photo_id_job import backfill_photo_id_job
from scheduler.cleanup_document_job import cleanup_document_job
from common.log.log_utils import log_entry_exit_async
from scheduler.constants import ScheduleJobLocker
@ -16,9 +19,15 @@ async def create_scheduler() -> AsyncIOScheduler:
@log_entry_exit_async
async def register_job(scheduler: AsyncIOScheduler):
    await init_lock(ScheduleJobLocker.REFRESH_SHAREPOINT_CONTENT_JOB_LOCKER)
    scheduler.add_job(refresh_sharepoint_content_job, "interval", seconds=(3600 + 3))
    await init_lock(ScheduleJobLocker.BACKFILL_PHOTO_ID_JOB_LOCKER)
    # One-shot run: the "date" trigger fires backfill_photo_id_job exactly once
    scheduler.add_job(
        backfill_photo_id_job,
        "date",
        run_date=datetime(2025, 2, 7, 20, 0, 0),
    )
    # cleanup_document_job was registered as a one-time job to remove many
    # unused documents; it has already run, so it stays commented out.
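
A runnable standalone sketch of the one-shot "date" trigger used above (timing values invented):

import asyncio
from datetime import datetime, timedelta
from apscheduler.schedulers.asyncio import AsyncIOScheduler

async def demo_job():
    print("one-shot job fired")

async def main():
    scheduler = AsyncIOScheduler()
    # The "date" trigger runs the job exactly once at run_date, then removes it
    scheduler.add_job(demo_job, "date", run_date=datetime.now() + timedelta(seconds=2))
    scheduler.start()
    await asyncio.sleep(3)  # keep the loop alive long enough for the job to fire
    scheduler.shutdown()

asyncio.run(main())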