Fix the scheduler bug and APIs under content

This commit is contained in:
Jet Li 2025-01-30 07:26:53 +00:00
parent 8b3d22344e
commit 71dc20ae4d
8 changed files with 158 additions and 149 deletions

View File

@ -10,66 +10,23 @@ from backend.content.constants import ContentSource
class ContentService: class ContentService:
def __init__(self) -> None: def __init__(self) -> None:
self.sharepoint_manager = ContentSharePointManager() pass
self.expiry_time = timedelta(hours=24)
async def retrieve_directories_for_folder( async def retrieve_content_directories_for_folder(
self, folder_name: str, region: UserRegion self, folder_name: str, region: UserRegion
) -> List[ContentDirectory]: ) -> List[ContentDirectory]:
# Check cache folder = await ContentFolderDoc.find_one(
# folder_key = f"{folder_name}/{region.name}" ContentFolderDoc.folder_name == folder_name,
# folder = await ContentFolderDoc.find_one( ContentFolderDoc.region == region,
# ContentFolderDoc.folder_name == folder_key, )
# ContentFolderDoc.region == region,
# )
folder = None
# Refresh cache if expired or not present
if folder is None or folder.valid_thru.replace( if folder is None or folder.valid_thru.replace(
tzinfo=timezone.utc tzinfo=timezone.utc
) < datetime.now(timezone.utc): ) < datetime.now(timezone.utc):
folder = await self.__refresh_folder_from_sharepoint(folder_name, region) folder = await ContentSharePointManager().retrieve_directories_for_folder(
folder_name=folder_name, region=region
return folder.content_directories if folder else []
async def __refresh_folder_from_sharepoint(
self, folder_name: str, region: UserRegion
) -> ContentFolderDoc:
content_folder_name = f"{folder_name}/{region.name}"
sp_folders = self.sharepoint_manager.list_sub_folders(content_folder_name)
current_time = datetime.now(timezone.utc)
# Create or update folder metadata
folder = await ContentFolderDoc.find_one(
ContentFolderDoc.folder_name == content_folder_name
)
if folder is None:
folder = ContentFolderDoc(
folder_name=content_folder_name,
content_directories=[],
udpate_time=current_time,
update_source=ContentSource.SHAREPOINT,
valid_thru=current_time + self.expiry_time,
region=region,
) )
else:
folder.content_directories.clear()
# Process subfolders in parallel return folder.content_directories if folder else None
tasks = [
self.sharepoint_manager.process_subfolder(content_folder_name, sp_folder)
for sp_folder in sp_folders
]
try:
folder.content_directories = await asyncio.gather(*tasks)
except Exception as e:
raise RuntimeError(f"Failed to process subfolders: {e}")
# Save folder metadata
folder.udpate_time = current_time
folder.valid_thru = current_time + self.expiry_time
await folder.save()
return folder
async def retrieve_content_as_media_data(self, document_id: str) -> Optional[str]: async def retrieve_content_as_media_data(self, document_id: str) -> Optional[str]:
document_manager = DocumentManager() document_manager = DocumentManager()

View File

