Initial check-in for central_storage

This commit is contained in:
jetli 2024-10-19 05:37:34 +00:00
parent 9defc78732
commit c25252ceda
39 changed files with 1196 additions and 25 deletions

View File

@ -0,0 +1,19 @@
# Dockerfile for Python Service
FROM python:3.11-slim
# Set the working directory inside the container
WORKDIR /app
# Copy the requirements.txt to the working directory and install dependencies
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
# Copy the application code to the working directory
COPY . /app
# Expose the port used by the FastAPI app
EXPOSE 8005
# Run the application using the start script
CMD ["uvicorn", "webapi.main:app", "--reload", "--port=8003", "--host=0.0.0.0"]

View File

@ -0,0 +1,15 @@
from app.central_storage.backend.business.document_manager import (
DocumentBusinessManager,
)
class DocumentHub:
def __init__(self, user_id: str):
self.user_id = user_id
self.document_business_manager = DocumentBusinessManager(self.user_id)
return
async def get_document_by_id(self, document_id: str):
return await self.document_business_manager.get_document_details_by_id(
document_id
)

View File

@ -0,0 +1,19 @@
from app.central_storage.backend.services.document_service import DocumentService
class DocumentBusinessManager:
def __init__(self, user_id) -> None:
self.user_id = user_id
self.document_service = DocumentService()
async def get_document_details_by_id(self, document_id: str):
await self.document_service.load_document(document_id=document_id)
download_link = (
await self.document_service.fetch_document_file_as_http_download()
)
file_name = self.document_service.get_file_name()
return {
"file_name": file_name,
"file_download_url": download_link,
}

View File

@ -0,0 +1,124 @@
from app.central_storage.common.config.app_settings import app_settings
import datetime
import asyncio
from azure.storage.blob.aio import (
BlobServiceClient,
ContainerClient,
BlobClient,
)
from azure.storage.blob import (
BlobSasPermissions,
UserDelegationKey,
generate_container_sas,
generate_blob_sas,
)
from azure.core.exceptions import ResourceExistsError
class AzureBlobManager:
def __init__(self) -> None:
pass
def __create_user_delegation_sas_token__(
self,
blob_client: BlobClient,
user_delegation_key: UserDelegationKey,
valid_minutes: int,
):
# Create a SAS token that's valid for one day, as an example
start_time = datetime.datetime.now(datetime.timezone.utc)
expiry_time = start_time + datetime.timedelta(minutes=valid_minutes)
sas_token = generate_blob_sas(
account_name=blob_client.account_name,
container_name=blob_client.container_name,
blob_name=blob_client.blob_name,
user_delegation_key=user_delegation_key,
permission=BlobSasPermissions(read=True),
expiry=expiry_time,
start=start_time,
)
return sas_token
async def create_container(self, container_name: str):
try:
async with BlobServiceClient(
app_settings.AZURE_STORAGE_DOCUMENT_API_ENDPOINT,
credential=app_settings.AZURE_STORAGE_DOCUMENT_API_KEY,
) as blob_service_client:
await blob_service_client.create_container(name=container_name)
except ResourceExistsError:
# swallow the error for the resource existing error
pass
async def delete_container(self, container_name: str):
raise NotImplementedError("Deleting container is not supported")
async def upload_blob(self, container_name: str, blob_name: str, blob_data: bytes):
async with ContainerClient(
app_settings.AZURE_STORAGE_DOCUMENT_API_ENDPOINT,
credential=app_settings.AZURE_STORAGE_DOCUMENT_API_KEY,
container_name=container_name,
) as container_client:
if not await container_client.exists():
await container_client.create_container()
await container_client.upload_blob(
name=blob_name,
data=blob_data,
length=len(blob_data),
blob_type="BlockBlob",
overwrite=True,
)
async def delete_blob(self, container_name: str, blob_name: str):
async with BlobServiceClient(
app_settings.AZURE_STORAGE_DOCUMENT_API_ENDPOINT,
credential=app_settings.AZURE_STORAGE_DOCUMENT_API_KEY,
) as blob_service_client:
blob_client = await blob_service_client.get_blob_client(
container=container_name, blob=blob_name
)
await blob_client.delete_blob()
async def generate_download_url(self, container_name: str, file_name: str):
token_valid_minutes = 10
key_start_time = datetime.datetime.now(datetime.timezone.utc)
key_expiry_time = key_start_time + datetime.timedelta(
minutes=token_valid_minutes
)
async with BlobServiceClient(
app_settings.AZURE_STORAGE_DOCUMENT_API_ENDPOINT,
credential=app_settings.AZURE_STORAGE_DOCUMENT_API_KEY,
) as blob_service_client:
blob_client = blob_service_client.get_blob_client(
container=container_name, blob=file_name
)
sas_token = generate_blob_sas(
account_name=blob_client.account_name,
container_name=blob_client.container_name,
blob_name=blob_client.blob_name,
# user_delegation_key=user_delegation_key,
account_key=app_settings.AZURE_STORAGE_DOCUMENT_API_KEY,
permission=BlobSasPermissions(read=True),
expiry=key_expiry_time,
start=key_start_time,
)
sas_url = f"{blob_client.url}?{sas_token}"
return sas_url
async def fetch_blob_data_as_bytes(
self, container_name: str, file_name: str
) -> bytes:
async with BlobServiceClient(
app_settings.AZURE_STORAGE_DOCUMENT_API_ENDPOINT,
credential=app_settings.AZURE_STORAGE_DOCUMENT_API_KEY,
) as blob_service_client:
blob_client = blob_service_client.get_blob_client(
container=container_name, blob=file_name
)
downloader = await blob_client.download_blob(max_concurrency=1)
blob_bytes = await downloader.readall()
return blob_bytes

