167 lines
5.9 KiB
Python
167 lines
5.9 KiB
Python
from datetime import datetime as dt, timezone as tz
|
|
from app.central_storage.backend.models.models import DocumentDoc
|
|
from app.central_storage.backend.models.constants import MediaType, DataFormat
|
|
|
|
from infra.exception.exceptions import DoesNotExistError
|
|
from app.central_storage.backend.infra.azure_storage.blob_handler import (
|
|
AzureBlobHandler,
|
|
)
|
|
import base64
|
|
import os
|
|
from re import match
|
|
|
|
|
|
class DocumentService:
    """Manage one document's metadata record together with its stored blob.

    A fresh instance has no record attached. Call ``new_document()`` or
    ``load_document()`` first; every other public method raises until one
    of them has attached a ``DocumentDoc`` record.
    """

    def __init__(self) -> None:
        """Create a service with no attached record and its own blob handler."""
        # Stays None until new_document() / load_document() attaches a record.
        self.__document_doc = None
        self.blob_manager = AzureBlobHandler()

    @staticmethod
    def __normalize_file_name__(file_name: str) -> str:
        """Return *file_name* without leading/trailing dots, slashes or spaces.

        Only the two ends of the name are trimmed ('.', '/', '\\' and ' ');
        characters in the middle of the name are left as they are.
        """
        # Storage imposes restrictions on names, but the client's file name
        # should be kept recognizable, so only the edges are cleaned up.
        return file_name.strip("./\\ ")

    @staticmethod
    def __retrieve_media_type_from_file_name__(file_name) -> MediaType:
        """Guess the media type from the extension of *file_name*.

        The comparison is case-insensitive. A missing or unrecognized
        extension yields ``MediaType.UNKNOWN``.
        """
        # splitext() keeps the leading dot on the extension; drop it.
        extension = os.path.splitext(file_name)[1][1:].lower()
        known_extensions = {
            "png": MediaType.PNG,
            "pdf": MediaType.PDF,
            "txt": MediaType.TXT,
        }
        return known_extensions.get(extension, MediaType.UNKNOWN)

    def __get_prefix_for_media_data__(self) -> str:
        """Return the ``data:<mime>;base64,`` prefix for the attached record."""
        return f"data:{self.get_file_media_type_string()};base64,"

    def __validate_document_doc(self) -> None:
        """Ensure a document record is attached to this service.

        Raises:
            RuntimeError: If no record has been attached yet.
        """
        if not self.__document_doc:
            # Built-in exceptions reject keyword arguments, so the message
            # has to be passed positionally.
            raise RuntimeError(
                "Please call new_document() or load_document() to initialize the internal variables."
            )

    def __get_container_name__(self):
        """Return the container name, which is the record's ``created_by``."""
        self.__validate_document_doc()
        return self.__document_doc.created_by

    def __get_blob_name__(self) -> str:
        """Return the blob name, built as ``<location>-<file_name>``."""
        self.__validate_document_doc()
        record = self.__document_doc
        return record.location + "-" + record.file_name

    async def load_document(self, document_id: str) -> None:
        """Attach the existing record whose ``document_id`` matches.

        Raises:
            DoesNotExistError: If no record with that id is found.
        """
        document = await DocumentDoc.find_one(DocumentDoc.document_id == document_id)
        if not document:
            raise DoesNotExistError(
                f"Cannot find record with document id {document_id}"
            )
        self.__document_doc = document

    async def new_document(
        self,
        file_name: str,
        media_type: MediaType,
        data_format: DataFormat,
        created_by: str,
    ) -> None:
        """Create a new document record and attach it to this service.

        Args:
            file_name: Client-side file name; stored in normalized form.
            media_type: Media type of the file. When it is
                ``MediaType.UNKNOWN`` the type is guessed from *file_name*.
            data_format: Stored on the record unchanged.
            created_by: Creator identifier; also used as the container name.

        Raises:
            ValueError: If *created_by* is not 4-64 characters of lower-case
                letters, digits and hyphens.
        """
        # Only a simple check is done here. For the detailed requirements see
        # https://learn.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata
        # "\Z" instead of "$" so that a trailing newline is rejected as well.
        if not match(r"^[a-z0-9\-]{4,64}\Z", created_by):
            raise ValueError(
                "invalid 'created_by':'{}'. It can contain lower case alphabet, digits, and hyphens with length between 4 and 64".format(
                    created_by
                )
            )

        if media_type == MediaType.UNKNOWN:
            media_type = DocumentService.__retrieve_media_type_from_file_name__(
                file_name
            )

        # One timestamp, so create_time and update_time agree on a new record.
        now = dt.now(tz.utc)
        document = DocumentDoc(
            document_id=None,
            location=None,
            file_name=DocumentService.__normalize_file_name__(file_name),
            created_by=created_by,
            create_time=now,
            updated_by=created_by,
            update_time=now,
            version_number=1,
            media_type=media_type,
            data_format=data_format,
        )
        new_document = await document.create()

        # document_id and location are derived from the id assigned by
        # create(), so they are filled in with a second write.
        new_document.document_id = str(new_document.id)
        new_document.location = "{}-{}".format(created_by, new_document.id)
        await new_document.save()

        self.__document_doc = new_document

    async def save_document_file(self, file_raw: bytes) -> str:
        """Upload *file_raw* as this document's blob and return the document id."""
        self.__validate_document_doc()
        await self.blob_manager.upload_blob(
            container_name=self.__get_container_name__(),
            blob_name=self.__get_blob_name__(),
            blob_data=file_raw,
        )
        return self.__document_doc.document_id

    async def remove_document_file(self) -> None:
        """Flag the record as deleted, persist the flag, then delete the blob."""
        self.__validate_document_doc()

        self.__document_doc.is_deleted = True
        # save() is a coroutine (new_document awaits it too); without the
        # await the flag would never be written.
        await self.__document_doc.save()
        await self.blob_manager.delete_blob(
            container_name=self.__get_container_name__(),
            blob_name=self.__get_blob_name__(),
        )

    async def read_document_file_as_http_media_data(self) -> str:
        """Return the blob content as a base64 ``data:`` URI string."""
        self.__validate_document_doc()
        raw_data = await self.blob_manager.fetch_blob_data_as_bytes(
            container_name=self.__get_container_name__(),
            file_name=self.__get_blob_name__(),
        )
        encoded = base64.b64encode(raw_data).decode()
        return self.__get_prefix_for_media_data__() + encoded

    async def fetch_document_file_as_http_download(self) -> str:
        """Return what the blob handler's ``generate_download_url`` yields."""
        self.__validate_document_doc()
        return await self.blob_manager.generate_download_url(
            container_name=self.__get_container_name__(),
            file_name=self.__get_blob_name__(),
        )

    def get_file_name(self) -> str:
        """Return the file name stored on the attached record."""
        self.__validate_document_doc()
        return self.__document_doc.file_name

    def get_file_media_type_string(self) -> str:
        """Return the MIME type string for the attached record's media type.

        PNG, PDF and TXT map to their MIME types; anything else falls back
        to ``application/octet-stream``.
        """
        self.__validate_document_doc()
        stored_type = self.__document_doc.media_type
        mime_by_media_type = (
            (MediaType.PNG, "image/png"),
            (MediaType.PDF, "application/pdf"),
            (MediaType.TXT, "text/plain"),
        )
        for known_type, mime in mime_by_media_type:
            if stored_type == known_type:
                return mime
        return "application/octet-stream"

    def get_document_id(self) -> str:
        """Return the ``document_id`` stored on the attached record."""
        self.__validate_document_doc()
        return self.__document_doc.document_id
|