@ -1,12 +1,16 @@
import asyncio from typing import Dict, List, Optional
from datetime import datetime, timezone, timedelta
from typing import List, Dict
from backend.sharepoint.sharepoint_graph_client import SharePointGraphClient
from backend.document.document_manager import DocumentManager
from backend.content.models import ContentDirectory, ContentFolderDoc
from backend.content.constants import ContentFileConstants, ContentSource
from common.config.app_settings import app_settings from common.config.app_settings import app_settings
from datetime import datetime, timedelta, timezone
from backend.sharepoint.sharepoint_graph_client import SharePointGraphClient
from common.constants.region import UserRegion
from backend.document.document_manager import DocumentManager
from backend.content.constants import (
ContentSource,
ContentMediaType,
ContentDataFormat,
ContentFileConstants,
)
from backend.content.models import ContentDirectory, ContentFolderDoc
class ContentSharePointManager: class ContentSharePointManager:
@ -23,76 +27,127 @@ class ContentSharePointManager:
) )
self.share_point_file_expiry = timedelta(hours=24) self.share_point_file_expiry = timedelta(hours=24)
def list_sub_folders(self, folder_name: str) -> List[dict]: def __generate_created__by__(self, folder_name):
""" return "content-service-" + folder_name.replace("/", "-").lower()
Fetches the subfolders under the specified folder from SharePoint.
"""
try:
# Use SharePointGraphClient to list subfolders
return self.sharepoint_client.list_sub_folders(folder_name)
except Exception as e:
raise ValueError(f"Failed to list subfolders for {folder_name}: {e}")
async def process_subfolder( async def retrieve_directories_for_folder(
self, content_folder_name: str, sp_folder: dict self, folder_name: str, region: UserRegion
) -> ContentDirectory: ):
content_directory = ContentDirectory(content_name=sp_folder["name"]) content_folder_name = folder_name + "/" + region.name
sp_files = self.sharepoint_client.list_files( sp_folders = self.sharepoint_client.list_sub_folders(content_folder_name)
f"{content_folder_name}/{sp_folder['name']}" current_time = datetime.now(timezone.utc)
folder = await ContentFolderDoc.find_one(
ContentFolderDoc.folder_name == content_folder_name
) )
# Process files in parallel if folder is None:
tasks = [ folder = ContentFolderDoc(
self.__process_file(file, content_directory, content_folder_name) folder_name=folder_name,
for file in sp_files content_directories=[],
] udpate_time=current_time,
await asyncio.gather(*tasks) update_source=ContentSource.SHAREPOINT,
valid_thru=current_time + self.share_point_file_expiry,
return content_directory region=region,
async def __process_file(
self, sp_file: dict, content_directory: ContentDirectory, folder_name: str
):
document_manager = DocumentManager()
file_content = self.sharepoint_client.get_file_content(sp_file["id"])
if sp_file["name"].lower() == ContentFileConstants.COVER_FILE_NAME.lower():
content_directory.cover_document_id = (
await document_manager.save_document_file(
self.__generate_created_by(folder_name),
sp_file["name"],
file_content,
)
)
elif sp_file["name"].lower() == ContentFileConstants.SUMMARY_FILE_NAME.lower():
content_directory.summary_text = file_content
elif sp_file["name"].lower() == ContentFileConstants.TITLE_FILE_NAME.lower():
content_directory.title_text = file_content
elif (
sp_file["name"].lower()
== ContentFileConstants.CONTENT_HTML_FILE_NAME.lower()
):
content_directory.content_html = self.sharepoint_client.get_file_content(
sp_file["id"]
)
elif (
sp_file["name"].lower()
== ContentFileConstants.CONTENT_TEXT_FILE_NAME.lower()
):
content_directory.content_text = self.sharepoint_client.get_file_content(
sp_file["id"]
)
elif (
sp_file["name"].lower()
== ContentFileConstants.CONTENT_PDF_FILE_NAME.lower()
):
content_directory.content_document_id = (
await document_manager.save_document_file(
self.__generate_created_by(folder_name),
sp_file["name"],
file_content,
)
) )
else:
folder.content_directories.clear()
def __generate_created_by(self, folder_name: str) -> str: for sp_folder in sp_folders:
return f"content-service-{folder_name.replace('/', '-').lower()}" content_directory = ContentDirectory(
content_name=sp_folder["name"],
cover_document_id=None,
summary_text=None,
title_text=None,
content_link=None,
content_document_id=None,
)
sp_files = self.sharepoint_client.list_files(
content_folder_name + "/" + sp_folder["name"]
)
for sp_file in sp_files:
if (
sp_file["name"].lower()
== ContentFileConstants.COVER_FILE_NAME.lower()
):
cover_file_content = self.sharepoint_client.get_file_content(
sp_file["id"]
)
cover_document_manager = DocumentManager()
file_name = sp_file["name"].lower()
created_by = self.__generate_created__by__(folder_name=folder_name)
content_directory.cover_document_id = (
await cover_document_manager.save_document_file(
created_by, file_name, cover_file_content
)
)
elif (
sp_file["name"].lower()
== ContentFileConstants.SUMMARY_FILE_NAME.lower()
):
content_directory.summary_text = (
self.sharepoint_client.get_file_content(sp_file["id"])
)
elif (
sp_file["name"].lower()
== ContentFileConstants.TITLE_FILE_NAME.lower()
):
content_directory.title_text = (
self.sharepoint_client.get_file_content(sp_file["id"])
)
elif (
sp_file["name"].lower()
== ContentFileConstants.CONTENT_LINK_FILE_NAME.lower()
):
content_directory.content_link = (
self.sharepoint_client.get_file_content(sp_file["id"])
)
elif (
sp_file["name"].lower()
== ContentFileConstants.CONTENT_HTML_FILE_NAME.lower()
):
content_directory.content_html = (
self.sharepoint_client.get_file_content(sp_file["id"])
)
elif (
sp_file["name"].lower()
== ContentFileConstants.CONTENT_TEXT_FILE_NAME.lower()
):
content_directory.content_text = (
self.sharepoint_client.get_file_content(sp_file["id"])
)
elif (
sp_file["name"].lower()
== ContentFileConstants.CONTENT_PDF_FILE_NAME.lower()
):
content_file_content = self.sharepoint_client.get_file_content(
sp_file["id"]
)
content_document_manager = DocumentManager()
file_name = sp_file["name"]
created_by = self.__generate_created__by__(folder_name=folder_name)
content_directory.content_document_id = (
await content_document_manager.save_document_file(
created_by, file_name, content_file_content
)
)
folder.content_directories.append(content_directory)
folder.udpate_time = current_time
folder.update_source = ContentSource.SHAREPOINT
folder.valid_thru = current_time + self.share_point_file_expiry
await folder.save()
async def retrieve_directorys_for_all_folders(self):
current_time = datetime.now(timezone.utc)
folders = await ContentFolderDoc.find(
ContentFolderDoc.update_source == ContentSource.SHAREPOINT,
ContentFolderDoc.valid_thru < current_time,
).to_list()
for folder in folders:
await self.retrieve_directories_for_folder(
folder.folder_name, folder.region
)