View File

@ -0,0 +1,13 @@
from enum import IntEnum
class MediaType(IntEnum):
UNKNOWN = 0
PDF = 1
PNG = 2
TXT = 3
class DataFormat(IntEnum):
RAW = 0
BASED64 = 1

View File

@ -0,0 +1,22 @@
from datetime import datetime
from typing import Optional
from beanie import Document
from .constants import MediaType, DataFormat
class DocumentDoc(Document):
document_id: Optional[str]
file_name: str
created_by: str
create_time: datetime
updated_by: str
update_time: datetime
location: Optional[str]
version_number: int
media_type: MediaType
data_format: DataFormat
is_deleted: Optional[bool] = False
class Settings:
name = "document"

View File

@ -0,0 +1,168 @@
from datetime import datetime as dt, timezone as tz
from app.central_storage.backend.models.models import DocumentDoc
from app.central_storage.backend.models.constants import MediaType, DataFormat
from infra.exception.exceptions import DoesNotExistError
from app.central_storage.backend.infra.azure_storage.blob_manager import (
AzureBlobManager,
)
import base64
import os
from re import match
class DocumentService:
def __init__(self) -> None:
self.__document_doc = None
self.blob_manager = AzureBlobManager()
return
def __normalize_file_name__(file_name: str) -> str:
# There are some restriction on file naming, while we want to save the client file name
# so every time we will generate an internal file name
return file_name.strip("./\\ ")
def __retrieve_media_type_from_file_name__(file_name) -> MediaType:
extension = os.path.splitext(file_name)[1][1:]
if extension == None or extension == "":
return MediaType.UNKNOWN
elif extension.lower() == "png":
return MediaType.PNG
elif extension.lower() == "pdf":
return MediaType.PDF
elif extension.lower() == "txt":
return MediaType.TXT
else:
return MediaType.UNKNOWN
def __get_prefix_for_media_data__(self) -> str:
return "data:{};base64,".format(self.get_file_media_type_string())
def __validate_document_doc(self) -> None:
if not self.__document_doc:
raise Exception(
status_code=500,
detail="Please call new_document() or load_document() to initialize the internal variables.",
)
return
def __get_container_name__(self):
self.__validate_document_doc()
return self.__document_doc.created_by
def __get_blob_name__(self) -> str:
self.__validate_document_doc()
return self.__document_doc.location + "-" + self.__document_doc.file_name
async def load_document(self, document_id: str) -> None:
document = await DocumentDoc.find_one(DocumentDoc.document_id == document_id)
if not document:
raise DoesNotExistError(
"Cannot find record with document id {document_id}".format(
self.document_id
),
)
self.__document_doc = document
return
async def new_document(
self,
file_name: str,
media_type: MediaType,
data_format: DataFormat,
created_by: str,
) -> None:
# We just do some simple check here. For detailed requirement, please read
# https://learn.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata
if not bool(match("^[a-z0-9\-]{4,64}$", created_by)):
raise ValueError(
"invalid 'created_by':'{}'. It can contain lower case alphabet, digits, and hyphens with length between 4 and 64".format(
created_by
)
)
document = DocumentDoc(
document_id=None,
location=None,
file_name=DocumentManager.__normalize_file_name__(file_name),
created_by=created_by,
create_time=dt.now(tz.utc),
updated_by=created_by,
update_time=dt.now(tz.utc),
version_number=1,
media_type=(
DocumentManager.__retrieve_media_type_from_file_name__(file_name)
if media_type == MediaType.UNKNOWN
else media_type
),
data_format=data_format,
)
new_document = await document.create()
# Set document_id and location
location = "{}-{}".format(created_by, new_document.id)
new_document.document_id = str(new_document.id)
new_document.location = location
await new_document.save()
self.__document_doc = new_document
return
async def save_document_file(self, file_raw: bytes) -> str:
self.__validate_document_doc()
await self.blob_manager.upload_blob(
container_name=self.__get_container_name__(),
blob_name=self.__get_blob_name__(),
blob_data=file_raw,
)
return self.__document_doc.document_id
async def remove_document_file(self) -> None:
self.__validate_document_doc()
self.__document_doc.is_deleted = True
self.__document_doc.save()
await self.blob_manager.delete_blob(
container_name=self.__get_container_name__(),
blob_name=self.__get_blob_name__(),
)
async def read_document_file_as_http_media_data(self) -> str:
self.__validate_document_doc()
raw_data = await self.blob_manager.fetch_blob_data_as_bytes(
container_name=self.__get_container_name__(),
file_name=self.__get_blob_name__(),
)
base64_data = base64.b64encode(raw_data).decode()
return self.__get_prefix_for_media_data__() + base64_data
async def fetch_document_file_as_http_download(self) -> str:
self.__validate_document_doc()
return await self.blob_manager.generate_download_url(
container_name=self.__get_container_name__(),
file_name=self.__get_blob_name__(),
)
def get_file_name(self) -> str:
self.__validate_document_doc()
return self.__document_doc.file_name
def get_file_media_type_string(self) -> str:
self.__validate_document_doc()
media_type = "application/octet-stream"
if self.__document_doc.media_type == MediaType.PNG:
media_type = "image/png"
elif self.__document_doc.media_type == MediaType.PDF:
media_type = "application/pdf"
elif self.__document_doc.media_type == MediaType.TXT:
media_type = "text/plain"
return media_type
def get_document_id(self) -> str:
self.__validate_document_doc()
return self.__document_doc.document_id

