Merge pull request 'Add Devops Service' (#6) from feature/dax/devops-svc into dev

Reviewed-on: freeleaps/freeleaps-service-hub#6
Reviewed-by: jingyao1991 <jingyao1991@noreply.gitea.freeleaps.mathmast.com>
This commit is contained in:
dax.li 2025-07-15 05:51:09 +00:00
commit 0313cf7061
84 changed files with 2257 additions and 0 deletions

apps/devops/README.md Normal file

@@ -0,0 +1,20 @@
This is a template backend service based on FastAPI + MongoDB.
To start local development, go to the root directory of this service, YOUR_WORKSPACE_PATH/devops/, and start MongoDB:
```bash
docker compose -f app/scripts/mongodb/docker-compose.yml up -d
```
Then run the app
```bash
uvicorn app.main:app --reload
```
If a new dependency is added, run the following to update requirements.txt:
```bash
# optional: if you have not installed pipreqs
pip3 install pipreqs
# generate requirements.txt
pipreqs . --force
```

@@ -0,0 +1,81 @@
import logging
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
from app.providers import common
from app.providers.logger import register_logger
from app.providers import router
from app.providers import database
from app.providers import metrics
from app.providers import probes
from app.providers import exception_handler
from app.common.config.app_settings import app_settings
def create_app() -> FastAPI:
logging.info("App initializing")
app = FreeleapsApp()
register_logger()
register(app, exception_handler)
register(app, database)
register(app, router)
register(app, common)
    # Customize the OpenAPI schema to add a global JWT bearer security scheme
customize_openapi_security(app)
# Register probe APIs if enabled
if app_settings.PROBES_ENABLED:
register(app, probes)
# Register metrics APIs if enabled
if app_settings.METRICS_ENABLED:
register(app, metrics)
return app
# This function customizes the generated OpenAPI schema to add a global JWT bearer security scheme
def customize_openapi_security(app: FastAPI) -> None:
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
# Generate OpenAPI schema
openapi_schema = get_openapi(
title="FreeLeaps API",
version="3.1.0",
description="FreeLeaps API Documentation",
routes=app.routes,
)
# Ensure the components section exists in the OpenAPI schema
if "components" not in openapi_schema:
openapi_schema["components"] = {}
# Add security scheme to components
openapi_schema["components"]["securitySchemes"] = {
"bearerAuth": {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"}
}
# Add security requirement globally
openapi_schema["security"] = [{"bearerAuth": []}]
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
def register(app, provider):
logging.info(provider.__name__ + " registering")
provider.register(app)
def boot(app, provider):
logging.info(provider.__name__ + " booting")
provider.boot(app)
class FreeleapsApp(FastAPI):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
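
Each module passed to register() above only needs to expose a register(app) function. A minimal sketch of that provider contract (the module name example_provider is hypothetical):

```python
# app/providers/example_provider.py (hypothetical module, for illustration only)
import logging

from fastapi import FastAPI


def register(app: FastAPI) -> None:
    """Called once by create_app() via register(app, provider)."""
    # Attach routes, middleware, or lifecycle hooks here
    @app.on_event("startup")
    async def announce() -> None:
        logging.info("example_provider started")
```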

@@ -0,0 +1,29 @@
from pydantic_settings import BaseSettings
# NOTE: Each value falls back to the corresponding environment variable when not set here
class AppSettings(BaseSettings):
NAME: str = "YOUR_APP_NAME"
APP_NAME: str = NAME
APP_ENV: str = "alpha"
JWT_SECRET_KEY: str = ""
ACCESS_TOKEN_EXPIRE_MINUTES: int = 3600
REFRESH_TOKEN_EXPIRE_DAYS: int = 1
METRICS_ENABLED: bool = False
PROBES_ENABLED: bool = True
APP_MONGODB_URI: str = "mongodb://localhost:27017"
APP_MONGODB_NAME: str = "testdb"
LOG_BASE_PATH: str = "./log"
BACKEND_LOG_FILE_NAME: str = APP_NAME
APPLICATION_ACTIVITY_LOG: str = APP_NAME + "-application-activity"
class Config:
env_file = ".myapp.env"
env_file_encoding = "utf-8"
app_settings = AppSettings()
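
A quick sketch of how these settings resolve: environment variables and entries in .myapp.env take precedence over the class defaults above (the override value below is illustrative):

```python
# Illustrative: environment variables override AppSettings defaults
import os

os.environ["APP_MONGODB_NAME"] = "devops_db"  # hypothetical override

from app.common.config.app_settings import AppSettings

settings = AppSettings()
assert settings.APP_MONGODB_NAME == "devops_db"
```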

@@ -0,0 +1,16 @@
import os
from dataclasses import dataclass
from .app_settings import app_settings
@dataclass
class LogSettings:
LOG_PATH_BASE: str = app_settings.LOG_BASE_PATH
LOG_RETENTION: str = os.environ.get("LOG_RETENTION", "30 days")
LOG_ROTATION: str = os.environ.get("LOG_ROTATION", "00:00") # midnight
MAX_BACKUP_FILES: int = int(os.environ.get("LOG_BACKUP_FILES", 5))
LOG_ROTATION_BYTES: int = int(os.environ.get("LOG_ROTATION_BYTES", 10 * 1024 * 1024)) # 10 MB
APP_NAME: str = app_settings.APP_NAME
ENVIRONMENT: str = app_settings.APP_ENV
log_settings = LogSettings()

@@ -0,0 +1,35 @@
import os
from pydantic_settings import BaseSettings
# NOTE: Each value falls back to the corresponding environment variable when not set here
class SiteSettings(BaseSettings):
NAME: str = "appname"
DEBUG: bool = True
ENV: str = "dev"
SERVER_HOST: str = "localhost"
SERVER_PORT: int = 8000
URL: str = "http://localhost"
TIME_ZONE: str = "UTC"
BASE_PATH: str = os.path.dirname(os.path.dirname((os.path.abspath(__file__))))
BASE_GITEA_URL: str = "https://gitea.freeleaps.mathmast.com"
# TODO: confirm with Zhenyu
    BASE_RECONCILE_URL: str = "https://reconcile.freeleaps.mathmast.com"
# TODO: modify this with actual Loki URL
BASE_LOKI_URL: str = "http://localhost:3100"
class Config:
env_file = ".devbase-webapi.env"
env_file_encoding = "utf-8"
site_settings = SiteSettings()

@@ -0,0 +1,12 @@
from typing import Any, Dict, Optional
from .base_logger import LoggerBase
from app.common.config.app_settings import app_settings
class ApplicationLogger(LoggerBase):
    def __init__(self, application_activities: Optional[Dict[str, Any]] = None) -> None:
        # Avoid a mutable default argument; copy the caller's fields if provided
        extra_fields: Dict[str, Any] = {}
        if application_activities:
            extra_fields.update(application_activities)
        super().__init__(
            logger_name=app_settings.APPLICATION_ACTIVITY_LOG,
            extra_fields=extra_fields,
        )

@@ -0,0 +1,136 @@
from loguru import logger as guru_logger
from app.common.config.log_settings import log_settings
from typing import Any, Dict, Optional, Tuple
import socket
import json
import threading
import os
import sys
import inspect
import logging
from app.common.log.json_sink import JsonSink
class LoggerBase:
binded_loggers = {}
logger_lock = threading.Lock()
    def __init__(self, logger_name: str, extra_fields: Dict[str, Any]) -> None:
        self.__logger_name = logger_name
        self.extra_fields = extra_fields
with LoggerBase.logger_lock:
if self.__logger_name in LoggerBase.binded_loggers:
self.logger = LoggerBase.binded_loggers[self.__logger_name]
return
log_filename = f"{log_settings.LOG_PATH_BASE}/{self.__logger_name}.log"
log_level = "INFO"
rotation_bytes = int(log_settings.LOG_ROTATION_BYTES or 10 * 1024 * 1024)
guru_logger.remove()
file_sink = JsonSink(
log_file_path=log_filename,
rotation_size_bytes=rotation_bytes,
max_backup_files=log_settings.MAX_BACKUP_FILES
)
guru_logger.add(
sink=file_sink,
level=log_level,
filter=lambda record: record["extra"].get("topic") == self.__logger_name,
)
guru_logger.add(
sink=sys.stderr,
level=log_level,
format="{level} - {time:YYYY-MM-DD HH:mm:ss} - <{extra[log_file]}:{extra[log_line]}> - {extra[properties_str]} - {message}",
filter=lambda record: record["extra"].get("topic") == self.__logger_name,
)
host_name = socket.gethostname()
host_ip = socket.gethostbyname(host_name)
self.logger = guru_logger.bind(
topic=self.__logger_name,
host_ip=host_ip,
host_name=host_name,
app=log_settings.APP_NAME,
env=log_settings.ENVIRONMENT,
)
with LoggerBase.logger_lock:
LoggerBase.binded_loggers[self.__logger_name] = self.logger
def _get_log_context(self) -> dict:
frame = inspect.currentframe().f_back.f_back
filename = os.path.basename(frame.f_code.co_filename)
lineno = frame.f_lineno
return {"log_file": filename, "log_line": lineno}
    def _prepare_properties(self, properties: Optional[Dict[str, Any]]) -> Tuple[Dict[str, Any], str]:
props = {} if properties is None else properties.copy()
props_str = json.dumps(props, ensure_ascii=False) if props else "{}"
return props, props_str
    async def log_event(self, sender_id: str, receiver_id: str, subject: str, event: str, properties: Optional[Dict[str, Any]], text: str = "") -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event=event, properties=props, properties_str=props_str, **context)
local_logger.info(text)
    async def log_exception(self, sender_id: str, receiver_id: str, subject: str, exception: Exception, text: str = "", properties: Optional[Dict[str, Any]] = None) -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="exception", properties=props, properties_str=props_str, exception=exception, **context)
local_logger.exception(text)
    async def log_info(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: Optional[Dict[str, Any]] = None) -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="information", properties=props, properties_str=props_str, **context)
local_logger.info(text)
    async def log_warning(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: Optional[Dict[str, Any]] = None) -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="warning", properties=props, properties_str=props_str, **context)
local_logger.warning(text)
    async def log_error(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: Optional[Dict[str, Any]] = None) -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="error", properties=props, properties_str=props_str, **context)
local_logger.error(text)
@staticmethod
def configure_uvicorn_logging():
print("📢 Setting up uvicorn logging interception...")
# Intercept logs from these loggers
intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"]
class InterceptHandler(logging.Handler):
def emit(self, record):
level = (
guru_logger.level(record.levelname).name
if guru_logger.level(record.levelname, None)
else record.levelno
)
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
guru_logger.opt(depth=depth, exception=record.exc_info).log(
level,
f"[{record.name}] {record.getMessage()}",
)
# Replace default handlers
logging.root.handlers.clear()
logging.root.setLevel(logging.INFO)
logging.root.handlers = [InterceptHandler()]
# Configure specific uvicorn loggers
for logger_name in intercept_loggers:
logging_logger = logging.getLogger(logger_name)
logging_logger.handlers.clear() # Remove default handlers
logging_logger.propagate = True # Ensure propagation through Loguru
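
A usage sketch for the async logging helpers above (the import path for ApplicationLogger is assumed from this PR's layout; ids are illustrative):

```python
import asyncio

# assumed module path for the ApplicationLogger defined in this PR
from app.common.log.application_logger import ApplicationLogger


async def main() -> None:
    logger = ApplicationLogger()
    await logger.log_info(
        sender_id="devops-svc",  # illustrative ids
        receiver_id="ops",
        subject="deployment",
        text="deployment started",
        properties={"deployment_id": "abc123"},
    )


asyncio.run(main())
```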

@@ -0,0 +1,85 @@
import json
import datetime
import traceback
from pathlib import Path
from typing import Optional
class JsonSink:
def __init__(
self,
log_file_path: str,
rotation_size_bytes: int = 10 * 1024 * 1024,
max_backup_files: int = 5,
):
self.log_file_path = Path(log_file_path)
self.rotation_size = rotation_size_bytes
self.max_backup_files = max_backup_files
self._open_log_file()
def _open_log_file(self):
# ensure the parent directory exists
parent_dir = self.log_file_path.parent
if not parent_dir.exists():
parent_dir.mkdir(parents=True, exist_ok=True)
self.log_file = self.log_file_path.open("a", encoding="utf-8")
def _should_rotate(self) -> bool:
return self.log_file_path.exists() and self.log_file_path.stat().st_size >= self.rotation_size
def _rotate(self):
self.log_file.close()
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
rotated_path = self.log_file_path.with_name(f"{self.log_file_path.stem}_{timestamp}{self.log_file_path.suffix}")
self.log_file_path.rename(rotated_path)
self._cleanup_old_backups()
self._open_log_file()
def _cleanup_old_backups(self):
parent = self.log_file_path.parent
stem = self.log_file_path.stem
suffix = self.log_file_path.suffix
backup_files = sorted(
parent.glob(f"{stem}_*{suffix}"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
for old_file in backup_files[self.max_backup_files:]:
try:
old_file.unlink()
except Exception as e:
print(f"Failed to delete old backup {old_file}: {e}")
def __call__(self, message):
record = message.record
if self._should_rotate():
self._rotate()
log_entry = {
"level": record["level"].name.lower(),
"timestamp": int(record["time"].timestamp() * 1000),
"text": record["message"],
"fields": record["extra"].get("properties", {}),
"context": {
"app": record["extra"].get("app"),
"env": record["extra"].get("env"),
"log_file": record["extra"].get("log_file"),
"log_line": record["extra"].get("log_line"),
"topic": record["extra"].get("topic"),
"sender_id": record["extra"].get("sender_id"),
"receiver_id": record["extra"].get("receiver_id"),
"subject": record["extra"].get("subject"),
"event": record["extra"].get("event"),
"host_ip": record["extra"].get("host_ip"),
"host_name": record["extra"].get("host_name"),
},
"stacktrace": None
}
if record["exception"]:
exc_type, exc_value, exc_tb = record["exception"]
log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb)
self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
self.log_file.flush()
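
For reference, one line emitted by JsonSink has roughly this shape (values illustrative), shown here as the equivalent Python dict:

```python
log_entry = {
    "level": "info",
    "timestamp": 1752555069000,  # milliseconds since epoch
    "text": "deployment started",
    "fields": {"deployment_id": "abc123"},
    "context": {
        "app": "devops",
        "env": "alpha",
        "log_file": "service.py",
        "log_line": 42,
        "topic": "devops-application-activity",
        "sender_id": "devops-svc",
        "receiver_id": "ops",
        "subject": "deployment",
        "event": "information",
        "host_ip": "10.0.0.1",
        "host_name": "devops-pod",
    },
    "stacktrace": None,
}
```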

@@ -0,0 +1,7 @@
from app.common.models.code_depot.code_depot import CodeDepotDoc
from app.common.models.deployment.deployment import Deployment
from app.common.models.deployment.deployment import DevOpsReconcileRequest, DevOpsReconcileOperationType
# List of Beanie document models.
# New models must be added here so their MongoDB collections are created automatically.
db_models = [Deployment, CodeDepotDoc]

@@ -0,0 +1,39 @@
from datetime import datetime, timezone
from typing import List, Optional, Dict
from beanie import Document
from enum import IntEnum
from pydantic import Field
from pymongo import IndexModel
class DepotStatus(IntEnum):
TO_BE_CREATED = 0
CREATED = 1
DELETED = 2
class UserAccountStatus(IntEnum):
TO_BE_CREATED = 0
CREATED = 1
DELETED = 2
DEACTIVATED = 3
class CodeDepotDoc(Document):
depot_name: str
product_id: str
depot_status: DepotStatus
collaborators: List[str] = []
total_commits: Optional[int] = 0
last_commiter: Optional[str] = ""
    # default_factory so the timestamp is evaluated per document, not at import time
    last_update: Optional[datetime] = Field(default_factory=lambda: datetime.now(timezone.utc))
weekly_commits: Optional[Dict[str, int]] = {}
class Settings:
name = "code_depot"
indexes = [
IndexModel([("product_id", 1)])
]

@@ -0,0 +1,89 @@
from datetime import datetime, timedelta
from typing import Literal, List, Optional
from enum import Enum
from beanie import Document
from pydantic import Field, field_validator
from pydantic import BaseModel
from pymongo import IndexModel
class Deployment(Document):
deployment_id: str
deployment_stage: str
deployment_status: Literal["started", "failed", "succeeded", "aborted"]
deployment_target_env: Literal["alpha", "prod"]
deployment_ttl_hours: int = 2
deployment_project_id: str
deployment_project_name: str
deployment_product_id: str
deployment_product_name: str
deployment_git_url: str
deployment_git_sha256: str
deployment_reason: str
deployment_app_url: str = "" # URL to access the deployed application, keep it empty to be filled later
deployed_by: str
    # default_factory ensures timestamps are generated per record, not at import time
    created_at: datetime = Field(default_factory=datetime.now)
    updated_at: datetime = Field(default_factory=datetime.now)
class Settings:
name = "deployment"
indexes = [
IndexModel([("deployment_product_id", 1), ("created_at", 1)]),
IndexModel([("deployment_id", 1), ("deployment_status", 1)]),
IndexModel([("deployment_id", 1), ("deployment_stage", 1)], unique=True)
]
class InitDeploymentRequest(BaseModel):
product_id: str
sha256: str = ""
target_env: Literal["alpha", "prod"]
user_id: str
reason: str = "not provided"
ttl_hours: int = 3
class CheckDeploymentStatusRequest(BaseModel):
product_id: str
target_env: str
user_id: str
class CheckApplicationLogsRequest(BaseModel):
product_id: str
target_env: Literal["alpha", "prod"]
user_id: str = ''
log_level: List[Literal["info", "error", "debug"]] = Field(default_factory=lambda: ["info"])
    start_time: datetime = Field(default_factory=lambda: datetime.now() - timedelta(minutes=5))
    end_time: datetime = Field(default_factory=datetime.now)
limit: int = 1000
class CheckApplicationLogsResponse(BaseModel):
product_id: str
target_env: Literal["alpha", "prod"]
user_id: str = ''
log_level: List[Literal["info", "error", "debug"]]
start_time: datetime
end_time: datetime
limit: int
logs: list[str]
class DevOpsReconcileOperationType(Enum):
START = "start"
TERMINATE = "terminate"
RESTART = "restart"
class DevOpsReconcileRequest(BaseModel):
operation: DevOpsReconcileOperationType
id: str
devops_proj_id: str
triggered_user_id: str
causes: str
commit_sha256: Optional[str] = None
target_env: Literal["alpha", "prod"]
ttl_controled: bool = False
ttl: int = 10800
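
A hypothetical request body for POST /api/deployment/initDeployment, matching InitDeploymentRequest above (all ids illustrative):

```python
payload = {
    "product_id": "prod-123",
    "sha256": "0313cf7061",  # commit SHA to deploy
    "target_env": "alpha",
    "user_id": "user-42",
    "reason": "smoke test",
    "ttl_hours": 3,
}
```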

@@ -0,0 +1,140 @@
import logging
from enum import Enum
from typing import Optional, Callable, Tuple, Dict
import inspect
from datetime import datetime, timezone
# ProbeType is an Enum that defines the types of probes that can be registered.
class ProbeType(Enum):
LIVENESS = "liveness"
READINESS = "readiness"
STARTUP = "startup"
# ProbeResult is a class that represents the result of a probe check.
class ProbeResult:
def __init__(self, success: bool, message: str = "ok", data: Optional[dict] = None):
self.success = success
self.message = message
self.data = data or {}
def to_dict(self) -> dict:
return {
"success": self.success,
"message": self.message,
"data": self.data
}
# Probe is a class that represents a probe that can be registered.
class Probe:
def __init__(self, type: ProbeType, path: str, check_fn: Callable, name: Optional[str] = None):
self.type = type
self.path = path
self.check_fn = check_fn
self.name = name or f"{type.value}-{id(self)}"
async def execute(self) -> ProbeResult:
try:
result = self.check_fn()
if inspect.isawaitable(result):
result = await result
if isinstance(result, ProbeResult):
return result
elif isinstance(result, bool):
return ProbeResult(result, "ok" if result else "failed")
else:
return ProbeResult(True, "ok")
except Exception as e:
return ProbeResult(False, str(e))
# ProbeGroup is a class that represents a group of probes that can be checked together.
class ProbeGroup:
def __init__(self, path: str):
self.path = path
self.probes: Dict[str, Probe] = {}
def add_probe(self, probe: Probe):
self.probes[probe.name] = probe
async def check_all(self) -> Tuple[bool, dict]:
results = {}
all_success = True
for name, probe in self.probes.items():
result = await probe.execute()
results[name] = result.to_dict()
if not result.success:
all_success = False
return all_success, results
# FrameworkAdapter is an abstract class that defines the interface for framework-specific probe adapters.
class FrameworkAdapter:
async def handle_request(self, group: ProbeGroup):
all_success, results = await group.check_all()
status_code = 200 if all_success else 503
return {"status": "ok" if all_success else "failed", "payload": results, "timestamp": int(datetime.now(timezone.utc).timestamp())}, status_code
def register_route(self, path: str, handler: Callable):
raise NotImplementedError
# ProbeManager is a class that manages the registration of probes and their corresponding framework adapters.
class ProbeManager:
_default_paths = {
ProbeType.LIVENESS: "/_/livez",
ProbeType.READINESS: "/_/readyz",
ProbeType.STARTUP: "/_/healthz"
}
def __init__(self):
self.groups: Dict[str, ProbeGroup] = {}
self.adapters: Dict[str, FrameworkAdapter] = {}
self._startup_complete = False
def register_adapter(self, framework: str, adapter: FrameworkAdapter):
self.adapters[framework] = adapter
logging.info(f"Registered probe adapter ({adapter}) for framework: {framework}")
def register(
self,
type: ProbeType,
check_func: Optional[Callable] = None,
path: Optional[str] = None,
prefix: str = "",
name: Optional[str] = None,
frameworks: Optional[list] = None
):
path = path or self._default_paths.get(type, "/_/healthz")
if prefix:
path = f"{prefix}{path}"
if type == ProbeType.STARTUP and check_func is None:
check_func = self._default_startup_check
probe = Probe(type, path, check_func or (lambda: True), name)
if path not in self.groups:
self.groups[path] = ProbeGroup(path)
self.groups[path].add_probe(probe)
for framework in (frameworks or ["default"]):
self._register_route(framework, path)
logging.info(f"Registered {type.value} probe route ({path}) for framework: {framework}")
def _register_route(self, framework: str, path: str):
if framework not in self.adapters:
return
adapter = self.adapters[framework]
group = self.groups[path]
async def handler():
return await adapter.handle_request(group)
adapter.register_route(path, handler)
def _default_startup_check(self) -> bool:
return self._startup_complete
def mark_startup_complete(self):
self._startup_complete = True

@@ -0,0 +1,15 @@
from . import FrameworkAdapter
from fastapi.responses import JSONResponse
from typing import Callable
# FastAPIAdapter is a class that implements the FrameworkAdapter interface for FastAPI.
class FastAPIAdapter(FrameworkAdapter):
def __init__(self, app):
self.app = app
    def register_route(self, path: str, handler: Callable):
async def wrapper():
data, status_code = await handler()
return JSONResponse(content=data, status_code=status_code)
self.app.add_api_route(path, wrapper, methods=["GET"])

apps/devops/app/main.py Normal file

@@ -0,0 +1,16 @@
from fastapi.responses import RedirectResponse
from app.common.config.site_settings import site_settings
from app.bootstrap.application import create_app
app = create_app()
@app.get("/", status_code=301)
async def root():
    """
    Redirect clients to the interactive API docs
    """
    # A returned Response overrides the decorator's status_code,
    # so set it explicitly on the RedirectResponse
    return RedirectResponse(url="/docs", status_code=301)
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host=site_settings.SERVER_HOST, port=site_settings.SERVER_PORT, reload=True)

@@ -0,0 +1,31 @@
from fastapi.middleware.cors import CORSMiddleware
from app.common.config.site_settings import site_settings
def register(app):
app.debug = site_settings.DEBUG
app.title = site_settings.NAME
add_global_middleware(app)
    # Startup hook: open any connections or resources the app needs
    # before serving requests.
@app.on_event("startup")
def startup():
pass
    # Shutdown hook: release any resources acquired at startup.
@app.on_event("shutdown")
def shutdown():
pass
def add_global_middleware(app):
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

@@ -0,0 +1,34 @@
import asyncio
from app.common.config.app_settings import app_settings
from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
from app.common.models import db_models
from app.common.probes import ProbeResult
client = AsyncIOMotorClient(
app_settings.APP_MONGODB_URI,
serverSelectionTimeoutMS=60000,
minPoolSize=5, # Minimum number of connections in the pool
maxPoolSize=20, # Maximum number of connections in the pool
)
def register(app):
app.debug = "auth_mongo_debug"
app.title = "auth_mongo_name"
@app.on_event("startup")
async def start_database():
await initiate_database()
async def check_database_initialized() -> ProbeResult:
try:
await asyncio.wait_for(client.server_info(), timeout=5)
return ProbeResult(success=True, message="service has been initialized and ready to serve")
except Exception:
return ProbeResult(success=False, message="service is not initialized yet", data={"error": "database is not ready"})
async def initiate_database():
await init_beanie(
database=client[app_settings.APP_MONGODB_NAME], document_models=db_models
)

@@ -0,0 +1,39 @@
from fastapi import FastAPI, HTTPException
from fastapi.exceptions import RequestValidationError
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.status import (
HTTP_400_BAD_REQUEST,
HTTP_401_UNAUTHORIZED,
HTTP_403_FORBIDDEN,
HTTP_404_NOT_FOUND,
HTTP_422_UNPROCESSABLE_ENTITY,
HTTP_500_INTERNAL_SERVER_ERROR,
)
async def custom_http_exception_handler(request: Request, exc: HTTPException):
return JSONResponse(
status_code=exc.status_code,
content={"error": exc.detail},
)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return JSONResponse(
status_code=HTTP_400_BAD_REQUEST,
content={"error": str(exc)},
)
async def exception_handler(request: Request, exc: Exception):
return JSONResponse(
status_code=HTTP_500_INTERNAL_SERVER_ERROR,
content={"error": str(exc)},
)
def register(app: FastAPI):
app.add_exception_handler(HTTPException, custom_http_exception_handler)
app.add_exception_handler(RequestValidationError, validation_exception_handler)
app.add_exception_handler(Exception, exception_handler)

@@ -0,0 +1,7 @@
from app.common.log.base_logger import LoggerBase
def register_logger():
print("📢 Setting up logging interception...")
LoggerBase.configure_uvicorn_logging()
print("✅ Logging interception complete. Logs are formatted and deduplicated!")

@@ -0,0 +1,16 @@
import logging
from prometheus_fastapi_instrumentator import Instrumentator
from app.common.config.app_settings import app_settings
def register(app):
instrumentator = (
Instrumentator().instrument(
app,
metric_namespace="freeleaps",
metric_subsystem=app_settings.APP_NAME)
)
@app.on_event("startup")
async def startup():
instrumentator.expose(app, endpoint="/api/_/metrics", should_gzip=True)
logging.info("Metrics endpoint exposed at /api/_/metrics")

@@ -0,0 +1,25 @@
from app.common.probes import ProbeManager, ProbeType
from app.common.probes.adapters import FastAPIAdapter
from .database import check_database_initialized
def register(app):
probes_manager = ProbeManager()
probes_manager.register_adapter("fastapi", FastAPIAdapter(app))
async def readiness_checker():
return await check_database_initialized()
probes_manager.register(
name="readiness",
prefix="/api",
type=ProbeType.READINESS,
check_func=readiness_checker,
frameworks=["fastapi"]
)
probes_manager.register(name="liveness", prefix="/api", type=ProbeType.LIVENESS, frameworks=["fastapi"])
probes_manager.register(name="startup", prefix="/api", type=ProbeType.STARTUP, frameworks=["fastapi"])
@app.on_event("startup")
async def mark_startup_complete():
probes_manager.mark_startup_complete()

@@ -0,0 +1,34 @@
from app.routes import api_router
from starlette import routing
def register(app):
app.include_router(
api_router,
prefix="/api",
tags=["api"],
dependencies=[],
responses={404: {"description": "no page found"}},
)
if app.debug:
for route in app.routes:
if not isinstance(route, routing.WebSocketRoute):
print(
{
"path": route.path,
"endpoint": route.endpoint,
"name": route.name,
"methods": route.methods,
}
)
else:
print(
{
"path": route.path,
"endpoint": route.endpoint,
"name": route.name,
"type": "web socket route",
}
)

@@ -0,0 +1,8 @@
import asyncio
def register(app):
@app.on_event("startup")
async def start_scheduler():
        # create your scheduler here
pass

@@ -0,0 +1,7 @@
from fastapi import APIRouter
from app.routes.deployment.apis import router as deployment_api
api_router = APIRouter()
# TODO: add custom routers here
api_router.include_router(deployment_api, tags=["deployment"])

@@ -0,0 +1,75 @@
from datetime import datetime
from typing import List, Optional
from fastapi import APIRouter, Depends
from loguru import logger
from app.common.models import CodeDepotDoc
from app.common.models.deployment.deployment import Deployment, InitDeploymentRequest, CheckDeploymentStatusRequest, \
CheckApplicationLogsRequest, CheckApplicationLogsResponse
from app.routes.deployment.service import DeploymentService, get_deployment_service
router = APIRouter(prefix="/deployment")
@router.post("/initDeployment")
## insert a new Deployment object to db
async def init_deployment(
request: InitDeploymentRequest,
service: DeploymentService = Depends(get_deployment_service)
) -> Deployment:
return await service.init_deployment(request)
@router.get('/getLatestDeployment')
async def get_latest_deployment(
product_id: str,
target_env: str = "alpha",
service: DeploymentService = Depends(get_deployment_service)
) -> Optional[Deployment]:
"""
Get the latest deployment for a given product ID.
"""
return await service.get_latest_deployment(product_id, target_env)
@router.post("/updateDeploymentStatus")
async def update_deployment(
request: Deployment,
service: DeploymentService = Depends(get_deployment_service)
) -> Deployment:
return await service.update_deployment_status(request)
@router.get("/checkDeploymentStatus")
async def check_deployment_status(
deployment_id: str,
service: DeploymentService = Depends(get_deployment_service)
) -> List[Deployment]:
return await service.check_deployment_status(deployment_id)
@router.post("/createDummyCodeDepot")
async def create_dummy_code_depot(
service: DeploymentService = Depends(get_deployment_service)
) -> CodeDepotDoc:
"""
Create a dummy code depot for testing purposes.
"""
try:
depot_name = await service.create_dummy_code_depot()
return depot_name
except Exception as e:
logger.error(f"Failed to create dummy code depot: {e}")
raise e
@router.post("/checkApplicationLogs")
async def check_application_logs(
request: CheckApplicationLogsRequest,
service: DeploymentService = Depends(get_deployment_service)
) -> CheckApplicationLogsResponse:
"""
Check application logs for a given deployment.
"""
try:
res = await service.check_application_logs(request)
return res
except Exception as e:
logger.error(f"Failed to check application logs: {e}")
raise e
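
A minimal local smoke-test sketch for the routes above, assuming the service runs on the site_settings defaults (localhost:8000) with MongoDB up; ids are illustrative:

```python
import httpx

BASE = "http://localhost:8000/api/deployment"  # assumed host/port

resp = httpx.post(f"{BASE}/initDeployment", json={
    "product_id": "prod-123",
    "target_env": "alpha",
    "user_id": "user-42",
})
print(resp.status_code, resp.json())

resp = httpx.get(f"{BASE}/getLatestDeployment",
                 params={"product_id": "prod-123", "target_env": "alpha"})
print(resp.status_code, resp.json())
```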

@@ -0,0 +1,262 @@
import uuid
from collections import defaultdict
from datetime import datetime, timedelta
from typing import List
import httpx
import requests
from fastapi import HTTPException, Depends
from app.common.config.site_settings import site_settings
from app.common.models import Deployment, DevOpsReconcileRequest, DevOpsReconcileOperationType
from app.common.models.code_depot.code_depot import CodeDepotDoc, DepotStatus
from app.common.models.deployment.deployment import InitDeploymentRequest, CheckApplicationLogsRequest, \
CheckApplicationLogsResponse
from loguru import logger
class DeploymentService:
def __init__(self):
pass
async def init_deployment(
self,
request: InitDeploymentRequest,
) -> Deployment:
"""
"""
# TODO validate permission with user_id
# currently skip
code_depot = await self._get_code_depot_by_product_id(request.product_id)
git_url = await self._compose_git_url(code_depot.depot_name)
# retrieve project name
project_name = "TODO"
# retrieve product info, depot name should be the same as product name
product_id = request.product_id
product_name = code_depot.depot_name
deployment = Deployment.model_construct(
deployment_id = str(uuid.uuid4()),
deployment_stage = "init",
deployment_status = "started",
deployment_target_env = request.target_env,
deployment_ttl_hours = request.ttl_hours,
deployment_project_id = "project_id",
deployment_project_name = "project_name",
deployment_product_id = product_id,
deployment_product_name = product_name,
deployment_git_url = git_url,
deployment_git_sha256 = request.sha256,
deployment_reason = request.reason,
deployed_by = request.user_id,
created_at = datetime.now(),
updated_at = datetime.now(),
)
await self._start_deployment(deployment)
res = await Deployment.insert(deployment)
return res
async def get_latest_deployment(
self,
product_id: str,
target_env: str,
) -> Deployment:
time_threshold = datetime.now() - timedelta(hours=168) # 7 days
deployment_records = await Deployment.find(
Deployment.deployment_product_id == product_id,
Deployment.deployment_target_env == target_env,
Deployment.updated_at >= time_threshold
).to_list()
        if not deployment_records:
logger.warning(f"No deployment records found for product ID: {product_id} in the last 7 days")
return None
latest_deployment = max(deployment_records, key=lambda d: (d.updated_at, d.created_at))
return latest_deployment
async def check_deployment_status(
self,
product_id: str,
) -> List[Deployment]:
"""
Check the deployment status of the application, only check past 48 hours
"""
# TODO implement this function
time_threshold = datetime.now() - timedelta(hours=48)
deployment_records = await Deployment.find(
Deployment.deployment_product_id == product_id,
Deployment.created_at >= time_threshold
).to_list()
grouped = defaultdict(list)
for deployment in deployment_records:
grouped[deployment.deployment_id].append(deployment)
        for deployment_list in grouped.values():
            deployment_list.sort(key=lambda d: (d.created_at, d.updated_at), reverse=True)
        # After the descending sort, index 0 is the most recent record per deployment_id
        latest_deployments = [deployments[0] for deployments in grouped.values()]
return latest_deployments
async def update_deployment_status(
self,
deployment: Deployment
) -> Deployment:
latest_record = await Deployment.find_one(
Deployment.deployment_id == deployment.deployment_id,
sort=[("created_at", -1)]
)
if not latest_record:
raise HTTPException(status_code=404, detail="No record found, please initiate deployment first")
# TODO add more sanity check logic here
# if updating the same stage, just update the status and timestamp
# else, create a new record with the same deployment_id
res = None
if deployment.deployment_stage == latest_record.deployment_stage:
# update existing record
latest_record.deployment_status = deployment.deployment_status
latest_record.updated_at = deployment.updated_at or datetime.now()
res = await latest_record.save()
else:
# create new record
deployment.deployment_id = latest_record.deployment_id
deployment.created_at = datetime.now()
deployment.updated_at = datetime.now()
res = await deployment.insert()
return res
async def _get_code_depot_by_product_id(
self,
product_id: str,
) -> CodeDepotDoc:
"""
Retrieve code depot by product id
"""
code_depot = await CodeDepotDoc.find_one(CodeDepotDoc.product_id == product_id)
if not code_depot:
raise HTTPException(status_code=404,
detail="Code depot not found for the given product id, "
"please initialize the product first"
)
return code_depot
async def _compose_git_url(
self,
code_depot_name: str,
gitea_base_url: str = site_settings.BASE_GITEA_URL
) -> str:
"""
Retrieve git url by product id
"""
return f"{gitea_base_url}/prodcuts/{code_depot_name.lower()}.git"
async def _start_deployment(
self,
deployment: Deployment,
        reconcile_base_url: str = site_settings.BASE_RECONCILE_URL,
) -> bool:
"""
        Start the deployment by calling the reconcile service.
        Returns True on success, raises HTTPException on failure.
"""
# construct request body
request = DevOpsReconcileRequest(
operation=DevOpsReconcileOperationType.START,
id=deployment.deployment_id,
devops_proj_id=deployment.deployment_project_id,
triggered_user_id=deployment.deployed_by,
causes=deployment.deployment_reason,
target_env=deployment.deployment_target_env,
ttl_controled=True,
ttl=deployment.deployment_ttl_hours,
commit_sha256=deployment.deployment_git_sha256,
)
        # send request to the reconcile service
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{reconcile_base_url}/api/devops/reconcile",
                # mode="json" so Enum and datetime fields serialize cleanly
                json=request.model_dump(mode="json")
)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail=response.text)
return True
async def check_application_logs(
self,
request: CheckApplicationLogsRequest,
loki_url: str = site_settings.BASE_LOKI_URL,
) -> CheckApplicationLogsResponse:
# Convert to nanoseconds since epoch
start_ns = int(request.start_time.timestamp() * 1e9)
end_ns = int(request.end_time.timestamp() * 1e9)
# TODO: convert product_id to application name if needed
base_query = f'{{application="{request.product_id}", environment="{request.target_env}"}}'
log_level = '|'.join(request.log_level) if request.log_level else ''
loki_query = f'{base_query} |~ "{log_level}"'
params = {
"query": loki_query,
"limit": request.limit,
"start": start_ns,
"end": end_ns,
}
url = f"{loki_url}/loki/api/v1/query_range"
response = requests.get(url, params=params)
if response.status_code != 200:
raise Exception(f"Query failed: {response.status_code} - {response.text}")
data = response.json()
streams = data.get("data", {}).get("result", [])
logs = []
for stream in streams:
for ts, log in stream.get("values", []):
timestamp = datetime.fromtimestamp(int(ts) / 1e9)
logs.append(f"[{timestamp}] {log.strip()}")
return CheckApplicationLogsResponse(
product_id=request.product_id,
target_env=request.target_env,
user_id=request.user_id,
log_level=request.log_level,
start_time=request.start_time,
end_time=request.end_time,
limit=request.limit,
logs=logs
)
# TODO: dummy test code, remove later
async def create_dummy_code_depot(
self,
) -> CodeDepotDoc:
"""
Create a dummy code depot for testing purposes.
"""
depot_name = f"dummy-depot-{uuid.uuid4()}"
code_depot = CodeDepotDoc(
depot_name=depot_name,
product_id="dummy-product-id",
depot_status=DepotStatus.CREATED
)
return await CodeDepotDoc.insert_one(code_depot)
deployment_service = DeploymentService()
def get_deployment_service() -> DeploymentService:
return deployment_service
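
For reference, check_application_logs above would produce a LogQL query of roughly this shape (product id and levels illustrative):

```python
loki_query = '{application="prod-123", environment="alpha"} |~ "info|error"'
# Sent to GET {BASE_LOKI_URL}/loki/api/v1/query_range with "start"/"end"
# as nanoseconds since epoch and "limit" capping the number of entries.
```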

@@ -0,0 +1,18 @@
version: '3.8'
services:
mongodb:
image: mongo:6.0 # You can change to the desired version
container_name: mongodb
restart: unless-stopped
ports:
- "27017:27017"
environment:
MONGO_INITDB_DATABASE: testdb # <-- This creates the initial database
volumes:
- mongodb_data:/data/db
command: ["mongod", "--noauth"] # <-- Disable authentication
volumes:
mongodb_data:
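
A quick connectivity check against the compose service above (pymongo is already in requirements.txt):

```python
from pymongo import MongoClient

# Assumes the mongodb container from the compose file is running locally
client = MongoClient("mongodb://localhost:27017", serverSelectionTimeoutMS=3000)
print(client.admin.command("ping"))  # -> {'ok': 1.0} when the container is up
```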

@@ -0,0 +1,14 @@
beanie==1.29.0
fastapi==0.115.12
httpx==0.24.0
loguru==0.7.3
motor==3.7.0
prometheus_fastapi_instrumentator==7.1.0
pydantic~=2.11.4
pydantic-settings==2.9.1
pymongo~=4.12.1
pytest==7.1.2
requests~=2.32.3
starlette==0.46.2
uvicorn==0.34.2

@@ -0,0 +1,27 @@
from unittest.mock import AsyncMock, patch
from fastapi.testclient import TestClient
from app.main import app
from app.routes.hello_world.apis import get_hello_world_dao
def test_hello_world():
with TestClient(app) as client:
response = client.get("/api/hello_world/")
assert response.status_code == 200
assert response.json() == {"message": "Hello, World!"}
# mock out initiate_database so it doesn't run during tests
@patch("app.providers.database.initiate_database", new_callable=AsyncMock)
def test_insert_hello_world(mock_db_init):
class MockHelloWorldDao:
async def create_hello_world(self, msg: str, user_id: int):
return {"message": msg, "user_id": user_id}
app.dependency_overrides[get_hello_world_dao] = lambda: MockHelloWorldDao()
with TestClient(app) as client:
response = client.post("/api/hello_world/insert", params={"msg": "Test Message"})
assert response.status_code == 200
assert response.json() == {"message": "Test Message", "user_id": 1}
app.dependency_overrides.clear()

@@ -0,0 +1,8 @@
# Test your FastAPI endpoints
GET http://localhost:8000/api/hello_world/
Accept: application/json
###
POST http://localhost:8000/api/hello_world/insert?msg=Hello%20World
Accept: application/json

apps/helloworld/README.md Normal file

@@ -0,0 +1,20 @@
This is a template backend service based on FastAPI + MongoDB.
To start local development, go to the root directory of the project, YOUR_WORKSPACE_PATH/helloworld/, and start MongoDB:
```bash
docker compose -f app/scripts/mongodb/docker-compose.yml up -d
```
Then run the app
```bash
uvicorn app.main:app --reload
```
If a new dependency is added, run the following to update requirements.txt:
```bash
# optional: if you have not installed pipreqs
pip3 install pipreqs
# generate requirements.txt
pipreqs . --force
```

@@ -0,0 +1,82 @@
import logging
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
from app.providers import common
from app.providers.logger import register_logger
from app.providers import router
from app.providers import database
from app.providers import metrics
from app.providers import probes
from app.providers import exception_handler
from app.common.config.app_settings import app_settings
def create_app() -> FastAPI:
logging.info("App initializing")
app = FreeleapsApp()
register_logger()
register(app, exception_handler)
register(app, database)
register(app, router)
# register(app, scheduler)
register(app, common)
    # Customize the OpenAPI schema to add a global JWT bearer security scheme
customize_openapi_security(app)
# Register probe APIs if enabled
if app_settings.PROBES_ENABLED:
register(app, probes)
# Register metrics APIs if enabled
if app_settings.METRICS_ENABLED:
register(app, metrics)
return app
# This function customizes the generated OpenAPI schema to add a global JWT bearer security scheme
def customize_openapi_security(app: FastAPI) -> None:
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
# Generate OpenAPI schema
openapi_schema = get_openapi(
title="FreeLeaps API",
version="3.1.0",
description="FreeLeaps API Documentation",
routes=app.routes,
)
# Ensure the components section exists in the OpenAPI schema
if "components" not in openapi_schema:
openapi_schema["components"] = {}
# Add security scheme to components
openapi_schema["components"]["securitySchemes"] = {
"bearerAuth": {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"}
}
# Add security requirement globally
openapi_schema["security"] = [{"bearerAuth": []}]
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
def register(app, provider):
logging.info(provider.__name__ + " registering")
provider.register(app)
def boot(app, provider):
logging.info(provider.__name__ + " booting")
provider.boot(app)
class FreeleapsApp(FastAPI):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

@@ -0,0 +1,29 @@
from pydantic_settings import BaseSettings
# NOTE: Each value falls back to the corresponding environment variable when not set here
class AppSettings(BaseSettings):
NAME: str = "YOUR_APP_NAME"
APP_NAME: str = NAME
APP_ENV: str = "alpha"
JWT_SECRET_KEY: str = ""
ACCESS_TOKEN_EXPIRE_MINUTES: int = 3600
REFRESH_TOKEN_EXPIRE_DAYS: int = 1
METRICS_ENABLED: bool = False
PROBES_ENABLED: bool = True
APP_MONGODB_URI: str = "mongodb://localhost:27017"
APP_MONGODB_NAME: str = "testdb"
LOG_BASE_PATH: str = "./log"
BACKEND_LOG_FILE_NAME: str = APP_NAME
APPLICATION_ACTIVITY_LOG: str = APP_NAME + "-application-activity"
class Config:
env_file = ".myapp.env"
env_file_encoding = "utf-8"
app_settings = AppSettings()

@@ -0,0 +1,16 @@
import os
from dataclasses import dataclass
from .app_settings import app_settings
@dataclass
class LogSettings:
LOG_PATH_BASE: str = app_settings.LOG_BASE_PATH
LOG_RETENTION: str = os.environ.get("LOG_RETENTION", "30 days")
LOG_ROTATION: str = os.environ.get("LOG_ROTATION", "00:00") # midnight
MAX_BACKUP_FILES: int = int(os.environ.get("LOG_BACKUP_FILES", 5))
LOG_ROTATION_BYTES: int = int(os.environ.get("LOG_ROTATION_BYTES", 10 * 1024 * 1024)) # 10 MB
APP_NAME: str = app_settings.APP_NAME
ENVIRONMENT: str = app_settings.APP_ENV
log_settings = LogSettings()

@@ -0,0 +1,27 @@
import os
from pydantic_settings import BaseSettings
# NOTE: Each value falls back to the corresponding environment variable when not set here
class SiteSettings(BaseSettings):
NAME: str = "appname"
DEBUG: bool = True
ENV: str = "dev"
SERVER_HOST: str = "localhost"
SERVER_PORT: int = 8000
URL: str = "http://localhost"
TIME_ZONE: str = "UTC"
BASE_PATH: str = os.path.dirname(os.path.dirname((os.path.abspath(__file__))))
class Config:
env_file = ".devbase-webapi.env"
env_file_encoding = "utf-8"
site_settings = SiteSettings()

@@ -0,0 +1,6 @@
from app.common.daos.hello_world.hello_world_dao import HelloWorldDao
hello_world_dao = HelloWorldDao()
def get_hello_world_dao() -> HelloWorldDao:
return hello_world_dao

@@ -0,0 +1,30 @@
from app.common.models.hello_world.hello_world import HelloWorld
class HelloWorldDao:
def __init__(self):
pass
async def create_hello_world(self, message: str, count: int):
hello_world = HelloWorld(message=message, count=count)
await hello_world.insert()
return hello_world
async def get_hello_world(self, id: str):
hello_world = await HelloWorld.get(id)
return hello_world
async def update_hello_world(self, id: str, message: str, count: int):
hello_world = await HelloWorld.get(id)
if hello_world:
hello_world.message = message
hello_world.count = count
await hello_world.save()
return hello_world
return None
async def delete_hello_world(self, id: str):
hello_world = await HelloWorld.get(id)
if hello_world:
await hello_world.delete()
return True
return False

@@ -0,0 +1,12 @@
from typing import Any, Dict, Optional
from .base_logger import LoggerBase
from app.common.config.app_settings import app_settings
class ApplicationLogger(LoggerBase):
    def __init__(self, application_activities: Optional[Dict[str, Any]] = None) -> None:
        # Avoid a mutable default argument; copy the caller's fields if provided
        extra_fields: Dict[str, Any] = {}
        if application_activities:
            extra_fields.update(application_activities)
        super().__init__(
            logger_name=app_settings.APPLICATION_ACTIVITY_LOG,
            extra_fields=extra_fields,
        )

@@ -0,0 +1,136 @@
from loguru import logger as guru_logger
from app.common.config.log_settings import log_settings
from typing import Any, Dict, Optional, Tuple
import socket
import json
import threading
import os
import sys
import inspect
import logging
from app.common.log.json_sink import JsonSink
class LoggerBase:
binded_loggers = {}
logger_lock = threading.Lock()
    def __init__(self, logger_name: str, extra_fields: Dict[str, Any]) -> None:
        self.__logger_name = logger_name
        self.extra_fields = extra_fields
with LoggerBase.logger_lock:
if self.__logger_name in LoggerBase.binded_loggers:
self.logger = LoggerBase.binded_loggers[self.__logger_name]
return
log_filename = f"{log_settings.LOG_PATH_BASE}/{self.__logger_name}.log"
log_level = "INFO"
rotation_bytes = int(log_settings.LOG_ROTATION_BYTES or 10 * 1024 * 1024)
guru_logger.remove()
file_sink = JsonSink(
log_file_path=log_filename,
rotation_size_bytes=rotation_bytes,
max_backup_files=log_settings.MAX_BACKUP_FILES
)
guru_logger.add(
sink=file_sink,
level=log_level,
filter=lambda record: record["extra"].get("topic") == self.__logger_name,
)
guru_logger.add(
sink=sys.stderr,
level=log_level,
format="{level} - {time:YYYY-MM-DD HH:mm:ss} - <{extra[log_file]}:{extra[log_line]}> - {extra[properties_str]} - {message}",
filter=lambda record: record["extra"].get("topic") == self.__logger_name,
)
host_name = socket.gethostname()
host_ip = socket.gethostbyname(host_name)
self.logger = guru_logger.bind(
topic=self.__logger_name,
host_ip=host_ip,
host_name=host_name,
app=log_settings.APP_NAME,
env=log_settings.ENVIRONMENT,
)
with LoggerBase.logger_lock:
LoggerBase.binded_loggers[self.__logger_name] = self.logger
def _get_log_context(self) -> dict:
frame = inspect.currentframe().f_back.f_back
filename = os.path.basename(frame.f_code.co_filename)
lineno = frame.f_lineno
return {"log_file": filename, "log_line": lineno}
    def _prepare_properties(self, properties: Optional[Dict[str, Any]]) -> Tuple[Dict[str, Any], str]:
props = {} if properties is None else properties.copy()
props_str = json.dumps(props, ensure_ascii=False) if props else "{}"
return props, props_str
    async def log_event(self, sender_id: str, receiver_id: str, subject: str, event: str, properties: Optional[Dict[str, Any]], text: str = "") -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event=event, properties=props, properties_str=props_str, **context)
local_logger.info(text)
    async def log_exception(self, sender_id: str, receiver_id: str, subject: str, exception: Exception, text: str = "", properties: Optional[Dict[str, Any]] = None) -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="exception", properties=props, properties_str=props_str, exception=exception, **context)
local_logger.exception(text)
    async def log_info(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: Optional[Dict[str, Any]] = None) -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="information", properties=props, properties_str=props_str, **context)
local_logger.info(text)
    async def log_warning(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: Optional[Dict[str, Any]] = None) -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="warning", properties=props, properties_str=props_str, **context)
local_logger.warning(text)
    async def log_error(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: Optional[Dict[str, Any]] = None) -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="error", properties=props, properties_str=props_str, **context)
local_logger.error(text)
@staticmethod
def configure_uvicorn_logging():
print("📢 Setting up uvicorn logging interception...")
# Intercept logs from these loggers
intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"]
class InterceptHandler(logging.Handler):
def emit(self, record):
level = (
guru_logger.level(record.levelname).name
if guru_logger.level(record.levelname, None)
else record.levelno
)
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
guru_logger.opt(depth=depth, exception=record.exc_info).log(
level,
f"[{record.name}] {record.getMessage()}",
)
# Replace default handlers
logging.root.handlers.clear()
logging.root.setLevel(logging.INFO)
logging.root.handlers = [InterceptHandler()]
# Configure specific uvicorn loggers
for logger_name in intercept_loggers:
logging_logger = logging.getLogger(logger_name)
logging_logger.handlers.clear() # Remove default handlers
logging_logger.propagate = True # Ensure propagation through Loguru

@@ -0,0 +1,85 @@
import json
import datetime
import traceback
from pathlib import Path
from typing import Optional
class JsonSink:
def __init__(
self,
log_file_path: str,
rotation_size_bytes: int = 10 * 1024 * 1024,
max_backup_files: int = 5,
):
self.log_file_path = Path(log_file_path)
self.rotation_size = rotation_size_bytes
self.max_backup_files = max_backup_files
self._open_log_file()
def _open_log_file(self):
# ensure the parent directory exists
parent_dir = self.log_file_path.parent
if not parent_dir.exists():
parent_dir.mkdir(parents=True, exist_ok=True)
self.log_file = self.log_file_path.open("a", encoding="utf-8")
def _should_rotate(self) -> bool:
return self.log_file_path.exists() and self.log_file_path.stat().st_size >= self.rotation_size
def _rotate(self):
self.log_file.close()
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
rotated_path = self.log_file_path.with_name(f"{self.log_file_path.stem}_{timestamp}{self.log_file_path.suffix}")
self.log_file_path.rename(rotated_path)
self._cleanup_old_backups()
self._open_log_file()
def _cleanup_old_backups(self):
parent = self.log_file_path.parent
stem = self.log_file_path.stem
suffix = self.log_file_path.suffix
backup_files = sorted(
parent.glob(f"{stem}_*{suffix}"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
for old_file in backup_files[self.max_backup_files:]:
try:
old_file.unlink()
except Exception as e:
print(f"Failed to delete old backup {old_file}: {e}")
def __call__(self, message):
record = message.record
if self._should_rotate():
self._rotate()
log_entry = {
"level": record["level"].name.lower(),
"timestamp": int(record["time"].timestamp() * 1000),
"text": record["message"],
"fields": record["extra"].get("properties", {}),
"context": {
"app": record["extra"].get("app"),
"env": record["extra"].get("env"),
"log_file": record["extra"].get("log_file"),
"log_line": record["extra"].get("log_line"),
"topic": record["extra"].get("topic"),
"sender_id": record["extra"].get("sender_id"),
"receiver_id": record["extra"].get("receiver_id"),
"subject": record["extra"].get("subject"),
"event": record["extra"].get("event"),
"host_ip": record["extra"].get("host_ip"),
"host_name": record["extra"].get("host_name"),
},
"stacktrace": None
}
if record["exception"]:
exc_type, exc_value, exc_tb = record["exception"]
log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb)
self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
self.log_file.flush()

@@ -0,0 +1,4 @@
from app.common.models.hello_world.hello_world import HelloWorld
# List of Beanie document models; add new models here so their MongoDB collections are created automatically
db_models = [HelloWorld]

@@ -0,0 +1,17 @@
from datetime import datetime
from beanie import Document
from pydantic import Field
class HelloWorld(Document):
    message: str
    count: int = 0
    # default_factory so each document gets its own creation timestamp
    created_time: datetime = Field(default_factory=datetime.now)
class Settings:
name = "hello_world"
indexes = [
[("message", 1), ("count", 1)]
]

@@ -0,0 +1,140 @@
import logging
from enum import Enum
from typing import Optional, Callable, Tuple, Dict
import inspect
from datetime import datetime, timezone
# ProbeType is an Enum that defines the types of probes that can be registered.
class ProbeType(Enum):
    LIVENESS = "liveness"
    READINESS = "readiness"
    STARTUP = "startup"


# ProbeResult is a class that represents the result of a probe check.
class ProbeResult:
    def __init__(self, success: bool, message: str = "ok", data: Optional[dict] = None):
        self.success = success
        self.message = message
        self.data = data or {}

    def to_dict(self) -> dict:
        return {
            "success": self.success,
            "message": self.message,
            "data": self.data
        }


# Probe is a class that represents a probe that can be registered.
class Probe:
    def __init__(self, type: ProbeType, path: str, check_fn: Callable, name: Optional[str] = None):
        self.type = type
        self.path = path
        self.check_fn = check_fn
        self.name = name or f"{type.value}-{id(self)}"

    async def execute(self) -> ProbeResult:
        try:
            result = self.check_fn()
            if inspect.isawaitable(result):
                result = await result
            if isinstance(result, ProbeResult):
                return result
            elif isinstance(result, bool):
                return ProbeResult(result, "ok" if result else "failed")
            else:
                return ProbeResult(True, "ok")
        except Exception as e:
            return ProbeResult(False, str(e))


# ProbeGroup is a class that represents a group of probes that can be checked together.
class ProbeGroup:
    def __init__(self, path: str):
        self.path = path
        self.probes: Dict[str, Probe] = {}

    def add_probe(self, probe: Probe):
        self.probes[probe.name] = probe

    async def check_all(self) -> Tuple[bool, dict]:
        results = {}
        all_success = True
        for name, probe in self.probes.items():
            result = await probe.execute()
            results[name] = result.to_dict()
            if not result.success:
                all_success = False
        return all_success, results


# FrameworkAdapter is an abstract class that defines the interface for framework-specific probe adapters.
class FrameworkAdapter:
    async def handle_request(self, group: ProbeGroup):
        all_success, results = await group.check_all()
        status_code = 200 if all_success else 503
        return {
            "status": "ok" if all_success else "failed",
            "payload": results,
            "timestamp": int(datetime.now(timezone.utc).timestamp()),
        }, status_code

    def register_route(self, path: str, handler: Callable):
        raise NotImplementedError


# ProbeManager is a class that manages the registration of probes and their corresponding framework adapters.
class ProbeManager:
    _default_paths = {
        ProbeType.LIVENESS: "/_/livez",
        ProbeType.READINESS: "/_/readyz",
        ProbeType.STARTUP: "/_/healthz"
    }

    def __init__(self):
        self.groups: Dict[str, ProbeGroup] = {}
        self.adapters: Dict[str, FrameworkAdapter] = {}
        self._startup_complete = False

    def register_adapter(self, framework: str, adapter: FrameworkAdapter):
        self.adapters[framework] = adapter
        logging.info(f"Registered probe adapter ({adapter}) for framework: {framework}")

    def register(
        self,
        type: ProbeType,
        check_func: Optional[Callable] = None,
        path: Optional[str] = None,
        prefix: str = "",
        name: Optional[str] = None,
        frameworks: Optional[list] = None
    ):
        path = path or self._default_paths.get(type, "/_/healthz")
        if prefix:
            path = f"{prefix}{path}"
        if type == ProbeType.STARTUP and check_func is None:
            check_func = self._default_startup_check
        probe = Probe(type, path, check_func or (lambda: True), name)
        if path not in self.groups:
            self.groups[path] = ProbeGroup(path)
        self.groups[path].add_probe(probe)
        for framework in (frameworks or ["default"]):
            self._register_route(framework, path)
            logging.info(f"Registered {type.value} probe route ({path}) for framework: {framework}")

    def _register_route(self, framework: str, path: str):
        if framework not in self.adapters:
            return
        adapter = self.adapters[framework]
        group = self.groups[path]

        async def handler():
            return await adapter.handle_request(group)

        adapter.register_route(path, handler)

    def _default_startup_check(self) -> bool:
        return self._startup_complete

    def mark_startup_complete(self):
        self._startup_complete = True
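
A minimal usage sketch of the probe framework above, assuming the `app.common.probes` package layout used by the providers later in this change; the `cache_ready` check and its message are hypothetical stand-ins for a real dependency ping:

```python
from fastapi import FastAPI

from app.common.probes import ProbeManager, ProbeType, ProbeResult
from app.common.probes.adapters import FastAPIAdapter

app = FastAPI()
manager = ProbeManager()
manager.register_adapter("fastapi", FastAPIAdapter(app))


async def cache_ready() -> ProbeResult:
    # Hypothetical check; replace with a real dependency ping.
    return ProbeResult(success=True, message="cache reachable")


# Served at /api/_/readyz; the adapter answers 503 if any probe in the group fails.
manager.register(
    type=ProbeType.READINESS,
    check_func=cache_ready,
    prefix="/api",
    name="cache",
    frameworks=["fastapi"],
)
```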

View File

@ -0,0 +1,15 @@
from . import FrameworkAdapter
from fastapi.responses import JSONResponse
from typing import Callable


# FastAPIAdapter is a class that implements the FrameworkAdapter interface for FastAPI.
class FastAPIAdapter(FrameworkAdapter):
    def __init__(self, app):
        self.app = app

    def register_route(self, path: str, handler: Callable):
        async def wrapper():
            data, status_code = await handler()
            return JSONResponse(content=data, status_code=status_code)

        self.app.add_api_route(path, wrapper, methods=["GET"])
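
Other frameworks can plug into the same interface. As a sketch (not part of this change), a Starlette-only adapter would implement `register_route` against Starlette's router; the class name here is illustrative:

```python
from typing import Callable

from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route

from . import FrameworkAdapter


class StarletteAdapter(FrameworkAdapter):
    def __init__(self, app: Starlette):
        self.app = app

    def register_route(self, path: str, handler: Callable):
        async def wrapper(request):
            data, status_code = await handler()
            return JSONResponse(content=data, status_code=status_code)

        # Append a plain GET route that serves the probe group's result.
        self.app.router.routes.append(Route(path, wrapper, methods=["GET"]))
```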

View File

View File

View File

@ -0,0 +1,16 @@
from fastapi.responses import RedirectResponse
from app.common.config.site_settings import site_settings
from app.bootstrap.application import create_app

app = create_app()


@app.get("/", status_code=301)
async def root():
    """
    Redirect clients to the interactive API docs at /docs.
    """
    return RedirectResponse(url="/docs", status_code=301)


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host=site_settings.SERVER_HOST, port=site_settings.SERVER_PORT, reload=True)

View File

@ -0,0 +1,31 @@
from fastapi.middleware.cors import CORSMiddleware
from app.common.config.site_settings import site_settings


def register(app):
    app.debug = site_settings.DEBUG
    app.title = site_settings.NAME
    add_global_middleware(app)

    # Application startup hook: acquire any shared resources
    # (clients, caches, schedulers) needed to serve requests.
    @app.on_event("startup")
    def startup():
        pass

    # Application shutdown hook: release those resources when
    # the process stops.
    @app.on_event("shutdown")
    def shutdown():
        pass


def add_global_middleware(app):
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
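
The CORS spec disallows the literal `*` origin for credentialed requests, so production deployments will likely want an explicit origin list. A sketch, assuming a `CORS_ORIGINS` field were added to `site_settings` (no such field is defined in this change):

```python
# Sketch only: CORS_ORIGINS is an assumed settings field, not part of this change.
def add_global_middleware(app):
    app.add_middleware(
        CORSMiddleware,
        allow_origins=getattr(site_settings, "CORS_ORIGINS", ["http://localhost:3000"]),
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
```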

View File

@ -0,0 +1,34 @@
import asyncio

from app.common.config.app_settings import app_settings
from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient

from app.common.models import db_models
from app.common.probes import ProbeResult

client = AsyncIOMotorClient(
    app_settings.APP_MONGODB_URI,
    serverSelectionTimeoutMS=60000,
    minPoolSize=5,   # Minimum number of connections in the pool
    maxPoolSize=20,  # Maximum number of connections in the pool
)


def register(app):
    @app.on_event("startup")
    async def start_database():
        await initiate_database()


async def check_database_initialized() -> ProbeResult:
    try:
        await asyncio.wait_for(client.server_info(), timeout=5)
        return ProbeResult(success=True, message="service has been initialized and ready to serve")
    except Exception:
        return ProbeResult(success=False, message="service is not initialized yet", data={"error": "database is not ready"})


async def initiate_database():
    await init_beanie(
        database=client[app_settings.APP_MONGODB_NAME], document_models=db_models
    )
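
Nothing closes the shared Motor client when the app stops. A small sketch of a complementary hook that could sit inside `register(app)` next to the startup hook above (not part of this change):

```python
    @app.on_event("shutdown")
    async def stop_database():
        # Release the connection pool held by the module-level client.
        client.close()
```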

View File

@ -0,0 +1,39 @@
from fastapi import FastAPI, HTTPException
from fastapi.exceptions import RequestValidationError
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.status import (
    HTTP_400_BAD_REQUEST,
    HTTP_500_INTERNAL_SERVER_ERROR,
)


async def custom_http_exception_handler(request: Request, exc: HTTPException):
    return JSONResponse(
        status_code=exc.status_code,
        content={"error": exc.detail},
    )


async def validation_exception_handler(request: Request, exc: RequestValidationError):
    # Validation failures are reported as 400 with a single "error" field,
    # overriding FastAPI's default 422 response shape.
    return JSONResponse(
        status_code=HTTP_400_BAD_REQUEST,
        content={"error": str(exc)},
    )


async def exception_handler(request: Request, exc: Exception):
    return JSONResponse(
        status_code=HTTP_500_INTERNAL_SERVER_ERROR,
        content={"error": str(exc)},
    )


def register(app: FastAPI):
    app.add_exception_handler(HTTPException, custom_http_exception_handler)
    app.add_exception_handler(RequestValidationError, validation_exception_handler)
    app.add_exception_handler(Exception, exception_handler)
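
A quick way to see the error envelope these handlers produce; a sketch using TestClient with a throwaway route (the `/boom` route is illustrative):

```python
from fastapi import FastAPI, HTTPException
from fastapi.testclient import TestClient

from app.providers import exception_handler

app = FastAPI()
exception_handler.register(app)


@app.get("/boom")
async def boom():
    raise HTTPException(status_code=404, detail="not found")


client = TestClient(app)
response = client.get("/boom")
assert response.status_code == 404
assert response.json() == {"error": "not found"}
```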

View File

@ -0,0 +1,7 @@
from app.common.log.base_logger import LoggerBase


def register_logger():
    print("📢 Setting up logging interception...")
    LoggerBase.configure_uvicorn_logging()
    print("✅ Logging interception complete. Logs are formatted and deduplicated!")

View File

@ -0,0 +1,16 @@
import logging

from prometheus_fastapi_instrumentator import Instrumentator

from app.common.config.app_settings import app_settings


def register(app):
    instrumentator = Instrumentator().instrument(
        app,
        metric_namespace="freeleaps",
        metric_subsystem=app_settings.APP_NAME,
    )

    @app.on_event("startup")
    async def startup():
        instrumentator.expose(app, endpoint="/api/_/metrics", should_gzip=True)
        logging.info("Metrics endpoint exposed at /api/_/metrics")

View File

@ -0,0 +1,25 @@
from app.common.probes import ProbeManager, ProbeType
from app.common.probes.adapters import FastAPIAdapter
from .database import check_database_initialized


def register(app):
    probes_manager = ProbeManager()
    probes_manager.register_adapter("fastapi", FastAPIAdapter(app))

    async def readiness_checker():
        return await check_database_initialized()

    probes_manager.register(
        name="readiness",
        prefix="/api",
        type=ProbeType.READINESS,
        check_func=readiness_checker,
        frameworks=["fastapi"]
    )
    probes_manager.register(name="liveness", prefix="/api", type=ProbeType.LIVENESS, frameworks=["fastapi"])
    probes_manager.register(name="startup", prefix="/api", type=ProbeType.STARTUP, frameworks=["fastapi"])

    @app.on_event("startup")
    async def mark_startup_complete():
        probes_manager.mark_startup_complete()
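
With this provider registered, the service exposes `/api/_/livez`, `/api/_/readyz`, and `/api/_/healthz`. A sketch for poking them locally; note that readiness will report 503 unless the MongoDB from the compose file later in this change is reachable:

```python
from fastapi.testclient import TestClient

from app.main import app

# Entering the context manager fires the startup hooks,
# which also mark the startup probe complete.
with TestClient(app) as client:
    for path in ("/api/_/livez", "/api/_/readyz", "/api/_/healthz"):
        response = client.get(path)
        print(path, response.status_code, response.json()["status"])
```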

View File

@ -0,0 +1,34 @@
from app.routes import api_router
from starlette import routing


def register(app):
    app.include_router(
        api_router,
        prefix="/api",
        tags=["api"],
        dependencies=[],
        responses={404: {"description": "no page found"}},
    )
    if app.debug:
        for route in app.routes:
            if not isinstance(route, routing.WebSocketRoute):
                print(
                    {
                        "path": route.path,
                        "endpoint": route.endpoint,
                        "name": route.name,
                        "methods": route.methods,
                    }
                )
            else:
                print(
                    {
                        "path": route.path,
                        "endpoint": route.endpoint,
                        "name": route.name,
                        "type": "web socket route",
                    }
                )

View File

@ -0,0 +1,8 @@
import asyncio


def register(app):
    @app.on_event("startup")
    async def start_scheduler():
        # create your scheduler here
        pass

View File

@ -0,0 +1,8 @@
from fastapi import APIRouter
from app.routes.hello_world import router as hello_world_router
api_router = APIRouter()
# TODO: add custom routers here
api_router.include_router(hello_world_router, tags=["hello_world"])

View File

@ -0,0 +1,7 @@
from fastapi import APIRouter
from .apis import router as hello_world_api
router = APIRouter(prefix="/hello_world")
router.include_router(hello_world_api, tags=["hello_world"])

View File

@ -0,0 +1,25 @@
from fastapi import APIRouter, Depends
from loguru import logger

from app.common.daos.hello_world import get_hello_world_dao, HelloWorldDao

router = APIRouter()


@router.get("/")
async def hello_world():
    logger.info("Hello, World! endpoint was called")
    return {"message": "Hello, World!"}


@router.post("/insert")
async def insert_hello_world(msg: str, dao: HelloWorldDao = Depends(get_hello_world_dao)):
    """
    Insert a HelloWorld document into the database.
    """
    hello_world = await dao.create_hello_world(msg, 1)
    return hello_world
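
Since `httpx` is pinned in the requirements.txt later in this change, the routes can also be exercised from a script; a sketch assuming the app is running locally on port 8000:

```python
import httpx

BASE = "http://localhost:8000/api/hello_world"

print(httpx.get(f"{BASE}/").json())  # {"message": "Hello, World!"}
print(httpx.post(f"{BASE}/insert", params={"msg": "hi"}).json())
```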

View File

@ -0,0 +1,18 @@
version: '3.8'

services:
  mongodb:
    image: mongo:6.0  # You can change to the desired version
    container_name: mongodb
    restart: unless-stopped
    ports:
      - "27017:27017"
    environment:
      MONGO_INITDB_DATABASE: testdb  # <-- This creates the initial database
    volumes:
      - mongodb_data:/data/db
    command: ["mongod", "--noauth"]  # <-- Disable authentication

volumes:
  mongodb_data:

View File

@ -0,0 +1,10 @@
beanie==1.29.0
fastapi==0.115.12
loguru==0.7.3
motor==3.7.0
prometheus_fastapi_instrumentator==7.1.0
pydantic_settings==2.9.1
pytest==7.1.2
starlette==0.46.2
uvicorn==0.34.2
httpx==0.24.0

View File

View File

View File

@ -0,0 +1,27 @@
from unittest.mock import AsyncMock, patch

from fastapi.testclient import TestClient

from app.main import app
from app.routes.hello_world.apis import get_hello_world_dao


def test_hello_world():
    with TestClient(app) as client:
        response = client.get("/api/hello_world/")
        assert response.status_code == 200
        assert response.json() == {"message": "Hello, World!"}


# Mock out initiate_database so it doesn't run during tests.
@patch("app.providers.database.initiate_database", new_callable=AsyncMock)
def test_insert_hello_world(mock_db_init):
    class MockHelloWorldDao:
        async def create_hello_world(self, msg: str, user_id: int):
            return {"message": msg, "user_id": user_id}

    app.dependency_overrides[get_hello_world_dao] = lambda: MockHelloWorldDao()
    with TestClient(app) as client:
        response = client.post("/api/hello_world/insert", params={"msg": "Test Message"})
        assert response.status_code == 200
        assert response.json() == {"message": "Test Message", "user_id": 1}
    app.dependency_overrides.clear()

View File

@ -0,0 +1,8 @@
# Test your FastAPI endpoints
GET http://localhost:8000/api/hello_world/
Accept: application/json
###
POST http://localhost:8000/api/hello_world/insert?msg=Hello%20World
Accept: application/json