import base64
import os
from datetime import datetime as dt, timezone as tz
from re import match

from app.central_storage.backend.models.models import DocumentDoc
from app.central_storage.backend.models.constants import MediaType, DataFormat
from infra.exception.exceptions import DoesNotExistError
from app.central_storage.backend.infra.azure_storage.blob_manager import (
    AzureBlobManager,
)


class DocumentService:
    """Manage one document: its ``DocumentDoc`` record and its blob content.

    An instance starts empty. Call ``new_document()`` or ``load_document()``
    first; every other method raises ``RuntimeError`` until one of them has
    populated the internal ``DocumentDoc``.
    """

    # Lower-cased file extension (without the dot) -> media type.
    _MEDIA_TYPE_BY_EXTENSION = {
        "png": MediaType.PNG,
        "pdf": MediaType.PDF,
        "txt": MediaType.TXT,
    }

    def __init__(self) -> None:
        # Set by new_document() / load_document(); None until then.
        self.__document_doc = None
        self.blob_manager = AzureBlobManager()

    @staticmethod
    def __normalize_file_name__(file_name: str) -> str:
        """Return ``file_name`` with leading/trailing ``.``, ``/``, ``\\`` and spaces removed.

        There are restrictions on blob naming, but we still want to keep the
        client's file name, so the name is cleaned up before it is stored.
        """
        return file_name.strip("./\\ ")

    @staticmethod
    def __retrieve_media_type_from_file_name__(file_name) -> MediaType:
        """Guess the media type from the file extension (case-insensitive).

        Returns ``MediaType.UNKNOWN`` when there is no extension or the
        extension is not one of png / pdf / txt.
        """
        extension = os.path.splitext(file_name)[1][1:].lower()
        return DocumentService._MEDIA_TYPE_BY_EXTENSION.get(
            extension, MediaType.UNKNOWN
        )

    def __get_prefix_for_media_data__(self) -> str:
        """Return the ``data:<mime>;base64,`` prefix for the loaded document."""
        return "data:{};base64,".format(self.get_file_media_type_string())

    def __validate_document_doc(self) -> None:
        """Ensure a document has been created or loaded.

        Raises:
            RuntimeError: if neither ``new_document()`` nor ``load_document()``
                has been called successfully on this instance.
        """
        if self.__document_doc is None:
            # Exception() accepts no keyword arguments, so the message is
            # passed positionally.
            raise RuntimeError(
                "Please call new_document() or load_document() to initialize the internal variables."
            )

    def __get_container_name__(self) -> str:
        """Return the blob container name, which is the document's ``created_by``."""
        self.__validate_document_doc()
        return self.__document_doc.created_by

    def __get_blob_name__(self) -> str:
        """Return the blob name, built as ``<location>-<file_name>``."""
        self.__validate_document_doc()
        return self.__document_doc.location + "-" + self.__document_doc.file_name

    async def load_document(self, document_id: str) -> None:
        """Load an existing ``DocumentDoc`` by its ``document_id``.

        Raises:
            DoesNotExistError: if no record matches ``document_id``.
        """
        document = await DocumentDoc.find_one(DocumentDoc.document_id == document_id)
        if not document:
            raise DoesNotExistError(
                f"Cannot find record with document id {document_id}",
            )

        self.__document_doc = document

    async def new_document(
        self,
        file_name: str,
        media_type: MediaType,
        data_format: DataFormat,
        created_by: str,
    ) -> None:
        """Create and persist a new ``DocumentDoc`` and make it the current document.

        Args:
            file_name: Client-supplied file name; stored in normalized form.
            media_type: Explicit media type. When ``MediaType.UNKNOWN`` it is
                derived from the file extension instead.
            data_format: Stored as-is on the record.
            created_by: Creator identifier; also used as the blob container name.

        Raises:
            ValueError: if ``created_by`` is not 4-64 characters of lower-case
                letters, digits and hyphens.
        """
        # We just do some simple check here. For detailed requirement, please read
        # https://learn.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata
        # NOTE(review): that page appears to limit container names to 3-63
        # characters, while this check accepts 4-64 -- confirm the intended bounds.
        # \Z (rather than $) rejects a trailing newline.
        if not match(r"^[a-z0-9\-]{4,64}\Z", created_by):
            raise ValueError(
                "invalid 'created_by':'{}'. It can contain lower case alphabet, digits, and hyphens with length between 4 and 64".format(
                    created_by
                )
            )

        normalized_name = self.__normalize_file_name__(file_name)
        if media_type == MediaType.UNKNOWN:
            # Derive from the same name that gets stored, so stray trailing
            # dots or spaces do not hide the extension.
            media_type = self.__retrieve_media_type_from_file_name__(normalized_name)

        # One timestamp so create_time and update_time agree on a new record.
        now = dt.now(tz.utc)
        document = DocumentDoc(
            document_id=None,
            location=None,
            file_name=normalized_name,
            created_by=created_by,
            create_time=now,
            updated_by=created_by,
            update_time=now,
            version_number=1,
            media_type=media_type,
            data_format=data_format,
        )
        new_document = await document.create()

        # document_id and location depend on the id assigned by create(),
        # so they are filled in with a second save.
        new_document.document_id = str(new_document.id)
        new_document.location = "{}-{}".format(created_by, new_document.id)
        await new_document.save()

        self.__document_doc = new_document

    async def save_document_file(self, file_raw: bytes) -> str:
        """Upload ``file_raw`` as the document's blob and return the document id."""
        self.__validate_document_doc()
        await self.blob_manager.upload_blob(
            container_name=self.__get_container_name__(),
            blob_name=self.__get_blob_name__(),
            blob_data=file_raw,
        )
        return self.__document_doc.document_id

    async def remove_document_file(self) -> None:
        """Flag the record as deleted, persist it, then delete the blob."""
        self.__validate_document_doc()

        self.__document_doc.is_deleted = True
        # save() is awaited elsewhere in this class; without the await the
        # flag would never be persisted.
        await self.__document_doc.save()

        await self.blob_manager.delete_blob(
            container_name=self.__get_container_name__(),
            blob_name=self.__get_blob_name__(),
        )

    async def read_document_file_as_http_media_data(self) -> str:
        """Return the blob content as a ``data:<mime>;base64,<payload>`` string."""
        self.__validate_document_doc()

        # NOTE(review): this call passes file_name= while upload/delete pass
        # blob_name= -- confirm against AzureBlobManager's signature.
        raw_data = await self.blob_manager.fetch_blob_data_as_bytes(
            container_name=self.__get_container_name__(),
            file_name=self.__get_blob_name__(),
        )
        base64_data = base64.b64encode(raw_data).decode()
        return self.__get_prefix_for_media_data__() + base64_data

    async def fetch_document_file_as_http_download(self) -> str:
        """Return the value of ``blob_manager.generate_download_url`` for this document's blob."""
        self.__validate_document_doc()

        # NOTE(review): file_name= here vs blob_name= in upload/delete --
        # confirm against AzureBlobManager's signature.
        return await self.blob_manager.generate_download_url(
            container_name=self.__get_container_name__(),
            file_name=self.__get_blob_name__(),
        )

    def get_file_name(self) -> str:
        """Return the stored (normalized) file name."""
        self.__validate_document_doc()
        return self.__document_doc.file_name

    def get_file_media_type_string(self) -> str:
        """Return the MIME type string for the document's media type.

        Falls back to ``application/octet-stream`` for any media type other
        than PNG, PDF or TXT.
        """
        self.__validate_document_doc()
        media_type = self.__document_doc.media_type
        if media_type == MediaType.PNG:
            return "image/png"
        if media_type == MediaType.PDF:
            return "application/pdf"
        if media_type == MediaType.TXT:
            return "text/plain"
        return "application/octet-stream"

    def get_document_id(self) -> str:
        """Return the document id of the current document."""
        self.__validate_document_doc()
        return self.__document_doc.document_id