View File

@ -0,0 +1,18 @@
import os
from pydantic_settings import BaseSettings
class AppSettings(BaseSettings):
NAME: str = "central_storage"
AZURE_STORAGE_DOCUMENT_API_ENDPOINT: str = (
"https://freeleaps1document.blob.core.windows.net/"
)
AZURE_STORAGE_DOCUMENT_API_KEY: str = ""
class Config:
env_file = ".myapp.env"
env_file_encoding = "utf-8"
app_settings = AppSettings()

View File

@ -0,0 +1,17 @@
class LogSettings():
LOG_LEVEL: str = "DEBUG"
LOG_PATH_BASE: str = (
"./logs"
)
LOG_PATH: str = LOG_PATH_BASE + '/' + "app" + '.log'
LOG_RETENTION: str = "14 days"
LOG_ROTATION: str = "00:00" # mid night
class Config:
env_file = ".log.env"
env_file_encoding = "utf-8"
log_settings = LogSettings()

View File

@ -0,0 +1,3 @@
pika==1.3.2
fastapi
uvicorn

View File

@ -0,0 +1,4 @@
#!/bin/sh
# This script starts the FastAPI server using uvicorn
uvicorn webapi.bootstrap.application:create_app --reload --host 0.0.0.0 --port 8005

View File