View File

@ -1,16 +1,17 @@
from backend.content.content_sharepoint_manager import ContentSharePointManager from backend.content.content_sharepoint_manager import ContentSharePointManager
from common.constants.region import UserRegion from common.constants.region import UserRegion
from datetime import datetime, timezone, timedelta
from common.log.log_utils import log_entry_exit_async from common.log.log_utils import log_entry_exit_async
from backend.content.content_service import ContentService
from backend.content.models import ContentFolderDoc
from backend.content.constants import ContentSource
class SharePointContentRefresher: class SharePointContentRefresher:
def __init__(self) -> None: def __init__(self) -> None:
self.content_sharepoint_manager = ContentSharePointManager() self.content_sharepoint_manager = ContentSharePointManager()
@log_entry_exit_async @log_entry_exit_async
async def refresh_all_in_database(self) -> None: async def refresh_all_in_database(self) -> None:
await self.content_sharepoint_manager.retrieve_directorys_for_all_folders() await self.content_sharepoint_manager.retrieve_directorys_for_all_folders()
@log_entry_exit_async @log_entry_exit_async
async def refresh_selected(self) -> None: async def refresh_selected(self) -> None:
selected_folders = [ selected_folders = [

View File

@ -1,11 +1,11 @@
from pydantic_settings import BaseSettings from pydantic_settings import BaseSettings
from .app_settings import app_settings from .app_settings import app_settings
class LogSettings(BaseSettings): class LogSettings(BaseSettings):
LOG_LEVEL: str = "DEBUG" LOG_LEVEL: str = "DEBUG"
LOG_BASE_PATH: str = app_settings.LOG_BASE_PATH LOG_BASE_PATH: str = app_settings.LOG_BASE_PATH
LOG_PATH: str = LOG_BASE_PATH + '/' + app_settings.BACKEND_LOG_FILE_NAME + '.log' LOG_PATH: str = LOG_BASE_PATH + "/" + app_settings.BACKEND_LOG_FILE_NAME + ".log"
LOG_RETENTION: str = "14 days" LOG_RETENTION: str = "14 days"
LOG_ROTATION: str = "00:00" # mid night LOG_ROTATION: str = "00:00" # mid night

View File

@ -11,9 +11,7 @@ class LoggerBase:
binded_loggers = {} binded_loggers = {}
logger_lock = threading.Lock() logger_lock = threading.Lock()
def __init__( def __init__(self, logger_name: str, extra_fileds: dict[str, any]) -> None:
self, logger_name: str, extra_fileds: dict[str, any]
) -> None:
self.__logger_name = logger_name self.__logger_name = logger_name
self.extra_fileds = extra_fileds self.extra_fileds = extra_fileds
with LoggerBase.logger_lock: with LoggerBase.logger_lock:
@ -21,9 +19,7 @@ class LoggerBase:
self.logger = LoggerBase.binded_loggers[self.__logger_name] self.logger = LoggerBase.binded_loggers[self.__logger_name]
return return
log_filename = ( log_filename = log_settings.LOG_BASE_PATH + "/" + self.__logger_name + ".log"
log_settings.LOG_PATH_BASE + "/" + self.__logger_name + ".log"
)
log_retention = log_settings.LOG_RETENTION log_retention = log_settings.LOG_RETENTION
log_rotation = log_settings.LOG_ROTATION log_rotation = log_settings.LOG_ROTATION
log_level = "INFO" log_level = "INFO"
@ -57,14 +53,14 @@ class LoggerBase:
subject: str, subject: str,
event: str, event: str,
properties: dict[str, any], properties: dict[str, any],
text: str = "" text: str = "",
) -> None: ) -> None:
local_logger = self.logger.bind( local_logger = self.logger.bind(
sender_id=sender_id, sender_id=sender_id,
receiver_id=receiver_id, receiver_id=receiver_id,
subject=subject, subject=subject,
event=event, event=event,
properties=properties properties=properties,
) )
local_logger.info(text) local_logger.info(text)
@ -83,7 +79,7 @@ class LoggerBase:
subject=subject, subject=subject,
event="exception", event="exception",
properties=properties, properties=properties,
exception=exception exception=exception,
) )
local_logger.exception(text) local_logger.exception(text)
@ -103,7 +99,7 @@ class LoggerBase:
properties=properties, properties=properties,
) )
local_logger.info(text) local_logger.info(text)
async def log_warning( async def log_warning(
self, self,
sender_id: str, sender_id: str,
@ -120,7 +116,7 @@ class LoggerBase:
properties=properties, properties=properties,
) )
local_logger.warning(text) local_logger.warning(text)
async def log_error( async def log_error(
self, self,
sender_id: str, sender_id: str,

View File

@ -14,6 +14,5 @@ async def create_scheduler() -> AsyncIOScheduler:
@log_entry_exit_async @log_entry_exit_async
async def register_job(scheduler): async def register_job(scheduler):
await init_lock(ScheduleJobLocker.ANALYZE_CODE_DEPOT_JOB_LOCKER) await init_lock(ScheduleJobLocker.REFRESH_SHAREPOINT_CONTENT_JOB_LOCKER)
scheduler.add_job(refresh_sharepoint_content_job, "interval", seconds=(3600 + 3)) scheduler.add_job(refresh_sharepoint_content_job, "interval", seconds=(3600 + 3))

View File

@ -1,8 +1,9 @@
from scheduler.scheduler_manager import create_scheduler
import asyncio import asyncio
def register(app): def register(app):
@app.on_event("startup") @app.on_event("startup")
async def start_scheduler(): async def start_scheduler():
#create your scheduler here scheduler = await create_scheduler()
pass scheduler.start()

View File

@ -16,5 +16,5 @@ router = APIRouter()
) )
# @cache(expire=300) # Cache results for 5 minutes # @cache(expire=300) # Cache results for 5 minutes
async def retrieve_directories_for_folder(folder_name: str, region: UserRegion): async def retrieve_directories_for_folder(folder_name: str, region: UserRegion):
result = await ContentService().retrieve_directories_for_folder(folder_name, region) result = await ContentService().retrieve_content_directories_for_folder(folder_name, region)
return JSONResponse(content=jsonable_encoder(result)) return JSONResponse(content=jsonable_encoder(result))