@ -0,0 +1,75 @@
import logging
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
from webapi.providers import common
from webapi.providers import logger
from webapi.providers import router
from webapi.providers import database
from webapi.providers import scheduler
from webapi.providers import exception_handler
from .freeleaps_app import FreeleapsApp
def create_app() -> FastAPI:
logging.info("App initializing")
app = FreeleapsApp()
register(app, exception_handler)
register(app, database)
register(app, logger)
register(app, router)
register(app, scheduler)
register(app, common)
# Call the custom_openapi function to change the OpenAPI version
customize_openapi_security(app)
return app
# This function overrides the OpenAPI schema version to 3.0.0
def customize_openapi_security(app: FastAPI) -> None:
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
# Generate OpenAPI schema
openapi_schema = get_openapi(
title="FreeLeaps API",
version="3.1.0",
description="FreeLeaps API Documentation",
routes=app.routes,
)
# Ensure the components section exists in the OpenAPI schema
if "components" not in openapi_schema:
openapi_schema["components"] = {}
# Add security scheme to components
openapi_schema["components"]["securitySchemes"] = {
"bearerAuth": {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT"
}
}
# Add security requirement globally
openapi_schema["security"] = [{"bearerAuth": []}]
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
def register(app, provider):
logging.info(provider.__name__ + " registering")
provider.register(app)
def boot(app, provider):
logging.info(provider.__name__ + " booting")
provider.boot(app)

View File

@ -0,0 +1,6 @@
from fastapi import FastAPI
class FreeleapsApp(FastAPI):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

View File

@ -0,0 +1,25 @@
import os
from pydantic_settings import BaseSettings
class SiteSettings(BaseSettings):
NAME: str = "appname"
DEBUG: bool = True
ENV: str = "dev"
SERVER_HOST: str = "0.0.0.0"
SERVER_PORT: int = 8005
URL: str = "http://localhost"
TIME_ZONE: str = "UTC"
BASE_PATH: str = os.path.dirname(os.path.dirname((os.path.abspath(__file__))))
class Config:
env_file = ".devbase-webapi.env"
env_file_encoding = "utf-8"
site_settings = SiteSettings()

View File

@ -0,0 +1,30 @@
from webapi.bootstrap.application import create_app
from webapi.config.site_settings import site_settings
from fastapi.responses import RedirectResponse
from fastapi.middleware.cors import CORSMiddleware
from strawberry.fastapi import GraphQLRouter
from strawberry.fastapi.handlers import GraphQLTransportWSHandler, GraphQLWSHandler
import uvicorn
from typing import Any
app = create_app()
@app.get("/", status_code=301)
async def root():
"""
TODO: redirect client to /doc#
"""
return RedirectResponse("docs")
if __name__ == "__main__":
uvicorn.run(app="main:app", host=site_settings.SERVER_HOST, port=site_settings.SERVER_PORT)
def get_context() -> Any:
# Define your context function. This is where you can set up authentication, database connections, etc.
return {}
def get_root_value() -> Any:
# Define your root value function. This can be used to customize the root value for GraphQL operations.
return {}

View File

@ -0,0 +1,31 @@
from fastapi.middleware.cors import CORSMiddleware
from webapi.config.site_settings import site_settings
def register(app):
app.debug = site_settings.DEBUG
app.title = site_settings.NAME
add_global_middleware(app)
# This hook ensures that a connection is opened to handle any queries
# generated by the request.
@app.on_event("startup")
def startup():
pass
# This hook ensures that the connection is closed when we've finished
# processing the request.
@app.on_event("shutdown")
def shutdown():
pass
def add_global_middleware(app):
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

View File

@ -0,0 +1,17 @@
from webapi.config.site_settings import site_settings
from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
def register(app):
app.debug = site_settings.DEBUG
app.title = site_settings.NAME
@app.on_event("startup")
async def start_database():
await initiate_database()
async def initiate_database():
#init your database here
pass

View File

@ -0,0 +1,39 @@
from fastapi import FastAPI, HTTPException
from fastapi.exceptions import RequestValidationError
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.status import (
HTTP_400_BAD_REQUEST,
HTTP_401_UNAUTHORIZED,
HTTP_403_FORBIDDEN,
HTTP_404_NOT_FOUND,
HTTP_422_UNPROCESSABLE_ENTITY,
HTTP_500_INTERNAL_SERVER_ERROR,
)
async def custom_http_exception_handler(request: Request, exc: HTTPException):
return JSONResponse(
status_code=exc.status_code,
content={"error": exc.detail},
)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return JSONResponse(
status_code=HTTP_400_BAD_REQUEST,
content={"error": str(exc)},
)
async def exception_handler(request: Request, exc: Exception):
return JSONResponse(
status_code=HTTP_500_INTERNAL_SERVER_ERROR,
content={"error": str(exc)},
)
def register(app: FastAPI):
app.add_exception_handler(HTTPException, custom_http_exception_handler)
app.add_exception_handler(RequestValidationError, validation_exception_handler)
app.add_exception_handler(Exception, exception_handler)

View File

@ -0,0 +1,60 @@
import logging
import sys
from loguru import logger
from common.config.log_settings import log_settings
def register(app=None):
level = log_settings.LOG_LEVEL
file_path = log_settings.LOG_PATH
retention = log_settings.LOG_RETENTION
rotation = log_settings.LOG_ROTATION
# intercept everything at the root logger
logging.root.handlers = [InterceptHandler()]
logging.root.setLevel(level)
# remove every other logger's handlers
# and propagate to root logger
for name in logging.root.manager.loggerDict.keys():
logging.getLogger(name).handlers = []
logging.getLogger(name).propagate = True
# configure loguru
logger.add(
sink=sys.stdout
)
logger.add(
sink=file_path,
level=level,
retention=retention,
rotation=rotation
)
logger.disable("pika.adapters")
logger.disable("pika.connection")
logger.disable("pika.channel")
logger.disable("pika.callback")
logger.disable("pika.frame")
logger.disable("pika.spec")
logger.disable("aiormq.connection")
logger.disable("urllib3.connectionpool")
class InterceptHandler(logging.Handler):
def emit(self, record):
# Get corresponding Loguru level if it exists
try:
level = logger.level(record.levelname).name
except ValueError:
level = record.levelno
# Find caller from where originated the logged message
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
logger.opt(depth=depth, exception=record.exc_info).log(
level, record.getMessage()
)

View File

@ -0,0 +1,34 @@
from webapi.routes import api_router
from starlette import routing
def register(app):
app.include_router(
api_router,
prefix="/api",
tags=["api"],
dependencies=[],
responses={404: {"description": "no page found"}},
)
if app.debug:
for route in app.routes:
if not isinstance(route, routing.WebSocketRoute):
print(
{
"path": route.path,
"endpoint": route.endpoint,
"name": route.name,
"methods": route.methods,
}
)
else:
print(
{
"path": route.path,
"endpoint": route.endpoint,
"name": route.name,
"type": "web socket route",
}
)

View File

@ -0,0 +1,8 @@
import asyncio
def register(app):
@app.on_event("startup")
async def start_scheduler():
#create your scheduler here
pass

View File

@ -0,0 +1,5 @@
from fastapi import APIRouter
api_router = APIRouter()
websocket_router = APIRouter()

View File

@ -0,0 +1,15 @@
from fastapi.routing import APIRoute
from starlette import routing
def post_process_router(app) -> None:
"""
Simplify operation IDs so that generated API clients have simpler function
names.
Should be called only after all routes have been added.
"""
for route in app.routes:
if isinstance(route, APIRoute):
if hasattr(route, "operation_id"):
route.operation_id = route.name # in this case, 'read_items'

View File

@ -0,0 +1,34 @@
from fastapi import APIRouter, Security, HTTPException
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from fastapi_jwt import JwtAuthorizationCredentials
from backend.infra.authentication.auth import access_security
from app.central_storage.backend.application.document_app import DocumentHub
router = APIRouter()
# Web API
# Fetch document by ID
@router.get(
"/get-document-by-id/{document_id}",
operation_id="get-document-by-id",
summary="Fetch a document by its ID",
description="Retrieve a specific document by its document ID and return file name and download URL",
response_description="The document details including file name and download URL",
)
async def get_document_by_id(
document_id: str,
credentials: JwtAuthorizationCredentials = Security(access_security),
):
user_id = credentials["id"]
# Fetch the document using DocumentHub
document = await DocumentHub(user_id).get_document_by_id(document_id)
# If document is not found, raise 404 error
if not document:
raise HTTPException(status_code=404, detail="Document not found")
# Return the document details
return JSONResponse(content=jsonable_encoder({"document": document}))

View File

@ -1,32 +1,34 @@
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672" # RabbitMQ communication port
- "15672:15672" # RabbitMQ management port
networks:
- rabbitmq_network
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:15672"]
interval: 30s
retries: 5
start_period: 10s
timeout: 10s
# rabbitmq:
# image: rabbitmq:3-management
# ports:
# - "5672:5672" # RabbitMQ communication port
# - "15672:15672" # RabbitMQ management port
# networks:
# - freeleaps_service_hub_network
# healthcheck:
# test: [ "CMD", "curl", "-f", "http://localhost:15672" ]
# interval: 30s
# retries: 5
# start_period: 10s
# timeout: 10s
python_service:
central_storage:
build:
context: .
context: app/central_storage
dockerfile: Dockerfile
depends_on:
rabbitmq:
condition: service_healthy
restart: always
environment:
RABBITMQ_HOST: rabbitmq
- AZURE_STORAGE_DOCUMENT_API_KEY=${AZURE_STORAGE_DOCUMENT_API_KEY}
ports:
- "8005:8005" # Map the central_storage service port
networks:
- rabbitmq_network
- freeleaps_service_hub_network
env_file:
- sites/central_storage/.env
volumes:
- .:/app # Mount the current directory to /app in the container
networks:
rabbitmq_network:
driver: bridge
freeleaps_service_hub_network:
driver: bridge

View File

@ -0,0 +1,15 @@
import os
from pydantic_settings import BaseSettings
class AppSettings():
NAME: str = "myapp"
class Config:
env_file = ".myapp.env"
env_file_encoding = "utf-8"
APPLICATION_ACTIVITY_LOG: str = "myapp-application-activity"
USER_ACTIVITY_LOG: str = "myapp-user-activity"
BUSINESS_METRIC_LOG: str = "myapp-business-metrics"
app_settings = AppSettings()

View File

@ -0,0 +1,17 @@
class LogSettings():
LOG_LEVEL: str = "DEBUG"
LOG_PATH_BASE: str = (
"./logs"
)
LOG_PATH: str = LOG_PATH_BASE + '/' + "app" + '.log'
LOG_RETENTION: str = "14 days"
LOG_ROTATION: str = "00:00" # mid night
class Config:
env_file = ".log.env"
env_file_encoding = "utf-8"
log_settings = LogSettings()

View File

View File

@ -0,0 +1,23 @@
class DoesNotExistError(Exception):
def __init__(self, message: str = "Does Not Exist"):
self.message = message
class AuthenticationError(Exception):
def __init__(self, message: str = "Unauthorized"):
self.message = message
class AuthorizationError(Exception):
def __init__(self, message: str = "Forbidden"):
self.message = message
class InvalidOperationError(Exception):
def __init__(self, message: str = "Invalid Operation"):
self.message = message
class InvalidDataError(Exception):
def __init__(self, message: str = "Invalid Data"):
self.message = message

0
infra/log/__init__.py Normal file
View File

View File

@ -0,0 +1,14 @@
from .base_logger import LoggerBase
from common.config.app_settings import app_settings
import json
class ApplicationLogger(LoggerBase):
def __init__(self, application_activities: dict[str, any] = {}) -> None:
extra_fileds = {}
if application_activities:
extra_fileds.update(application_activities)
super().__init__(
logger_name=app_settings.APPLICATION_ACTIVITY_LOG,
extra_fileds=extra_fileds,
)

139
infra/log/base_logger.py Normal file
View File

@ -0,0 +1,139 @@
from loguru import logger as guru_logger
from common.config.log_settings import log_settings
from typing import List
import socket
import json
import logging
import threading
class LoggerBase:
binded_loggers = {}
logger_lock = threading.Lock()
def __init__(
self, logger_name: str, extra_fileds: dict[str, any]
) -> None:
self.__logger_name = logger_name
self.extra_fileds = extra_fileds
with LoggerBase.logger_lock:
if self.__logger_name in LoggerBase.binded_loggers:
self.logger = LoggerBase.binded_loggers[self.__logger_name]
return
log_filename = (
log_settings.LOG_PATH_BASE + "/" + self.__logger_name + ".log"
)
log_retention = log_settings.LOG_RETENTION
log_rotation = log_settings.LOG_ROTATION
log_level = "INFO"
log_message_format = "{message}"
guru_logger.add(
sink=log_filename,
level=log_level,
retention=log_retention,
rotation=log_rotation,
format=log_message_format,
serialize=True,
filter=lambda record: "extra" in record
and "topic" in record["extra"]
and record["extra"]["topic"] == self.__logger_name,
)
host_name = socket.gethostname()
host_ip = socket.gethostbyname(host_name)
self.logger = guru_logger.bind(
topic=self.__logger_name,
host_ip=host_ip,
host_name=host_name,
)
with LoggerBase.logger_lock:
LoggerBase.binded_loggers[self.__logger_name] = self.logger
async def log_event(
self,
sender_id: str,
receiver_id: str,
subject: str,
event: str,
properties: dict[str, any],
text: str = ""
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event=event,
properties=properties
)
local_logger.info(text)
async def log_exception(
self,
sender_id: str,
receiver_id: str,
subject: str,
exception: Exception,
text: str = "",
properties: dict[str, any] = None,
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event="exception",
properties=properties,
exception=exception
)
local_logger.exception(text)
async def log_info(
self,
sender_id: str,
receiver_id: str,
subject: str,
text: str = "",
properties: dict[str, any] = None,
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event="information",
properties=properties,
)
local_logger.info(text)
async def log_warning(
self,
sender_id: str,
receiver_id: str,
subject: str,
text: str = "",
properties: dict[str, any] = None,
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event="warning",
properties=properties,
)
local_logger.warning(text)
async def log_error(
self,
sender_id: str,
receiver_id: str,
subject: str,
text: str = "",
properties: dict[str, any] = None,
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event="error",
properties=properties,
)
local_logger.error(text)

View File

@ -0,0 +1,25 @@
from .base_logger import LoggerBase
from common.config.app_settings import app_settings
import json
class BusinessMetricLogger(LoggerBase):
def __init__(self, business_metrics: dict[str, any] = {}) -> None:
extra_fileds = {}
if business_metrics:
extra_fileds.update(business_metrics)
super().__init__(
logger_name=app_settings.BUSINESS_METRIC_LOG,
extra_fileds=extra_fileds,
)
async def log_metrics(self, business_metrics: dict[str, any] = {}) -> None:
return await super().log_event(
sender_id="business_metric_manager",
receiver_id="business_metric_logger",
subject="metrics",
event="logging",
properties=business_metrics,
text="business metric logged"
)

View File

@ -0,0 +1,50 @@
from .application_logger import ApplicationLogger
class FunctionLogger(ApplicationLogger):
def __init__(self, sender_id: str, receiver_id:str) -> None:
super().__init__()
self.event_sender_id = sender_id
self.event_receiver_id = receiver_id
self.event_subject = "function"
async def log_enter(self, function: str, file: str):
return await super().log_event(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
event="enter",
properties={
"function": function,
"file": file,
},
text="Enter:{} of {}".format(function, file)
)
async def log_exit(self, function: str, file: str, excution_time_in_ns: int):
return await super().log_event(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
event="exit",
properties={
"function": function,
"file": file,
"excution_time_in_ns": excution_time_in_ns
},
text="Exit:{} of {}".format(function, file)
)
async def log_exception(self, exception: Exception, function: str, file: str, excution_time_in_ns: int) -> None:
return await super().log_exception(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
exception=exception,
text="Exception:{} of {}".format(function, file),
properties={
"function": function,
"file": file,
"excution_time_in_ns": excution_time_in_ns
},
)

25
infra/log/log_utils.py Normal file
View File

@ -0,0 +1,25 @@
import os
from .function_logger import FunctionLogger
import time
import functools
def log_entry_exit_async(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
file_path = os.path.relpath(func.__code__.co_filename)
function_logger = FunctionLogger(sender_id="log_entry_exit_async", receiver_id="function_logger")
start_time = time.process_time_ns()
try:
await function_logger.log_enter(func.__name__, file_path)
result = await func(*args, **kwargs)
await function_logger.log_exit(func.__name__, file_path, time.process_time_ns() - start_time)
return result
except Exception as exception:
await function_logger.log_exception(
exception=exception,
function=func.__name__,
file=file_path,
excution_time_in_ns=time.process_time_ns() - start_time)
raise
return wrapper

View File

@ -0,0 +1,46 @@
from .application_logger import ApplicationLogger
class ModuleLogger(ApplicationLogger):
def __init__(self, sender_id: str) -> None:
super().__init__()
self.event_sender_id = sender_id
self.event_receiver_id = "ModuleLogger"
self.event_subject = "module"
async def log_exception(self, exception: Exception, text: str = "Exception", properties: dict[str, any] = None) -> None:
return await super().log_exception(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
exception=exception,
text=text,
properties=properties,
)
async def log_info(self, info: str, properties: dict[str, any] = None) -> None:
return await super().log_info(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
text=info,
properties=properties,
)
async def log_warning(self, warning: str, properties: dict[str, any] = None) -> None:
return await super().log_warning(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
text=warning,
properties=properties,
)
async def log_error(self, error: str, properties: dict[str, any] = None) -> None:
return await super().log_error(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
text=error,
properties=properties,
)

14
infra/log/user_logger.py Normal file
View File

@ -0,0 +1,14 @@
from .base_logger import LoggerBase
from common.config.app_settings import app_settings
import json
class UserLogger(LoggerBase):
def __init__(self, user_activities: dict[str, any] = {}) -> None:
extra_fileds = {}
if user_activities:
extra_fileds.update(user_activities)
super().__init__(
logger_name=app_settings.USER_ACTIVITY_LOG, extra_fileds=extra_fileds
)

View File

@ -1 +0,0 @@
pika==1.3.2

View File

@ -0,0 +1 @@
export AZURE_STORAGE_DOCUMENT_API_KEY=xbiFtFeQ6v5dozgVM99fZ9huUomL7QcLu6s0y8zYHtIXZ8XdneKDMcg4liQr/9oNlVoRFcZhWjLY+ASt9cjICQ==