diff --git a/.gitignore b/.gitignore index b022725..c6032c1 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,5 @@ *.pyc freedev.code-workspace .idea/ -.pytest_cache/ \ No newline at end of file +.pytest_cache/ +CLAUDE.md diff --git a/apps/devops/Dockerfile b/apps/devops/Dockerfile new file mode 100644 index 0000000..1d96106 --- /dev/null +++ b/apps/devops/Dockerfile @@ -0,0 +1,31 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements and install Python dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY . . + +# Set environment variables +ENV LOG_BASE_PATH=/app/log/devsvc + +# Create necessary directories +RUN mkdir -p /app/log/devops + +# Expose port +EXPOSE 8014 + +# Health check +HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \ + CMD curl -f http://localhost:8014/api/_/healthz || exit 1 + +# Start the application +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8014", "--reload"] \ No newline at end of file diff --git a/apps/devops/README.md b/apps/devops/README.md new file mode 100644 index 0000000..2fb84b3 --- /dev/null +++ b/apps/devops/README.md @@ -0,0 +1,20 @@ +This is a template backend service based on fastapi + mongodb app + +To start development in local, go to the root directory of the project YOUR_WORKSPACE_PATH/helloworld/ +```bash +docker compose -f app/scripts/mongodb/docker-compose.yml up -d +``` + +Then run the app +```bash +uvicorn app.main:app --reload +``` + +In case a new dependency is added, run the following command to update the requirements.txt file +```bash +# optional: if you have not installed pipreqs +pip3 install pipreqs + +# generate requirements.txt +pipreqs . --force +``` diff --git a/apps/devops/app/__init__.py b/apps/devops/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/devops/app/backend/infra/rabbitmq/__init__.py b/apps/devops/app/backend/infra/rabbitmq/__init__.py new file mode 100644 index 0000000..881d42a --- /dev/null +++ b/apps/devops/app/backend/infra/rabbitmq/__init__.py @@ -0,0 +1 @@ +# RabbitMQ infrastructure for DevOps service \ No newline at end of file diff --git a/apps/devops/app/backend/infra/rabbitmq/async_client.py b/apps/devops/app/backend/infra/rabbitmq/async_client.py new file mode 100644 index 0000000..39e9f28 --- /dev/null +++ b/apps/devops/app/backend/infra/rabbitmq/async_client.py @@ -0,0 +1,66 @@ +from app.common.config.app_settings import app_settings +from app.common.log.module_logger import ModuleLogger +import asyncio +from asyncio import AbstractEventLoop +import aio_pika + + +class AsyncMQClient: + exchange_name_format = "freeleaps.devops.exchange.{}" + exchange_type = "direct" + + def __init__(self, channel_name: str) -> None: + self.exchange_name_format = AsyncMQClient.exchange_name_format + self.channel_name = channel_name + self.exchange_type = AsyncMQClient.exchange_type + self.exchange_name = self.exchange_name_format.format(self.channel_name) + self.process_callable = None + self.routing_key = self.channel_name + self.module_logger = ModuleLogger(sender_id="AsyncMQClient") + + async def bind(self, max_retries=10, event_loop: AbstractEventLoop = None): + retry_count = 0 + retry_interval = 1 + + while retry_count < max_retries: + try: + self.connection = await aio_pika.connect_robust( + host=app_settings.RABBITMQ_HOST, + port=int(app_settings.RABBITMQ_PORT), + login=app_settings.RABBITMQ_USERNAME, + password=app_settings.RABBITMQ_PASSWORD, + virtualhost=app_settings.RABBITMQ_VIRTUAL_HOST, + loop=event_loop, + ) + self.channel = await self.connection.channel() + self.exchange = await self.channel.declare_exchange( + name=self.exchange_name, type="direct", auto_delete=False + ) + + # Connect to existing named queue instead of creating anonymous queue + # channel_name already contains the full queue name from environment variable + self.queue = await self.channel.declare_queue( + name=self.channel_name, exclusive=False, auto_delete=False, durable=True + ) + await self.queue.bind( + exchange=self.exchange, routing_key=self.routing_key + ) + break + except Exception as e: + await self.module_logger.log_exception( + exception=e, + text=f"Reconnection attempt {retry_count + 1}/{max_retries} failed: {e}", + ) + await asyncio.sleep(retry_interval) + retry_interval = min(retry_interval * 2, 60) + retry_count += 1 + + if retry_count >= max_retries: + raise ConnectionError( + "Unable to connect to RabbitMQ after multiple retries." + ) + + async def close(self): + """Unbind the queue and close the connection gracefully.""" + await self.queue.unbind(self.exchange, self.routing_key) + await self.connection.close() \ No newline at end of file diff --git a/apps/devops/app/backend/infra/rabbitmq/async_subscriber.py b/apps/devops/app/backend/infra/rabbitmq/async_subscriber.py new file mode 100644 index 0000000..05ef7e4 --- /dev/null +++ b/apps/devops/app/backend/infra/rabbitmq/async_subscriber.py @@ -0,0 +1,84 @@ +from asyncio import AbstractEventLoop +from app.common.log.module_logger import ModuleLogger +import json +import asyncio +from .async_client import AsyncMQClient + + +class AsyncMQSubscriber(AsyncMQClient): + def __init__(self, channel_name: str) -> None: + super().__init__(channel_name=channel_name) + self.process_callable = None + self.routing_key = self.channel_name + self.consumer_callbacks = {} + self.consumer_callbacks_lock = asyncio.Lock() + self.module_logger = ModuleLogger(sender_id="AsyncMQSubscriber") + + async def process_incoming_message(self, message): + """Processing incoming message from RabbitMQ""" + await message.ack() + body = message.body + if body: + async with self.consumer_callbacks_lock: + for registry_key, callback_info in self.consumer_callbacks.items(): + try: + await callback_info["method"]( + registry_key, json.loads(body), callback_info["args"] + ) + except Exception as err: + await self.module_logger.log_exception( + exception=err, + text=f"Error processing message for consumer '{registry_key}'", + ) + + async def subscribe(self, max_retries=10, event_loop: AbstractEventLoop = None): + """Attempts to bind and consume messages, with retry mechanism.""" + retries = 0 + while retries < max_retries: + try: + await self.bind(max_retries=5, event_loop=event_loop) + await self.queue.consume( + no_ack=False, exclusive=True, callback=self.process_incoming_message + ) + break + except Exception as e: + await self.module_logger.log_exception( + exception=e, + text=f"Failed to subscribe at {retries} time, will retry", + ) + retries += 1 + await asyncio.sleep(5) + else: + await self.module_logger.log_exception( + exception=ConnectionError( + f"Exceeded max retries ({max_retries}) for subscription." + ), + text=f"Subscription failed for {self.channel_name} after {max_retries} attempts.", + ) + + async def register_consumer( + self, + registry_key: str, + callback_method, + args: dict, + ): + """Register a consumer callback with a unique key.""" + async with self.consumer_callbacks_lock: + self.consumer_callbacks[registry_key] = { + "method": callback_method, + "args": args, + } + + async def unregister_consumer( + self, + registry_key: str, + ): + """Unregister a consumer callback by its key.""" + async with self.consumer_callbacks_lock: + if registry_key in self.consumer_callbacks: + del self.consumer_callbacks[registry_key] + + async def clear_all_consumers(self): + """Unregister all consumer callbacks.""" + async with self.consumer_callbacks_lock: + self.consumer_callbacks.clear() \ No newline at end of file diff --git a/apps/devops/app/backend/services/deployment_status_update_service.py b/apps/devops/app/backend/services/deployment_status_update_service.py new file mode 100644 index 0000000..2dfa53a --- /dev/null +++ b/apps/devops/app/backend/services/deployment_status_update_service.py @@ -0,0 +1,100 @@ +from datetime import datetime +from typing import Dict, Literal +from app.common.log.module_logger import ModuleLogger +from app.common.models.deployment.deployment import Deployment +from app.common.models.deployment.heartbeat import DevOpsReconcileJobHeartbeatMessage + + +class DeploymentStatusUpdateService: + def __init__(self): + self.module_logger = ModuleLogger(sender_id="DeploymentStatusUpdateService") + + # Status mapping from heartbeat to deployment model + self.status_mapping: Dict[str, Literal["started", "failed", "succeeded", "aborted"]] = { + "running": "started", + "success": "succeeded", + "failed": "failed", + "terminated": "aborted" + } + + # Phase to stage mapping for more detailed tracking + self.phase_to_stage_mapping: Dict[str, str] = { + "initializing": "initialization", + "jenkins_build": "build", + "building": "build", + "deploying": "deployment", + "finished": "completed" + } + + async def process_heartbeat_message(self, registry_key: str, message_data: dict, args: dict): + """Process incoming heartbeat message and update deployment status""" + # registry_key and args are provided by the message queue framework but not used in this implementation + _ = registry_key, args + try: + # Parse the message using our Pydantic model + heartbeat_message = DevOpsReconcileJobHeartbeatMessage(**message_data) + payload = heartbeat_message.payload + + await self.module_logger.log_info( + text=f"Processing heartbeat for deployment {payload.id}: {payload.status} - {payload.phase}", + data={"deployment_id": payload.id, "status": payload.status, "phase": payload.phase} + ) + + # Find the deployment by ID + deployment = await Deployment.find_one(Deployment.deployment_id == payload.id) + if not deployment: + await self.module_logger.log_warning( + text=f"Deployment not found: {payload.id}", + data={"deployment_id": payload.id} + ) + return + + # Map heartbeat status to deployment status + if payload.status in self.status_mapping: + deployment.deployment_status = self.status_mapping[payload.status] + else: + await self.module_logger.log_warning( + text=f"Unknown status received: {payload.status}", + data={"deployment_id": payload.id, "status": payload.status} + ) + return + + # Map phase to deployment stage + if payload.phase in self.phase_to_stage_mapping: + deployment.deployment_stage = self.phase_to_stage_mapping[payload.phase] + else: + deployment.deployment_stage = payload.phase + + # Update app URL if provided and deployment is successful + if payload.url and payload.status == "success": + deployment.deployment_app_url = payload.url + + # Update timestamp + deployment.updated_at = datetime.now() + + # Save the updated deployment + await deployment.save() + + await self.module_logger.log_info( + text=f"Updated deployment {payload.id}: status={deployment.deployment_status}, stage={deployment.deployment_stage}", + data={ + "deployment_id": payload.id, + "status": deployment.deployment_status, + "stage": deployment.deployment_stage, + "app_url": deployment.deployment_app_url, + "error": payload.error + } + ) + + # Log errors if present + if payload.error: + await self.module_logger.log_error( + text=f"Deployment {payload.id} failed: {payload.error}", + data={"deployment_id": payload.id, "error": payload.error, "phase": payload.phase} + ) + + except Exception as e: + await self.module_logger.log_exception( + exception=e, + text=f"Error processing heartbeat message: {message_data}", + ) \ No newline at end of file diff --git a/apps/devops/app/bootstrap/__init__.py b/apps/devops/app/bootstrap/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/devops/app/bootstrap/application.py b/apps/devops/app/bootstrap/application.py new file mode 100644 index 0000000..3cf635b --- /dev/null +++ b/apps/devops/app/bootstrap/application.py @@ -0,0 +1,83 @@ +import logging +from fastapi import FastAPI +from fastapi.openapi.utils import get_openapi + +from app.providers import common +from app.providers.logger import register_logger +from app.providers import router +from app.providers import database +from app.providers import metrics +from app.providers import probes +from app.providers import exception_handler +from app.providers import message_queue +from app.common.config.app_settings import app_settings + +def create_app() -> FastAPI: + logging.info("App initializing") + + app = FreeleapsApp() + + register_logger() + register(app, exception_handler) + register(app, database) + register(app, router) + register(app, common) + register(app, message_queue) + + # Call the custom_openapi function to change the OpenAPI version + customize_openapi_security(app) + # Register probe APIs if enabled + if app_settings.PROBES_ENABLED: + register(app, probes) + + # Register metrics APIs if enabled + if app_settings.METRICS_ENABLED: + register(app, metrics) + return app + + +# This function overrides the OpenAPI schema version to 3.0.0 +def customize_openapi_security(app: FastAPI) -> None: + + def custom_openapi(): + if app.openapi_schema: + return app.openapi_schema + + # Generate OpenAPI schema + openapi_schema = get_openapi( + title="FreeLeaps API", + version="3.1.0", + description="FreeLeaps API Documentation", + routes=app.routes, + ) + + # Ensure the components section exists in the OpenAPI schema + if "components" not in openapi_schema: + openapi_schema["components"] = {} + + # Add security scheme to components + openapi_schema["components"]["securitySchemes"] = { + "bearerAuth": {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"} + } + + # Add security requirement globally + openapi_schema["security"] = [{"bearerAuth": []}] + + app.openapi_schema = openapi_schema + return app.openapi_schema + + app.openapi = custom_openapi + + +def register(app, provider): + logging.info(provider.__name__ + " registering") + provider.register(app) + + +def boot(app, provider): + logging.info(provider.__name__ + " booting") + provider.boot(app) + +class FreeleapsApp(FastAPI): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) diff --git a/apps/devops/app/common/__init__.py b/apps/devops/app/common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/devops/app/common/config/__init__.py b/apps/devops/app/common/config/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/devops/app/common/config/app_settings.py b/apps/devops/app/common/config/app_settings.py new file mode 100644 index 0000000..d7b3c7d --- /dev/null +++ b/apps/devops/app/common/config/app_settings.py @@ -0,0 +1,35 @@ +from pydantic_settings import BaseSettings + +# NOTE: The values fall backs to your environment variables when not set here +class AppSettings(BaseSettings): + NAME: str = "YOUR_APP_NAME" + APP_NAME: str = NAME + APP_ENV: str = "alpha" + + JWT_SECRET_KEY: str = "" + ACCESS_TOKEN_EXPIRE_MINUTES: int = 3600 + REFRESH_TOKEN_EXPIRE_DAYS: int = 1 + + METRICS_ENABLED: bool = False + PROBES_ENABLED: bool = True + + APP_MONGODB_URI: str = "mongodb://localhost:27017" + APP_MONGODB_NAME: str = "testdb" + + LOG_BASE_PATH: str = "./log" + BACKEND_LOG_FILE_NAME: str = APP_NAME + APPLICATION_ACTIVITY_LOG: str = APP_NAME + "-application-activity"\ + + RABBITMQ_HOST: str = "localhost" + RABBITMQ_PORT: int = 5672 + RABBITMQ_USERNAME: str = "guest" + RABBITMQ_PASSWORD: str = "guest" + RABBITMQ_VIRTUAL_HOST: str = "/" + + + class Config: + env_file = ".myapp.env" + env_file_encoding = "utf-8" + + +app_settings = AppSettings() diff --git a/apps/devops/app/common/config/log_settings.py b/apps/devops/app/common/config/log_settings.py new file mode 100644 index 0000000..2f6985c --- /dev/null +++ b/apps/devops/app/common/config/log_settings.py @@ -0,0 +1,16 @@ +import os +from dataclasses import dataclass +from .app_settings import app_settings + +@dataclass +class LogSettings: + LOG_PATH_BASE: str = app_settings.LOG_BASE_PATH + LOG_RETENTION: str = os.environ.get("LOG_RETENTION", "30 days") + LOG_ROTATION: str = os.environ.get("LOG_ROTATION", "00:00") # midnight + MAX_BACKUP_FILES: int = int(os.environ.get("LOG_BACKUP_FILES", 5)) + LOG_ROTATION_BYTES: int = int(os.environ.get("LOG_ROTATION_BYTES", 10 * 1024 * 1024)) # 10 MB + APP_NAME: str = app_settings.APP_NAME + ENVIRONMENT: str = app_settings.APP_ENV + + +log_settings = LogSettings() diff --git a/apps/devops/app/common/config/site_settings.py b/apps/devops/app/common/config/site_settings.py new file mode 100644 index 0000000..0bf92eb --- /dev/null +++ b/apps/devops/app/common/config/site_settings.py @@ -0,0 +1,35 @@ +import os + +from pydantic_settings import BaseSettings + + +# NOTE: The values fall backs to your environment variables when not set here +class SiteSettings(BaseSettings): + NAME: str = "appname" + DEBUG: bool = True + + ENV: str = "dev" + + SERVER_HOST: str = "localhost" + SERVER_PORT: int = 8000 + + URL: str = "http://localhost" + TIME_ZONE: str = "UTC" + + BASE_PATH: str = os.path.dirname(os.path.dirname((os.path.abspath(__file__)))) + + BASE_GITEA_URL: str = "https://gitea.freeleaps.mathmast.com" + + # TODO: confirm with Zhenyu + BASE_RECONCILE_URL: str = "https://reconcile.freeleaps.mathmast.com" + + # TODO: modify this with actual Loki URL + BASE_LOKI_URL: str = "http://localhost:3100" + + class Config: + env_file = ".devbase-webapi.env" + env_file_encoding = "utf-8" + + +site_settings = SiteSettings() + diff --git a/apps/devops/app/common/log/__init__.py b/apps/devops/app/common/log/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/devops/app/common/log/application_logger.py b/apps/devops/app/common/log/application_logger.py new file mode 100644 index 0000000..896c044 --- /dev/null +++ b/apps/devops/app/common/log/application_logger.py @@ -0,0 +1,12 @@ +from .base_logger import LoggerBase +from app.common.config.app_settings import app_settings + +class ApplicationLogger(LoggerBase): + def __init__(self, application_activities: dict[str, any] = {}) -> None: + extra_fileds = {} + if application_activities: + extra_fileds.update(application_activities) + super().__init__( + logger_name=app_settings.APPLICATION_ACTIVITY_LOG, + extra_fileds=extra_fileds, + ) diff --git a/apps/devops/app/common/log/base_logger.py b/apps/devops/app/common/log/base_logger.py new file mode 100644 index 0000000..a370296 --- /dev/null +++ b/apps/devops/app/common/log/base_logger.py @@ -0,0 +1,136 @@ +from loguru import logger as guru_logger +from app.common.config.log_settings import log_settings +from typing import Dict, Any, Optional +import socket +import json +import threading +import os +import sys +import inspect +import logging + +from app.common.log.json_sink import JsonSink + +class LoggerBase: + binded_loggers = {} + logger_lock = threading.Lock() + + def __init__(self, logger_name: str, extra_fileds: dict[str, any]) -> None: + self.__logger_name = logger_name + self.extra_fileds = extra_fileds + with LoggerBase.logger_lock: + if self.__logger_name in LoggerBase.binded_loggers: + self.logger = LoggerBase.binded_loggers[self.__logger_name] + return + + log_filename = f"{log_settings.LOG_PATH_BASE}/{self.__logger_name}.log" + log_level = "INFO" + rotation_bytes = int(log_settings.LOG_ROTATION_BYTES or 10 * 1024 * 1024) + + guru_logger.remove() + + file_sink = JsonSink( + log_file_path=log_filename, + rotation_size_bytes=rotation_bytes, + max_backup_files=log_settings.MAX_BACKUP_FILES + ) + guru_logger.add( + sink=file_sink, + level=log_level, + filter=lambda record: record["extra"].get("topic") == self.__logger_name, + ) + + guru_logger.add( + sink=sys.stderr, + level=log_level, + format="{level} - {time:YYYY-MM-DD HH:mm:ss} - <{extra[log_file]}:{extra[log_line]}> - {extra[properties_str]} - {message}", + filter=lambda record: record["extra"].get("topic") == self.__logger_name, + ) + + host_name = socket.gethostname() + host_ip = socket.gethostbyname(host_name) + self.logger = guru_logger.bind( + topic=self.__logger_name, + host_ip=host_ip, + host_name=host_name, + app=log_settings.APP_NAME, + env=log_settings.ENVIRONMENT, + ) + with LoggerBase.logger_lock: + LoggerBase.binded_loggers[self.__logger_name] = self.logger + + def _get_log_context(self) -> dict: + frame = inspect.currentframe().f_back.f_back + filename = os.path.basename(frame.f_code.co_filename) + lineno = frame.f_lineno + return {"log_file": filename, "log_line": lineno} + + def _prepare_properties(self, properties: Optional[Dict[str, Any]]) -> Dict[str, Any]: + props = {} if properties is None else properties.copy() + props_str = json.dumps(props, ensure_ascii=False) if props else "{}" + return props, props_str + + async def log_event(self, sender_id: str, receiver_id: str, subject: str, event: str, properties: dict[str, any], text: str = "") -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event=event, properties=props, properties_str=props_str, **context) + local_logger.info(text) + + async def log_exception(self, sender_id: str, receiver_id: str, subject: str, exception: Exception, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="exception", properties=props, properties_str=props_str, exception=exception, **context) + local_logger.exception(text) + + async def log_info(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="information", properties=props, properties_str=props_str, **context) + local_logger.info(text) + + async def log_warning(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="warning", properties=props, properties_str=props_str, **context) + local_logger.warning(text) + + async def log_error(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="error", properties=props, properties_str=props_str, **context) + local_logger.error(text) + + @staticmethod + def configure_uvicorn_logging(): + print("πŸ“’ Setting up uvicorn logging interception...") + + # Intercept logs from these loggers + intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"] + + class InterceptHandler(logging.Handler): + def emit(self, record): + level = ( + guru_logger.level(record.levelname).name + if guru_logger.level(record.levelname, None) + else record.levelno + ) + frame, depth = logging.currentframe(), 2 + while frame.f_code.co_filename == logging.__file__: + frame = frame.f_back + depth += 1 + + guru_logger.opt(depth=depth, exception=record.exc_info).log( + level, + f"[{record.name}] {record.getMessage()}", + ) + + # Replace default handlers + logging.root.handlers.clear() + logging.root.setLevel(logging.INFO) + logging.root.handlers = [InterceptHandler()] + + # Configure specific uvicorn loggers + for logger_name in intercept_loggers: + logging_logger = logging.getLogger(logger_name) + logging_logger.handlers.clear() # Remove default handlers + logging_logger.propagate = True # Ensure propagation through Loguru diff --git a/apps/devops/app/common/log/json_sink.py b/apps/devops/app/common/log/json_sink.py new file mode 100644 index 0000000..a798156 --- /dev/null +++ b/apps/devops/app/common/log/json_sink.py @@ -0,0 +1,85 @@ +import json +import datetime +import traceback +from pathlib import Path +from typing import Optional + +class JsonSink: + def __init__( + self, + log_file_path: str, + rotation_size_bytes: int = 10 * 1024 * 1024, + max_backup_files: int = 5, + ): + self.log_file_path = Path(log_file_path) + self.rotation_size = rotation_size_bytes + self.max_backup_files = max_backup_files + self._open_log_file() + + def _open_log_file(self): + # ensure the parent directory exists + parent_dir = self.log_file_path.parent + if not parent_dir.exists(): + parent_dir.mkdir(parents=True, exist_ok=True) + self.log_file = self.log_file_path.open("a", encoding="utf-8") + + def _should_rotate(self) -> bool: + return self.log_file_path.exists() and self.log_file_path.stat().st_size >= self.rotation_size + + def _rotate(self): + self.log_file.close() + timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + rotated_path = self.log_file_path.with_name(f"{self.log_file_path.stem}_{timestamp}{self.log_file_path.suffix}") + self.log_file_path.rename(rotated_path) + self._cleanup_old_backups() + self._open_log_file() + + def _cleanup_old_backups(self): + parent = self.log_file_path.parent + stem = self.log_file_path.stem + suffix = self.log_file_path.suffix + + backup_files = sorted( + parent.glob(f"{stem}_*{suffix}"), + key=lambda p: p.stat().st_mtime, + reverse=True, + ) + + for old_file in backup_files[self.max_backup_files:]: + try: + old_file.unlink() + except Exception as e: + print(f"Failed to delete old backup {old_file}: {e}") + + def __call__(self, message): + record = message.record + if self._should_rotate(): + self._rotate() + + log_entry = { + "level": record["level"].name.lower(), + "timestamp": int(record["time"].timestamp() * 1000), + "text": record["message"], + "fields": record["extra"].get("properties", {}), + "context": { + "app": record["extra"].get("app"), + "env": record["extra"].get("env"), + "log_file": record["extra"].get("log_file"), + "log_line": record["extra"].get("log_line"), + "topic": record["extra"].get("topic"), + "sender_id": record["extra"].get("sender_id"), + "receiver_id": record["extra"].get("receiver_id"), + "subject": record["extra"].get("subject"), + "event": record["extra"].get("event"), + "host_ip": record["extra"].get("host_ip"), + "host_name": record["extra"].get("host_name"), + }, + "stacktrace": None + } + + if record["exception"]: + exc_type, exc_value, exc_tb = record["exception"] + log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb) + + self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n") + self.log_file.flush() diff --git a/apps/devops/app/common/log/module_logger.py b/apps/devops/app/common/log/module_logger.py new file mode 100644 index 0000000..3e82f74 --- /dev/null +++ b/apps/devops/app/common/log/module_logger.py @@ -0,0 +1,46 @@ +from .application_logger import ApplicationLogger + + +class ModuleLogger(ApplicationLogger): + def __init__(self, sender_id: str) -> None: + super().__init__() + self.event_sender_id = sender_id + self.event_receiver_id = "ModuleLogger" + self.event_subject = "module" + + async def log_exception(self, exception: Exception, text: str = "Exception", properties: dict[str, any] = None) -> None: + return await super().log_exception( + sender_id=self.event_sender_id, + receiver_id=self.event_receiver_id, + subject=self.event_subject, + exception=exception, + text=text, + properties=properties, + ) + + async def log_info(self, text: str, data: dict[str, any] = None) -> None: + return await super().log_info( + sender_id=self.event_sender_id, + receiver_id=self.event_receiver_id, + subject=self.event_subject, + text=text, + properties=data, + ) + + async def log_warning(self, text: str, data: dict[str, any] = None) -> None: + return await super().log_warning( + sender_id=self.event_sender_id, + receiver_id=self.event_receiver_id, + subject=self.event_subject, + text=text, + properties=data, + ) + + async def log_error(self, text: str, data: dict[str, any] = None) -> None: + return await super().log_error( + sender_id=self.event_sender_id, + receiver_id=self.event_receiver_id, + subject=self.event_subject, + text=text, + properties=data, + ) \ No newline at end of file diff --git a/apps/devops/app/common/models/__init__.py b/apps/devops/app/common/models/__init__.py new file mode 100644 index 0000000..10e5320 --- /dev/null +++ b/apps/devops/app/common/models/__init__.py @@ -0,0 +1,7 @@ +from app.common.models.code_depot.code_depot import CodeDepotDoc +from app.common.models.deployment.deployment import Deployment +from app.common.models.deployment.deployment import DevOpsReconcileRequest, DevOpsReconcileOperationType + +# list of beanie document models, +# must add here so that the mongo db collection can be automatically created +db_models = [Deployment, CodeDepotDoc] \ No newline at end of file diff --git a/apps/devops/app/common/models/code_depot/__init__.py b/apps/devops/app/common/models/code_depot/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/devops/app/common/models/code_depot/code_depot.py b/apps/devops/app/common/models/code_depot/code_depot.py new file mode 100644 index 0000000..c3d8ffc --- /dev/null +++ b/apps/devops/app/common/models/code_depot/code_depot.py @@ -0,0 +1,39 @@ +from datetime import datetime, timezone +from typing import List, Optional, Dict + +from beanie import Document +from enum import IntEnum + +from pymongo import IndexModel + + +class DepotStatus(IntEnum): + TO_BE_CREATED = 0 + CREATED = 1 + DELETED = 2 + + +class UserAccountStatus(IntEnum): + TO_BE_CREATED = 0 + CREATED = 1 + DELETED = 2 + DEACTIVATED = 3 + +class CodeDepotDoc(Document): + depot_name: str + product_id: str + depot_status: DepotStatus + collaborators: List[str] = [] + total_commits: Optional[int] = 0 + last_commiter: Optional[str] = "" + last_update: Optional[datetime] = datetime.now(timezone.utc) + weekly_commits: Optional[Dict[str, int]] = {} + + class Settings: + name = "code_depot" + indexes = [ + IndexModel([("product_id", 1)]) + ] + + + diff --git a/apps/devops/app/common/models/deployment/__init__.py b/apps/devops/app/common/models/deployment/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/devops/app/common/models/deployment/deployment.py b/apps/devops/app/common/models/deployment/deployment.py new file mode 100644 index 0000000..b2c719a --- /dev/null +++ b/apps/devops/app/common/models/deployment/deployment.py @@ -0,0 +1,88 @@ +from datetime import datetime, timedelta +from typing import Literal, List, Optional +from dataclasses import dataclass +from enum import Enum + +from beanie import Document +from pydantic import Field, field_validator +from pydantic import BaseModel +from pymongo import IndexModel + + +class Deployment(Document): + deployment_id: str + + deployment_stage: str + deployment_status: Literal["started", "failed", "succeeded", "aborted"] + + deployment_target_env: Literal["alpha", "prod"] + deployment_ttl_hours: int = 2 + + deployment_project_id: str + deployment_project_name: str + deployment_product_id: str + deployment_product_name: str + deployment_git_url: str + deployment_git_sha256: str + deployment_reason: str + deployment_app_url: str = "" # URL to access the deployed application, keep it empty to be filled later + + deployed_by: str + created_at: datetime = datetime.now() + updated_at: datetime = datetime.now() + + class Settings: + name = "deployment" + indexes = [ + IndexModel([("deployment_product_id", 1), ("created_at", 1)]), + IndexModel([("deployment_id", 1), ("deployment_status", 1)]), + IndexModel([("deployment_id", 1), ("deployment_stage", 1)], unique=True) + ] + +class InitDeploymentRequest(BaseModel): + product_id: str + sha256: str = "" + target_env: Literal["alpha", "prod"] + user_id: str + reason: str = "not provided" + ttl_hours: int = 3 + +class CheckDeploymentStatusRequest(BaseModel): + product_id: str + target_env: str + user_id: str + +class CheckApplicationLogsRequest(BaseModel): + product_id: str + target_env: Literal["alpha", "prod"] + user_id: str = '' + log_level: List[Literal["info", "error", "debug"]] = Field(default_factory=lambda: ["info"]) + start_time: datetime = datetime.now() - timedelta(minutes=5) + end_time: datetime = datetime.now() + limit: int = 1000 + +class CheckApplicationLogsResponse(BaseModel): + product_id: str + target_env: Literal["alpha", "prod"] + user_id: str = '' + log_level: List[Literal["info", "error", "debug"]] + start_time: datetime + end_time: datetime + limit: int + logs: list[str] + +class DevOpsReconcileOperationType(str, Enum): + START = "start" + TERMINATE = "terminate" + RESTART = "restart" + +class DevOpsReconcileRequest(BaseModel): + operation: DevOpsReconcileOperationType + id: str + devops_proj_id: str + triggered_user_id: str + causes: str + commit_sha256: Optional[str] = None + target_env: Literal["alpha", "prod"] + ttl_control: bool = False + ttl: int = 10800 \ No newline at end of file diff --git a/apps/devops/app/common/models/deployment/heartbeat.py b/apps/devops/app/common/models/deployment/heartbeat.py new file mode 100644 index 0000000..7c7a624 --- /dev/null +++ b/apps/devops/app/common/models/deployment/heartbeat.py @@ -0,0 +1,17 @@ +from typing import Literal, Optional +from pydantic import BaseModel + + +class DevOpsReconcileJobHeartbeatPayload(BaseModel): + operation: Literal["heartbeat"] = "heartbeat" + id: str + status: Literal["running", "success", "failed", "terminated"] + phase: Literal["initializing", "jenkins_build", "building", "deploying", "finished"] + phase_message: str + error: Optional[str] = None + url: Optional[str] = None + + +class DevOpsReconcileJobHeartbeatMessage(BaseModel): + event_type: Literal["DevOpsReconcileJobHeartbeat"] + payload: DevOpsReconcileJobHeartbeatPayload \ No newline at end of file diff --git a/apps/devops/app/common/probes/__init__.py b/apps/devops/app/common/probes/__init__.py new file mode 100644 index 0000000..4071df8 --- /dev/null +++ b/apps/devops/app/common/probes/__init__.py @@ -0,0 +1,140 @@ +import logging +from enum import Enum +from typing import Optional, Callable, Tuple, Dict +import inspect +from datetime import datetime, timezone + +# ProbeType is an Enum that defines the types of probes that can be registered. +class ProbeType(Enum): + LIVENESS = "liveness" + READINESS = "readiness" + STARTUP = "startup" + +# ProbeResult is a class that represents the result of a probe check. +class ProbeResult: + def __init__(self, success: bool, message: str = "ok", data: Optional[dict] = None): + self.success = success + self.message = message + self.data = data or {} + + def to_dict(self) -> dict: + return { + "success": self.success, + "message": self.message, + "data": self.data + } + +# Probe is a class that represents a probe that can be registered. +class Probe: + def __init__(self, type: ProbeType, path: str, check_fn: Callable, name: Optional[str] = None): + self.type = type + self.path = path + self.check_fn = check_fn + self.name = name or f"{type.value}-{id(self)}" + + async def execute(self) -> ProbeResult: + try: + result = self.check_fn() + if inspect.isawaitable(result): + result = await result + + if isinstance(result, ProbeResult): + return result + elif isinstance(result, bool): + return ProbeResult(result, "ok" if result else "failed") + else: + return ProbeResult(True, "ok") + except Exception as e: + return ProbeResult(False, str(e)) + +# ProbeGroup is a class that represents a group of probes that can be checked together. +class ProbeGroup: + def __init__(self, path: str): + self.path = path + self.probes: Dict[str, Probe] = {} + + def add_probe(self, probe: Probe): + self.probes[probe.name] = probe + + async def check_all(self) -> Tuple[bool, dict]: + results = {} + all_success = True + + for name, probe in self.probes.items(): + result = await probe.execute() + results[name] = result.to_dict() + if not result.success: + all_success = False + + return all_success, results + +# FrameworkAdapter is an abstract class that defines the interface for framework-specific probe adapters. +class FrameworkAdapter: + async def handle_request(self, group: ProbeGroup): + all_success, results = await group.check_all() + status_code = 200 if all_success else 503 + return {"status": "ok" if all_success else "failed", "payload": results, "timestamp": int(datetime.now(timezone.utc).timestamp())}, status_code + + def register_route(self, path: str, handler: Callable): + raise NotImplementedError + +# ProbeManager is a class that manages the registration of probes and their corresponding framework adapters. +class ProbeManager: + _default_paths = { + ProbeType.LIVENESS: "/_/livez", + ProbeType.READINESS: "/_/readyz", + ProbeType.STARTUP: "/_/healthz" + } + + def __init__(self): + self.groups: Dict[str, ProbeGroup] = {} + self.adapters: Dict[str, FrameworkAdapter] = {} + self._startup_complete = False + + def register_adapter(self, framework: str, adapter: FrameworkAdapter): + self.adapters[framework] = adapter + logging.info(f"Registered probe adapter ({adapter}) for framework: {framework}") + + def register( + self, + type: ProbeType, + check_func: Optional[Callable] = None, + path: Optional[str] = None, + prefix: str = "", + name: Optional[str] = None, + frameworks: Optional[list] = None + ): + path = path or self._default_paths.get(type, "/_/healthz") + if prefix: + path = f"{prefix}{path}" + + if type == ProbeType.STARTUP and check_func is None: + check_func = self._default_startup_check + + probe = Probe(type, path, check_func or (lambda: True), name) + + if path not in self.groups: + self.groups[path] = ProbeGroup(path) + self.groups[path].add_probe(probe) + + for framework in (frameworks or ["default"]): + self._register_route(framework, path) + logging.info(f"Registered {type.value} probe route ({path}) for framework: {framework}") + + def _register_route(self, framework: str, path: str): + if framework not in self.adapters: + return + + adapter = self.adapters[framework] + group = self.groups[path] + + async def handler(): + return await adapter.handle_request(group) + + adapter.register_route(path, handler) + + def _default_startup_check(self) -> bool: + return self._startup_complete + + def mark_startup_complete(self): + self._startup_complete = True \ No newline at end of file diff --git a/apps/devops/app/common/probes/adapters.py b/apps/devops/app/common/probes/adapters.py new file mode 100644 index 0000000..2ecd38a --- /dev/null +++ b/apps/devops/app/common/probes/adapters.py @@ -0,0 +1,15 @@ +from . import FrameworkAdapter +from fastapi.responses import JSONResponse +from typing import Callable + +# FastAPIAdapter is a class that implements the FrameworkAdapter interface for FastAPI. +class FastAPIAdapter(FrameworkAdapter): + def __init__(self, app): + self.app = app + + def register_route(self,path: str, handler: Callable): + async def wrapper(): + data, status_code = await handler() + return JSONResponse(content=data, status_code=status_code) + + self.app.add_api_route(path, wrapper, methods=["GET"]) diff --git a/apps/devops/app/envs/alpha.yml b/apps/devops/app/envs/alpha.yml new file mode 100644 index 0000000..e69de29 diff --git a/apps/devops/app/envs/prod.yml b/apps/devops/app/envs/prod.yml new file mode 100644 index 0000000..e69de29 diff --git a/apps/devops/app/main.py b/apps/devops/app/main.py new file mode 100644 index 0000000..1bfc3d4 --- /dev/null +++ b/apps/devops/app/main.py @@ -0,0 +1,17 @@ +from fastapi.responses import RedirectResponse +from app.common.config.site_settings import site_settings +from app.bootstrap.application import create_app + +app = create_app() + +@app.get("/", status_code=301) +async def root(): + """ + TODO: redirect client to /doc# + """ + return RedirectResponse("docs") + +if __name__ == "__main__": + import uvicorn + print("Starting FastAPI server...") + uvicorn.run("main:app", host=site_settings.SERVER_HOST, port=site_settings.SERVER_PORT, reload=True) \ No newline at end of file diff --git a/apps/devops/app/providers/__init__.py b/apps/devops/app/providers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/devops/app/providers/common.py b/apps/devops/app/providers/common.py new file mode 100644 index 0000000..656bcc3 --- /dev/null +++ b/apps/devops/app/providers/common.py @@ -0,0 +1,31 @@ +from fastapi.middleware.cors import CORSMiddleware +from app.common.config.site_settings import site_settings + + +def register(app): + app.debug = site_settings.DEBUG + app.title = site_settings.NAME + + add_global_middleware(app) + + # This hook ensures that a connection is opened to handle any queries + # generated by the request. + @app.on_event("startup") + async def startup(): + pass + + # This hook ensures that the connection is closed when we've finished + # processing the request. + @app.on_event("shutdown") + async def shutdown(): + pass + + +def add_global_middleware(app): + app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) diff --git a/apps/devops/app/providers/database.py b/apps/devops/app/providers/database.py new file mode 100644 index 0000000..8716b8e --- /dev/null +++ b/apps/devops/app/providers/database.py @@ -0,0 +1,34 @@ +import asyncio +from app.common.config.app_settings import app_settings +from beanie import init_beanie +from motor.motor_asyncio import AsyncIOMotorClient +from app.common.models import db_models +from app.common.probes import ProbeResult + +client = AsyncIOMotorClient( + app_settings.APP_MONGODB_URI, + serverSelectionTimeoutMS=60000, + minPoolSize=5, # Minimum number of connections in the pool + maxPoolSize=20, # Maximum number of connections in the pool +) + +def register(app): + app.debug = "auth_mongo_debug" + app.title = "auth_mongo_name" + + @app.on_event("startup") + async def start_database(): + await initiate_database() + +async def check_database_initialized() -> ProbeResult: + try: + await asyncio.wait_for(client.server_info(), timeout=5) + return ProbeResult(success=True, message="service has been initialized and ready to serve") + except Exception: + return ProbeResult(success=False, message="service is not initialized yet", data={"error": "database is not ready"}) + + +async def initiate_database(): + await init_beanie( + database=client[app_settings.APP_MONGODB_NAME], document_models=db_models + ) diff --git a/apps/devops/app/providers/exception_handler.py b/apps/devops/app/providers/exception_handler.py new file mode 100644 index 0000000..21117a5 --- /dev/null +++ b/apps/devops/app/providers/exception_handler.py @@ -0,0 +1,39 @@ +from fastapi import FastAPI, HTTPException +from fastapi.exceptions import RequestValidationError +from starlette.requests import Request +from starlette.responses import JSONResponse +from starlette.status import ( + HTTP_400_BAD_REQUEST, + HTTP_401_UNAUTHORIZED, + HTTP_403_FORBIDDEN, + HTTP_404_NOT_FOUND, + HTTP_422_UNPROCESSABLE_ENTITY, + HTTP_500_INTERNAL_SERVER_ERROR, +) + + +async def custom_http_exception_handler(request: Request, exc: HTTPException): + return JSONResponse( + status_code=exc.status_code, + content={"error": exc.detail}, + ) + + + +async def validation_exception_handler(request: Request, exc: RequestValidationError): + return JSONResponse( + status_code=HTTP_400_BAD_REQUEST, + content={"error": str(exc)}, + ) + +async def exception_handler(request: Request, exc: Exception): + return JSONResponse( + status_code=HTTP_500_INTERNAL_SERVER_ERROR, + content={"error": str(exc)}, + ) + + +def register(app: FastAPI): + app.add_exception_handler(HTTPException, custom_http_exception_handler) + app.add_exception_handler(RequestValidationError, validation_exception_handler) + app.add_exception_handler(Exception, exception_handler) diff --git a/apps/devops/app/providers/logger.py b/apps/devops/app/providers/logger.py new file mode 100644 index 0000000..2785603 --- /dev/null +++ b/apps/devops/app/providers/logger.py @@ -0,0 +1,7 @@ +from app.common.log.base_logger import LoggerBase + + +def register_logger(): + print("πŸ“’ Setting up logging interception...") + LoggerBase.configure_uvicorn_logging() + print("βœ… Logging interception complete. Logs are formatted and deduplicated!") diff --git a/apps/devops/app/providers/message_queue.py b/apps/devops/app/providers/message_queue.py new file mode 100644 index 0000000..b2d87cb --- /dev/null +++ b/apps/devops/app/providers/message_queue.py @@ -0,0 +1,60 @@ +import asyncio +import os +from app.backend.infra.rabbitmq.async_subscriber import AsyncMQSubscriber +from app.backend.services.deployment_status_update_service import DeploymentStatusUpdateService + + +def register(app): + # Initialize the message subscriber and status update service lazily to avoid blocking startup + app.deployment_heartbeat_subscriber = None + app.deployment_status_service = None + + @app.on_event("startup") + async def start_message_consumers(): + print("πŸš€ Starting message consumers...") + + try: + # Initialize services during startup to avoid blocking app initialization + print("πŸ”§ Initializing services...") + output_queue_name = os.getenv("RABBITMQ_OUTPUT_QUEUE_NAME", "freeleaps.devops.reconciler.output") + print(f"Using output queue: {output_queue_name}") + app.deployment_heartbeat_subscriber = AsyncMQSubscriber(output_queue_name) + app.deployment_status_service = DeploymentStatusUpdateService() + print("βœ… Services initialized") + + # Register the heartbeat processor + print("πŸ“ Registering deployment heartbeat processor...") + await app.deployment_heartbeat_subscriber.register_consumer( + registry_key="deployment_heartbeat_processor", + callback_method=app.deployment_status_service.process_heartbeat_message, + args={} + ) + print("βœ… Registered deployment heartbeat processor") + + # Start the subscriber in the background + print("πŸ”„ Starting subscriber in background...") + loop = asyncio.get_running_loop() + loop.create_task( + app.deployment_heartbeat_subscriber.subscribe(max_retries=5, event_loop=loop) + ) + print("βœ… Started deployment heartbeat subscriber") + print("πŸŽ‰ Message consumers startup complete!") + except Exception as e: + print(f"❌ Error in message consumer startup: {e}") + # Don't raise the exception to prevent app startup failure + print("⚠️ App will continue without message queue functionality") + + @app.on_event("shutdown") + async def stop_message_consumers(): + # Clear consumers and close connection + print("Stopping message consumers") + if app.deployment_heartbeat_subscriber: + try: + await app.deployment_heartbeat_subscriber.clear_all_consumers() + print("Cleared all consumers") + await app.deployment_heartbeat_subscriber.close() + print("Closed deployment heartbeat subscriber") + except Exception as e: + print(f"Error during shutdown: {e}") + else: + print("No message consumers to stop") \ No newline at end of file diff --git a/apps/devops/app/providers/metrics.py b/apps/devops/app/providers/metrics.py new file mode 100644 index 0000000..1ae941a --- /dev/null +++ b/apps/devops/app/providers/metrics.py @@ -0,0 +1,16 @@ +import logging +from prometheus_fastapi_instrumentator import Instrumentator +from app.common.config.app_settings import app_settings + +def register(app): + instrumentator = ( + Instrumentator().instrument( + app, + metric_namespace="freeleaps", + metric_subsystem=app_settings.APP_NAME) + ) + + @app.on_event("startup") + async def startup(): + instrumentator.expose(app, endpoint="/api/_/metrics", should_gzip=True) + logging.info("Metrics endpoint exposed at /api/_/metrics") \ No newline at end of file diff --git a/apps/devops/app/providers/probes.py b/apps/devops/app/providers/probes.py new file mode 100644 index 0000000..883e3d6 --- /dev/null +++ b/apps/devops/app/providers/probes.py @@ -0,0 +1,25 @@ +from app.common.probes import ProbeManager, ProbeType +from app.common.probes.adapters import FastAPIAdapter +from .database import check_database_initialized + +def register(app): + probes_manager = ProbeManager() + probes_manager.register_adapter("fastapi", FastAPIAdapter(app)) + + async def readiness_checker(): + return await check_database_initialized() + + probes_manager.register( + name="readiness", + prefix="/api", + type=ProbeType.READINESS, + check_func=readiness_checker, + frameworks=["fastapi"] + ) + + probes_manager.register(name="liveness", prefix="/api", type=ProbeType.LIVENESS, frameworks=["fastapi"]) + probes_manager.register(name="startup", prefix="/api", type=ProbeType.STARTUP, frameworks=["fastapi"]) + + @app.on_event("startup") + async def mark_startup_complete(): + probes_manager.mark_startup_complete() \ No newline at end of file diff --git a/apps/devops/app/providers/router.py b/apps/devops/app/providers/router.py new file mode 100644 index 0000000..b273eb8 --- /dev/null +++ b/apps/devops/app/providers/router.py @@ -0,0 +1,34 @@ +from app.routes import api_router + +from starlette import routing + + +def register(app): + app.include_router( + api_router, + prefix="/api", + tags=["api"], + dependencies=[], + responses={404: {"description": "no page found"}}, + ) + + if app.debug: + for route in app.routes: + if not isinstance(route, routing.WebSocketRoute): + print( + { + "path": route.path, + "endpoint": route.endpoint, + "name": route.name, + "methods": route.methods, + } + ) + else: + print( + { + "path": route.path, + "endpoint": route.endpoint, + "name": route.name, + "type": "web socket route", + } + ) diff --git a/apps/devops/app/providers/scheduler.py b/apps/devops/app/providers/scheduler.py new file mode 100644 index 0000000..7ea8d6c --- /dev/null +++ b/apps/devops/app/providers/scheduler.py @@ -0,0 +1,8 @@ +import asyncio + + +def register(app): + @app.on_event("startup") + async def start_scheduler(): + #create your scheduler here + pass diff --git a/apps/devops/app/routes/__init__.py b/apps/devops/app/routes/__init__.py new file mode 100644 index 0000000..ab02a04 --- /dev/null +++ b/apps/devops/app/routes/__init__.py @@ -0,0 +1,7 @@ +from fastapi import APIRouter +from app.routes.deployment.apis import router as deployment_api + +api_router = APIRouter() + +# TODO: add custom routers here +api_router.include_router(deployment_api, tags=["deployment"]) diff --git a/apps/devops/app/routes/deployment/__init__.py b/apps/devops/app/routes/deployment/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/devops/app/routes/deployment/apis.py b/apps/devops/app/routes/deployment/apis.py new file mode 100644 index 0000000..31d61d3 --- /dev/null +++ b/apps/devops/app/routes/deployment/apis.py @@ -0,0 +1,75 @@ +from datetime import datetime +from typing import List, Optional + +from fastapi import APIRouter, Depends +from loguru import logger + +from app.common.models import CodeDepotDoc +from app.common.models.deployment.deployment import Deployment, InitDeploymentRequest, CheckDeploymentStatusRequest, \ + CheckApplicationLogsRequest, CheckApplicationLogsResponse +from app.routes.deployment.service import DeploymentService, get_deployment_service + +router = APIRouter(prefix="/deployment") + +@router.post("/initDeployment") +## insert a new Deployment object to db +async def init_deployment( + request: InitDeploymentRequest, + service: DeploymentService = Depends(get_deployment_service) +) -> Deployment: + return await service.init_deployment(request) + +@router.get('/getLatestDeployment') +async def get_latest_deployment( + product_id: str, + target_env: str = "alpha", + service: DeploymentService = Depends(get_deployment_service) +) -> Optional[Deployment]: + """ + Get the latest deployment for a given product ID. + """ + return await service.get_latest_deployment(product_id, target_env) + +@router.post("/updateDeploymentStatus") +async def update_deployment( + request: Deployment, + service: DeploymentService = Depends(get_deployment_service) +) -> Deployment: + return await service.update_deployment_status(request) + +@router.get("/checkDeploymentStatus") +async def check_deployment_status( + product_id: str, + service: DeploymentService = Depends(get_deployment_service) +) -> List[Deployment]: + return await service.check_deployment_status(product_id) + +@router.post("/createDummyCodeDepot") +async def create_dummy_code_depot( + service: DeploymentService = Depends(get_deployment_service) +) -> CodeDepotDoc: + """ + Create a dummy code depot for testing purposes. + """ + try: + depot_name = await service.create_dummy_code_depot() + return depot_name + except Exception as e: + logger.error(f"Failed to create dummy code depot: {e}") + raise e + +@router.post("/checkApplicationLogs") +async def check_application_logs( + request: CheckApplicationLogsRequest, + service: DeploymentService = Depends(get_deployment_service) +) -> CheckApplicationLogsResponse: + """ + Check application logs for a given deployment. + """ + try: + res = await service.check_application_logs(request) + return res + except Exception as e: + logger.error(f"Failed to check application logs: {e}") + raise e + diff --git a/apps/devops/app/routes/deployment/service.py b/apps/devops/app/routes/deployment/service.py new file mode 100644 index 0000000..650dd8d --- /dev/null +++ b/apps/devops/app/routes/deployment/service.py @@ -0,0 +1,262 @@ +import uuid +from collections import defaultdict +from datetime import datetime, timedelta +from typing import List + +import httpx +import requests +from fastapi import HTTPException, Depends + +from app.common.config.site_settings import site_settings +from app.common.models import Deployment, DevOpsReconcileRequest, DevOpsReconcileOperationType +from app.common.models.code_depot.code_depot import CodeDepotDoc, DepotStatus +from app.common.models.deployment.deployment import InitDeploymentRequest, CheckApplicationLogsRequest, \ + CheckApplicationLogsResponse + +from loguru import logger + + +class DeploymentService: + + def __init__(self): + pass + + async def init_deployment( + self, + request: InitDeploymentRequest, + ) -> Deployment: + """ + """ + # TODO validate permission with user_id + # currently skip + + code_depot = await self._get_code_depot_by_product_id(request.product_id) + + git_url = await self._compose_git_url(code_depot.depot_name) + + + # retrieve project name + project_name = "TODO" + + # retrieve product info, depot name should be the same as product name + product_id = request.product_id + product_name = code_depot.depot_name + + deployment = Deployment.model_construct( + deployment_id = str(uuid.uuid4()), + deployment_stage = "init", + deployment_status = "started", + deployment_target_env = request.target_env, + deployment_ttl_hours = request.ttl_hours, + deployment_project_id = "project_id", + deployment_project_name = "project_name", + deployment_product_id = product_id, + deployment_product_name = product_name, + deployment_git_url = git_url, + deployment_git_sha256 = request.sha256, + deployment_reason = request.reason, + deployed_by = request.user_id, + created_at = datetime.now(), + updated_at = datetime.now(), + ) + + await self._start_deployment(deployment) + res = await Deployment.insert(deployment) + + return res + + async def get_latest_deployment( + self, + product_id: str, + target_env: str, + ) -> Deployment: + time_threshold = datetime.now() - timedelta(hours=168) # 7 days + deployment_records = await Deployment.find( + Deployment.deployment_product_id == product_id, + Deployment.deployment_target_env == target_env, + Deployment.updated_at >= time_threshold + ).to_list() + + if not deployment_records or len(deployment_records) == 0: + logger.warning(f"No deployment records found for product ID: {product_id} in the last 7 days") + return None + + latest_deployment = max(deployment_records, key=lambda d: (d.updated_at, d.created_at)) + return latest_deployment + + async def check_deployment_status( + self, + product_id: str, + ) -> List[Deployment]: + """ + Check the deployment status of the application, only check past 48 hours + """ + # TODO implement this function + time_threshold = datetime.now() - timedelta(hours=48) + deployment_records = await Deployment.find( + Deployment.deployment_product_id == product_id, + Deployment.created_at >= time_threshold + ).to_list() + grouped = defaultdict(list) + for deployment in deployment_records: + grouped[deployment.deployment_id].append(deployment) + for deployment_list in grouped.values(): + deployment_list.sort(key=lambda d: (d.created_at, d.updated_at)) + latest_deployments = [deployments[-1] for deployments in grouped.values()] + + return latest_deployments + + async def update_deployment_status( + self, + deployment: Deployment + ) -> Deployment: + latest_record = await Deployment.find_one( + Deployment.deployment_id == deployment.deployment_id, + sort=[("created_at", -1)] + ) + + if not latest_record: + raise HTTPException(status_code=404, detail="No record found, please initiate deployment first") + + # TODO add more sanity check logic here + + # if updating the same stage, just update the status and timestamp + # else, create a new record with the same deployment_id + res = None + if deployment.deployment_stage == latest_record.deployment_stage: + # update existing record + latest_record.deployment_status = deployment.deployment_status + latest_record.updated_at = deployment.updated_at or datetime.now() + res = await latest_record.save() + else: + # create new record + deployment.deployment_id = latest_record.deployment_id + deployment.created_at = datetime.now() + deployment.updated_at = datetime.now() + res = await deployment.insert() + + return res + + async def _get_code_depot_by_product_id( + self, + product_id: str, + ) -> CodeDepotDoc: + """ + Retrieve code depot by product id + """ + code_depot = await CodeDepotDoc.find_one(CodeDepotDoc.product_id == product_id) + if not code_depot: + raise HTTPException(status_code=404, + detail="Code depot not found for the given product id, " + "please initialize the product first" + ) + return code_depot + + async def _compose_git_url( + self, + code_depot_name: str, + gitea_base_url: str = site_settings.BASE_GITEA_URL + ) -> str: + """ + Retrieve git url by product id + """ + return f"{gitea_base_url}/prodcuts/{code_depot_name.lower()}.git" + + async def _start_deployment( + self, + deployment: Deployment, + reconsile_base_url: str = site_settings.BASE_RECONCILE_URL, + ) -> bool: + """ + Start the deployment + Return true atm, modify calling reconcile service later + """ + # construct request body + request = DevOpsReconcileRequest( + operation=DevOpsReconcileOperationType.START, + id=deployment.deployment_id, + devops_proj_id=deployment.deployment_product_id, + triggered_user_id=deployment.deployed_by, + causes=deployment.deployment_reason, + target_env=deployment.deployment_target_env, + ttl_control=True, + ttl=deployment.deployment_ttl_hours * 60 * 60, + commit_sha256=deployment.deployment_git_sha256, + ) + # send request to reoncile service + async with httpx.AsyncClient() as client: + response = await client.post( + f"{reconsile_base_url}/api/devops/reconcile", + json=request.model_dump() + ) + if response.status_code != 200: + raise HTTPException(status_code=response.status_code, detail=response.text) + return True + + async def check_application_logs( + self, + request: CheckApplicationLogsRequest, + loki_url: str = site_settings.BASE_LOKI_URL, + ) -> CheckApplicationLogsResponse: + # Convert to nanoseconds since epoch + start_ns = int(request.start_time.timestamp() * 1e9) + end_ns = int(request.end_time.timestamp() * 1e9) + + # TODO: convert product_id to application name if needed + base_query = f'{{application="{request.product_id}", environment="{request.target_env}"}}' + log_level = '|'.join(request.log_level) if request.log_level else '' + loki_query = f'{base_query} |~ "{log_level}"' + + params = { + "query": loki_query, + "limit": request.limit, + "start": start_ns, + "end": end_ns, + } + + url = f"{loki_url}/loki/api/v1/query_range" + response = requests.get(url, params=params) + + if response.status_code != 200: + raise Exception(f"Query failed: {response.status_code} - {response.text}") + + data = response.json() + streams = data.get("data", {}).get("result", []) + + logs = [] + for stream in streams: + for ts, log in stream.get("values", []): + timestamp = datetime.fromtimestamp(int(ts) / 1e9) + logs.append(f"[{timestamp}] {log.strip()}") + + return CheckApplicationLogsResponse( + product_id=request.product_id, + target_env=request.target_env, + user_id=request.user_id, + log_level=request.log_level, + start_time=request.start_time, + end_time=request.end_time, + limit=request.limit, + logs=logs + ) + +# TODO: dummy test code, remove later + async def create_dummy_code_depot( + self, + ) -> CodeDepotDoc: + """ + Create a dummy code depot for testing purposes. + """ + depot_name = f"dummy-depot-{uuid.uuid4()}" + code_depot = CodeDepotDoc( + depot_name=depot_name, + product_id="dummy-product-id", + depot_status=DepotStatus.CREATED + ) + + return await CodeDepotDoc.insert_one(code_depot) + +deployment_service = DeploymentService() + +def get_deployment_service() -> DeploymentService: + return deployment_service \ No newline at end of file diff --git a/apps/devops/app/scripts/mongodb/docker-compose.yml b/apps/devops/app/scripts/mongodb/docker-compose.yml new file mode 100644 index 0000000..8ab07c7 --- /dev/null +++ b/apps/devops/app/scripts/mongodb/docker-compose.yml @@ -0,0 +1,18 @@ +version: '3.8' + +services: + mongodb: + image: mongo:6.0 # You can change to the desired version + container_name: mongodb + restart: unless-stopped + ports: + - "27017:27017" + environment: + MONGO_INITDB_DATABASE: testdb # <-- This creates the initial database + volumes: + - mongodb_data:/data/db + command: ["mongod", "--noauth"] # <-- Disable authentication + + +volumes: + mongodb_data: \ No newline at end of file diff --git a/apps/devops/requirements.txt b/apps/devops/requirements.txt new file mode 100644 index 0000000..f6f1fa8 --- /dev/null +++ b/apps/devops/requirements.txt @@ -0,0 +1,15 @@ +beanie==1.29.0 +fastapi==0.115.12 +loguru==0.7.3 +motor==3.7.0 +prometheus_fastapi_instrumentator==7.1.0 +pydantic_settings==2.9.1 +pytest==7.1.2 +starlette==0.46.2 +uvicorn==0.34.2 +httpx==0.24.0 +pydantic-settings~=2.9.1 +pymongo~=4.12.1 +pydantic~=2.11.4 +requests~=2.32.3 +aio-pika==9.4.3 \ No newline at end of file diff --git a/apps/devops/tests/__init__.py b/apps/devops/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/devops/tests/routes/__init__.py b/apps/devops/tests/routes/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/devops/tests/routes/test_hello_world.py b/apps/devops/tests/routes/test_hello_world.py new file mode 100644 index 0000000..638a4b2 --- /dev/null +++ b/apps/devops/tests/routes/test_hello_world.py @@ -0,0 +1,27 @@ +from unittest.mock import AsyncMock, patch +from fastapi.testclient import TestClient +from app.main import app +from app.routes.hello_world.apis import get_hello_world_dao + + +def test_hello_world(): + with TestClient(app) as client: + response = client.get("/api/hello_world/") + assert response.status_code == 200 + assert response.json() == {"message": "Hello, World!"} + + +# mock out initiate_database so it doesn’t run during tests +@patch("app.providers.database.initiate_database", new_callable=AsyncMock) +def test_insert_hello_world(mock_db_init): + + class MockHelloWorldDao: + async def create_hello_world(self, msg: str, user_id: int): + return {"message": msg, "user_id": user_id} + + app.dependency_overrides[get_hello_world_dao] = lambda: MockHelloWorldDao() + with TestClient(app) as client: + response = client.post("/api/hello_world/insert", params={"msg": "Test Message"}) + assert response.status_code == 200 + assert response.json() == {"message": "Test Message", "user_id": 1} + app.dependency_overrides.clear() diff --git a/apps/devops/tests/services/test_deployment_status_update_service.py b/apps/devops/tests/services/test_deployment_status_update_service.py new file mode 100644 index 0000000..8bd4a01 --- /dev/null +++ b/apps/devops/tests/services/test_deployment_status_update_service.py @@ -0,0 +1,216 @@ +import pytest +from unittest.mock import AsyncMock +from datetime import datetime +from app.backend.services.deployment_status_update_service import DeploymentStatusUpdateService +from app.common.models.deployment.deployment import Deployment + + +@pytest.fixture +def status_update_service(): + return DeploymentStatusUpdateService() + + +@pytest.fixture +def sample_heartbeat_message(): + return { + "event_type": "DevOpsReconcileJobHeartbeat", + "payload": { + "operation": "heartbeat", + "id": "deployment-123-abc", + "status": "running", + "phase": "building", + "phase_message": "Building container image", + "error": None, + "url": None + } + } + + +@pytest.fixture +def sample_success_message(): + return { + "event_type": "DevOpsReconcileJobHeartbeat", + "payload": { + "operation": "heartbeat", + "id": "deployment-789-ghi", + "status": "success", + "phase": "finished", + "phase_message": "Deployment completed successfully", + "error": None, + "url": "https://my-app-alpha.freeleaps.com" + } + } + + +@pytest.fixture +def sample_failed_message(): + return { + "event_type": "DevOpsReconcileJobHeartbeat", + "payload": { + "operation": "heartbeat", + "id": "deployment-456-def", + "status": "failed", + "phase": "jenkins_build", + "phase_message": "Build failed due to compilation errors", + "error": "Build step 'Invoke top-level Maven targets' marked build as failure", + "url": None + } + } + + +@pytest.fixture +def mock_deployment(): + from unittest.mock import AsyncMock + + class MockDeployment: + def __init__(self): + self.deployment_id = "deployment-123-abc" + self.deployment_status = "started" + self.deployment_stage = "initialization" + self.deployment_app_url = "" + self.updated_at = datetime.now() + self.save = AsyncMock() + + return MockDeployment() + + +class TestDeploymentStatusUpdateService: + + @pytest.mark.asyncio + async def test_status_mapping(self, status_update_service): + """Test that status mapping works correctly""" + assert status_update_service.status_mapping["running"] == "started" + assert status_update_service.status_mapping["success"] == "succeeded" + assert status_update_service.status_mapping["failed"] == "failed" + assert status_update_service.status_mapping["terminated"] == "aborted" + + @pytest.mark.asyncio + async def test_phase_to_stage_mapping(self, status_update_service): + """Test that phase to stage mapping works correctly""" + assert status_update_service.phase_to_stage_mapping["initializing"] == "initialization" + assert status_update_service.phase_to_stage_mapping["jenkins_build"] == "build" + assert status_update_service.phase_to_stage_mapping["building"] == "build" + assert status_update_service.phase_to_stage_mapping["deploying"] == "deployment" + assert status_update_service.phase_to_stage_mapping["finished"] == "completed" + + @pytest.mark.asyncio + async def test_process_running_heartbeat_message(self, status_update_service, sample_heartbeat_message, mock_deployment, monkeypatch): + """Test processing a running status heartbeat""" + # Mock Deployment.find_one to return our mock deployment + async def mock_find_one(query): + _ = query # Parameter required by interface but not used in mock + return mock_deployment + + # Mock the logger methods to avoid actual logging during tests + status_update_service.module_logger.log_info = AsyncMock() + status_update_service.module_logger.log_warning = AsyncMock() + status_update_service.module_logger.log_error = AsyncMock() + status_update_service.module_logger.log_exception = AsyncMock() + + # Mock the Beanie query mechanism properly + mock_deployment_class = AsyncMock() + mock_deployment_class.find_one = mock_find_one + monkeypatch.setattr("app.backend.services.deployment_status_update_service.Deployment", mock_deployment_class) + + await status_update_service.process_heartbeat_message( + "test_key", sample_heartbeat_message, {} + ) + + # Verify the deployment was updated correctly + assert mock_deployment.deployment_status == "started" + assert mock_deployment.deployment_stage == "build" + mock_deployment.save.assert_called_once() + + @pytest.mark.asyncio + async def test_process_success_heartbeat_message(self, status_update_service, sample_success_message, mock_deployment, monkeypatch): + """Test processing a success status heartbeat with URL""" + async def mock_find_one(query): + _ = query # Parameter required by interface but not used in mock + return mock_deployment + + # Mock the logger methods + status_update_service.module_logger.log_info = AsyncMock() + status_update_service.module_logger.log_warning = AsyncMock() + status_update_service.module_logger.log_error = AsyncMock() + status_update_service.module_logger.log_exception = AsyncMock() + + # Mock the Beanie query mechanism properly + mock_deployment_class = AsyncMock() + mock_deployment_class.find_one = mock_find_one + monkeypatch.setattr("app.backend.services.deployment_status_update_service.Deployment", mock_deployment_class) + + await status_update_service.process_heartbeat_message( + "test_key", sample_success_message, {} + ) + + # Verify the deployment was updated correctly + assert mock_deployment.deployment_status == "succeeded" + assert mock_deployment.deployment_stage == "completed" + assert mock_deployment.deployment_app_url == "https://my-app-alpha.freeleaps.com" + mock_deployment.save.assert_called_once() + + @pytest.mark.asyncio + async def test_process_failed_heartbeat_message(self, status_update_service, sample_failed_message, mock_deployment, monkeypatch): + """Test processing a failed status heartbeat""" + async def mock_find_one(query): + _ = query # Parameter required by interface but not used in mock + return mock_deployment + + # Mock the logger methods + status_update_service.module_logger.log_info = AsyncMock() + status_update_service.module_logger.log_warning = AsyncMock() + status_update_service.module_logger.log_error = AsyncMock() + status_update_service.module_logger.log_exception = AsyncMock() + + # Mock the Beanie query mechanism properly + mock_deployment_class = AsyncMock() + mock_deployment_class.find_one = mock_find_one + monkeypatch.setattr("app.backend.services.deployment_status_update_service.Deployment", mock_deployment_class) + + await status_update_service.process_heartbeat_message( + "test_key", sample_failed_message, {} + ) + + # Verify the deployment was updated correctly + assert mock_deployment.deployment_status == "failed" + assert mock_deployment.deployment_stage == "build" + mock_deployment.save.assert_called_once() + + @pytest.mark.asyncio + async def test_deployment_not_found(self, status_update_service, sample_heartbeat_message, monkeypatch): + """Test handling when deployment is not found""" + async def mock_find_one(query): + _ = query # Parameter required by interface but not used in mock + return None + + # Mock the logger methods + status_update_service.module_logger.log_info = AsyncMock() + status_update_service.module_logger.log_warning = AsyncMock() + status_update_service.module_logger.log_error = AsyncMock() + status_update_service.module_logger.log_exception = AsyncMock() + + # Mock the Beanie query mechanism properly + mock_deployment_class = AsyncMock() + mock_deployment_class.find_one = mock_find_one + monkeypatch.setattr("app.backend.services.deployment_status_update_service.Deployment", mock_deployment_class) + + # Should not raise an exception + await status_update_service.process_heartbeat_message( + "test_key", sample_heartbeat_message, {} + ) + + @pytest.mark.asyncio + async def test_invalid_message_format(self, status_update_service): + """Test handling invalid message format""" + invalid_message = {"invalid": "format"} + + # Mock the logger methods + status_update_service.module_logger.log_info = AsyncMock() + status_update_service.module_logger.log_warning = AsyncMock() + status_update_service.module_logger.log_error = AsyncMock() + status_update_service.module_logger.log_exception = AsyncMock() + + # Should not raise an exception due to try/catch in the method + await status_update_service.process_heartbeat_message( + "test_key", invalid_message, {} + ) \ No newline at end of file diff --git a/apps/devops/tests/test_main.http b/apps/devops/tests/test_main.http new file mode 100644 index 0000000..b847198 --- /dev/null +++ b/apps/devops/tests/test_main.http @@ -0,0 +1,8 @@ +# Test your FastAPI endpoints + +GET http://localhost:8000/api/hello_world/ +Accept: application/json + +### +POST http://localhost:8000/api/hello_world/insert?msg=Hello%20World +Accept: application/json diff --git a/apps/helloworld/README.md b/apps/helloworld/README.md new file mode 100644 index 0000000..2fb84b3 --- /dev/null +++ b/apps/helloworld/README.md @@ -0,0 +1,20 @@ +This is a template backend service based on fastapi + mongodb app + +To start development in local, go to the root directory of the project YOUR_WORKSPACE_PATH/helloworld/ +```bash +docker compose -f app/scripts/mongodb/docker-compose.yml up -d +``` + +Then run the app +```bash +uvicorn app.main:app --reload +``` + +In case a new dependency is added, run the following command to update the requirements.txt file +```bash +# optional: if you have not installed pipreqs +pip3 install pipreqs + +# generate requirements.txt +pipreqs . --force +``` diff --git a/apps/helloworld/app/__init__.py b/apps/helloworld/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/helloworld/app/bootstrap/__init__.py b/apps/helloworld/app/bootstrap/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/helloworld/app/bootstrap/application.py b/apps/helloworld/app/bootstrap/application.py new file mode 100644 index 0000000..24223b6 --- /dev/null +++ b/apps/helloworld/app/bootstrap/application.py @@ -0,0 +1,82 @@ +import logging +from fastapi import FastAPI +from fastapi.openapi.utils import get_openapi + +from app.providers import common +from app.providers.logger import register_logger +from app.providers import router +from app.providers import database +from app.providers import metrics +from app.providers import probes +from app.providers import exception_handler +from app.common.config.app_settings import app_settings + +def create_app() -> FastAPI: + logging.info("App initializing") + + app = FreeleapsApp() + + register_logger() + register(app, exception_handler) + register(app, database) + register(app, router) + # register(app, scheduler) + register(app, common) + + # Call the custom_openapi function to change the OpenAPI version + customize_openapi_security(app) + # Register probe APIs if enabled + if app_settings.PROBES_ENABLED: + register(app, probes) + + # Register metrics APIs if enabled + if app_settings.METRICS_ENABLED: + register(app, metrics) + return app + + +# This function overrides the OpenAPI schema version to 3.0.0 +def customize_openapi_security(app: FastAPI) -> None: + + def custom_openapi(): + if app.openapi_schema: + return app.openapi_schema + + # Generate OpenAPI schema + openapi_schema = get_openapi( + title="FreeLeaps API", + version="3.1.0", + description="FreeLeaps API Documentation", + routes=app.routes, + ) + + # Ensure the components section exists in the OpenAPI schema + if "components" not in openapi_schema: + openapi_schema["components"] = {} + + # Add security scheme to components + openapi_schema["components"]["securitySchemes"] = { + "bearerAuth": {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"} + } + + # Add security requirement globally + openapi_schema["security"] = [{"bearerAuth": []}] + + app.openapi_schema = openapi_schema + return app.openapi_schema + + app.openapi = custom_openapi + + +def register(app, provider): + logging.info(provider.__name__ + " registering") + provider.register(app) + + +def boot(app, provider): + logging.info(provider.__name__ + " booting") + provider.boot(app) + +class FreeleapsApp(FastAPI): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) diff --git a/apps/helloworld/app/common/__init__.py b/apps/helloworld/app/common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/helloworld/app/common/config/__init__.py b/apps/helloworld/app/common/config/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/helloworld/app/common/config/app_settings.py b/apps/helloworld/app/common/config/app_settings.py new file mode 100644 index 0000000..9a73bcb --- /dev/null +++ b/apps/helloworld/app/common/config/app_settings.py @@ -0,0 +1,29 @@ +from pydantic_settings import BaseSettings + +# NOTE: The values fall backs to your environment variables when not set here +class AppSettings(BaseSettings): + NAME: str = "YOUR_APP_NAME" + APP_NAME: str = NAME + APP_ENV: str = "alpha" + + JWT_SECRET_KEY: str = "" + ACCESS_TOKEN_EXPIRE_MINUTES: int = 3600 + REFRESH_TOKEN_EXPIRE_DAYS: int = 1 + + METRICS_ENABLED: bool = False + PROBES_ENABLED: bool = True + + APP_MONGODB_URI: str = "mongodb://localhost:27017" + APP_MONGODB_NAME: str = "testdb" + + LOG_BASE_PATH: str = "./log" + BACKEND_LOG_FILE_NAME: str = APP_NAME + APPLICATION_ACTIVITY_LOG: str = APP_NAME + "-application-activity" + + + class Config: + env_file = ".myapp.env" + env_file_encoding = "utf-8" + + +app_settings = AppSettings() diff --git a/apps/helloworld/app/common/config/log_settings.py b/apps/helloworld/app/common/config/log_settings.py new file mode 100644 index 0000000..2f6985c --- /dev/null +++ b/apps/helloworld/app/common/config/log_settings.py @@ -0,0 +1,16 @@ +import os +from dataclasses import dataclass +from .app_settings import app_settings + +@dataclass +class LogSettings: + LOG_PATH_BASE: str = app_settings.LOG_BASE_PATH + LOG_RETENTION: str = os.environ.get("LOG_RETENTION", "30 days") + LOG_ROTATION: str = os.environ.get("LOG_ROTATION", "00:00") # midnight + MAX_BACKUP_FILES: int = int(os.environ.get("LOG_BACKUP_FILES", 5)) + LOG_ROTATION_BYTES: int = int(os.environ.get("LOG_ROTATION_BYTES", 10 * 1024 * 1024)) # 10 MB + APP_NAME: str = app_settings.APP_NAME + ENVIRONMENT: str = app_settings.APP_ENV + + +log_settings = LogSettings() diff --git a/apps/helloworld/app/common/config/site_settings.py b/apps/helloworld/app/common/config/site_settings.py new file mode 100644 index 0000000..76e5af1 --- /dev/null +++ b/apps/helloworld/app/common/config/site_settings.py @@ -0,0 +1,27 @@ +import os + +from pydantic_settings import BaseSettings + + +# NOTE: The values fall backs to your environment variables when not set here +class SiteSettings(BaseSettings): + NAME: str = "appname" + DEBUG: bool = True + + ENV: str = "dev" + + SERVER_HOST: str = "localhost" + SERVER_PORT: int = 8000 + + URL: str = "http://localhost" + TIME_ZONE: str = "UTC" + + BASE_PATH: str = os.path.dirname(os.path.dirname((os.path.abspath(__file__)))) + + class Config: + env_file = ".devbase-webapi.env" + env_file_encoding = "utf-8" + + +site_settings = SiteSettings() + diff --git a/apps/helloworld/app/common/daos/__init__.py b/apps/helloworld/app/common/daos/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/helloworld/app/common/daos/hello_world/__init__.py b/apps/helloworld/app/common/daos/hello_world/__init__.py new file mode 100644 index 0000000..463a9cb --- /dev/null +++ b/apps/helloworld/app/common/daos/hello_world/__init__.py @@ -0,0 +1,6 @@ +from app.common.daos.hello_world.hello_world_dao import HelloWorldDao + +hello_world_dao = HelloWorldDao() + +def get_hello_world_dao() -> HelloWorldDao: + return hello_world_dao diff --git a/apps/helloworld/app/common/daos/hello_world/hello_world_dao.py b/apps/helloworld/app/common/daos/hello_world/hello_world_dao.py new file mode 100644 index 0000000..3b3a112 --- /dev/null +++ b/apps/helloworld/app/common/daos/hello_world/hello_world_dao.py @@ -0,0 +1,30 @@ +from app.common.models.hello_world.hello_world import HelloWorld + +class HelloWorldDao: + def __init__(self): + pass + + async def create_hello_world(self, message: str, count: int): + hello_world = HelloWorld(message=message, count=count) + await hello_world.insert() + return hello_world + + async def get_hello_world(self, id: str): + hello_world = await HelloWorld.get(id) + return hello_world + + async def update_hello_world(self, id: str, message: str, count: int): + hello_world = await HelloWorld.get(id) + if hello_world: + hello_world.message = message + hello_world.count = count + await hello_world.save() + return hello_world + return None + + async def delete_hello_world(self, id: str): + hello_world = await HelloWorld.get(id) + if hello_world: + await hello_world.delete() + return True + return False \ No newline at end of file diff --git a/apps/helloworld/app/common/log/__init__.py b/apps/helloworld/app/common/log/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/helloworld/app/common/log/application_logger.py b/apps/helloworld/app/common/log/application_logger.py new file mode 100644 index 0000000..896c044 --- /dev/null +++ b/apps/helloworld/app/common/log/application_logger.py @@ -0,0 +1,12 @@ +from .base_logger import LoggerBase +from app.common.config.app_settings import app_settings + +class ApplicationLogger(LoggerBase): + def __init__(self, application_activities: dict[str, any] = {}) -> None: + extra_fileds = {} + if application_activities: + extra_fileds.update(application_activities) + super().__init__( + logger_name=app_settings.APPLICATION_ACTIVITY_LOG, + extra_fileds=extra_fileds, + ) diff --git a/apps/helloworld/app/common/log/base_logger.py b/apps/helloworld/app/common/log/base_logger.py new file mode 100644 index 0000000..a370296 --- /dev/null +++ b/apps/helloworld/app/common/log/base_logger.py @@ -0,0 +1,136 @@ +from loguru import logger as guru_logger +from app.common.config.log_settings import log_settings +from typing import Dict, Any, Optional +import socket +import json +import threading +import os +import sys +import inspect +import logging + +from app.common.log.json_sink import JsonSink + +class LoggerBase: + binded_loggers = {} + logger_lock = threading.Lock() + + def __init__(self, logger_name: str, extra_fileds: dict[str, any]) -> None: + self.__logger_name = logger_name + self.extra_fileds = extra_fileds + with LoggerBase.logger_lock: + if self.__logger_name in LoggerBase.binded_loggers: + self.logger = LoggerBase.binded_loggers[self.__logger_name] + return + + log_filename = f"{log_settings.LOG_PATH_BASE}/{self.__logger_name}.log" + log_level = "INFO" + rotation_bytes = int(log_settings.LOG_ROTATION_BYTES or 10 * 1024 * 1024) + + guru_logger.remove() + + file_sink = JsonSink( + log_file_path=log_filename, + rotation_size_bytes=rotation_bytes, + max_backup_files=log_settings.MAX_BACKUP_FILES + ) + guru_logger.add( + sink=file_sink, + level=log_level, + filter=lambda record: record["extra"].get("topic") == self.__logger_name, + ) + + guru_logger.add( + sink=sys.stderr, + level=log_level, + format="{level} - {time:YYYY-MM-DD HH:mm:ss} - <{extra[log_file]}:{extra[log_line]}> - {extra[properties_str]} - {message}", + filter=lambda record: record["extra"].get("topic") == self.__logger_name, + ) + + host_name = socket.gethostname() + host_ip = socket.gethostbyname(host_name) + self.logger = guru_logger.bind( + topic=self.__logger_name, + host_ip=host_ip, + host_name=host_name, + app=log_settings.APP_NAME, + env=log_settings.ENVIRONMENT, + ) + with LoggerBase.logger_lock: + LoggerBase.binded_loggers[self.__logger_name] = self.logger + + def _get_log_context(self) -> dict: + frame = inspect.currentframe().f_back.f_back + filename = os.path.basename(frame.f_code.co_filename) + lineno = frame.f_lineno + return {"log_file": filename, "log_line": lineno} + + def _prepare_properties(self, properties: Optional[Dict[str, Any]]) -> Dict[str, Any]: + props = {} if properties is None else properties.copy() + props_str = json.dumps(props, ensure_ascii=False) if props else "{}" + return props, props_str + + async def log_event(self, sender_id: str, receiver_id: str, subject: str, event: str, properties: dict[str, any], text: str = "") -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event=event, properties=props, properties_str=props_str, **context) + local_logger.info(text) + + async def log_exception(self, sender_id: str, receiver_id: str, subject: str, exception: Exception, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="exception", properties=props, properties_str=props_str, exception=exception, **context) + local_logger.exception(text) + + async def log_info(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="information", properties=props, properties_str=props_str, **context) + local_logger.info(text) + + async def log_warning(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="warning", properties=props, properties_str=props_str, **context) + local_logger.warning(text) + + async def log_error(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="error", properties=props, properties_str=props_str, **context) + local_logger.error(text) + + @staticmethod + def configure_uvicorn_logging(): + print("πŸ“’ Setting up uvicorn logging interception...") + + # Intercept logs from these loggers + intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"] + + class InterceptHandler(logging.Handler): + def emit(self, record): + level = ( + guru_logger.level(record.levelname).name + if guru_logger.level(record.levelname, None) + else record.levelno + ) + frame, depth = logging.currentframe(), 2 + while frame.f_code.co_filename == logging.__file__: + frame = frame.f_back + depth += 1 + + guru_logger.opt(depth=depth, exception=record.exc_info).log( + level, + f"[{record.name}] {record.getMessage()}", + ) + + # Replace default handlers + logging.root.handlers.clear() + logging.root.setLevel(logging.INFO) + logging.root.handlers = [InterceptHandler()] + + # Configure specific uvicorn loggers + for logger_name in intercept_loggers: + logging_logger = logging.getLogger(logger_name) + logging_logger.handlers.clear() # Remove default handlers + logging_logger.propagate = True # Ensure propagation through Loguru diff --git a/apps/helloworld/app/common/log/json_sink.py b/apps/helloworld/app/common/log/json_sink.py new file mode 100644 index 0000000..a798156 --- /dev/null +++ b/apps/helloworld/app/common/log/json_sink.py @@ -0,0 +1,85 @@ +import json +import datetime +import traceback +from pathlib import Path +from typing import Optional + +class JsonSink: + def __init__( + self, + log_file_path: str, + rotation_size_bytes: int = 10 * 1024 * 1024, + max_backup_files: int = 5, + ): + self.log_file_path = Path(log_file_path) + self.rotation_size = rotation_size_bytes + self.max_backup_files = max_backup_files + self._open_log_file() + + def _open_log_file(self): + # ensure the parent directory exists + parent_dir = self.log_file_path.parent + if not parent_dir.exists(): + parent_dir.mkdir(parents=True, exist_ok=True) + self.log_file = self.log_file_path.open("a", encoding="utf-8") + + def _should_rotate(self) -> bool: + return self.log_file_path.exists() and self.log_file_path.stat().st_size >= self.rotation_size + + def _rotate(self): + self.log_file.close() + timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + rotated_path = self.log_file_path.with_name(f"{self.log_file_path.stem}_{timestamp}{self.log_file_path.suffix}") + self.log_file_path.rename(rotated_path) + self._cleanup_old_backups() + self._open_log_file() + + def _cleanup_old_backups(self): + parent = self.log_file_path.parent + stem = self.log_file_path.stem + suffix = self.log_file_path.suffix + + backup_files = sorted( + parent.glob(f"{stem}_*{suffix}"), + key=lambda p: p.stat().st_mtime, + reverse=True, + ) + + for old_file in backup_files[self.max_backup_files:]: + try: + old_file.unlink() + except Exception as e: + print(f"Failed to delete old backup {old_file}: {e}") + + def __call__(self, message): + record = message.record + if self._should_rotate(): + self._rotate() + + log_entry = { + "level": record["level"].name.lower(), + "timestamp": int(record["time"].timestamp() * 1000), + "text": record["message"], + "fields": record["extra"].get("properties", {}), + "context": { + "app": record["extra"].get("app"), + "env": record["extra"].get("env"), + "log_file": record["extra"].get("log_file"), + "log_line": record["extra"].get("log_line"), + "topic": record["extra"].get("topic"), + "sender_id": record["extra"].get("sender_id"), + "receiver_id": record["extra"].get("receiver_id"), + "subject": record["extra"].get("subject"), + "event": record["extra"].get("event"), + "host_ip": record["extra"].get("host_ip"), + "host_name": record["extra"].get("host_name"), + }, + "stacktrace": None + } + + if record["exception"]: + exc_type, exc_value, exc_tb = record["exception"] + log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb) + + self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n") + self.log_file.flush() diff --git a/apps/helloworld/app/common/models/__init__.py b/apps/helloworld/app/common/models/__init__.py new file mode 100644 index 0000000..2535cbb --- /dev/null +++ b/apps/helloworld/app/common/models/__init__.py @@ -0,0 +1,4 @@ +from app.common.models.hello_world.hello_world import HelloWorld + +# list of beanie document models +db_models = [HelloWorld] \ No newline at end of file diff --git a/apps/helloworld/app/common/models/hello_world/__init__.py b/apps/helloworld/app/common/models/hello_world/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/helloworld/app/common/models/hello_world/hello_world.py b/apps/helloworld/app/common/models/hello_world/hello_world.py new file mode 100644 index 0000000..55000c7 --- /dev/null +++ b/apps/helloworld/app/common/models/hello_world/hello_world.py @@ -0,0 +1,17 @@ +from datetime import datetime + +from beanie import Document + + +class HelloWorld(Document): + message: str + count: int = 0 + created_time: datetime = datetime.now() + + class Settings: + name = "hello_world" + indexes = [ + [("message", 1), ("count", 1)] + ] + + diff --git a/apps/helloworld/app/common/probes/__init__.py b/apps/helloworld/app/common/probes/__init__.py new file mode 100644 index 0000000..4071df8 --- /dev/null +++ b/apps/helloworld/app/common/probes/__init__.py @@ -0,0 +1,140 @@ +import logging +from enum import Enum +from typing import Optional, Callable, Tuple, Dict +import inspect +from datetime import datetime, timezone + +# ProbeType is an Enum that defines the types of probes that can be registered. +class ProbeType(Enum): + LIVENESS = "liveness" + READINESS = "readiness" + STARTUP = "startup" + +# ProbeResult is a class that represents the result of a probe check. +class ProbeResult: + def __init__(self, success: bool, message: str = "ok", data: Optional[dict] = None): + self.success = success + self.message = message + self.data = data or {} + + def to_dict(self) -> dict: + return { + "success": self.success, + "message": self.message, + "data": self.data + } + +# Probe is a class that represents a probe that can be registered. +class Probe: + def __init__(self, type: ProbeType, path: str, check_fn: Callable, name: Optional[str] = None): + self.type = type + self.path = path + self.check_fn = check_fn + self.name = name or f"{type.value}-{id(self)}" + + async def execute(self) -> ProbeResult: + try: + result = self.check_fn() + if inspect.isawaitable(result): + result = await result + + if isinstance(result, ProbeResult): + return result + elif isinstance(result, bool): + return ProbeResult(result, "ok" if result else "failed") + else: + return ProbeResult(True, "ok") + except Exception as e: + return ProbeResult(False, str(e)) + +# ProbeGroup is a class that represents a group of probes that can be checked together. +class ProbeGroup: + def __init__(self, path: str): + self.path = path + self.probes: Dict[str, Probe] = {} + + def add_probe(self, probe: Probe): + self.probes[probe.name] = probe + + async def check_all(self) -> Tuple[bool, dict]: + results = {} + all_success = True + + for name, probe in self.probes.items(): + result = await probe.execute() + results[name] = result.to_dict() + if not result.success: + all_success = False + + return all_success, results + +# FrameworkAdapter is an abstract class that defines the interface for framework-specific probe adapters. +class FrameworkAdapter: + async def handle_request(self, group: ProbeGroup): + all_success, results = await group.check_all() + status_code = 200 if all_success else 503 + return {"status": "ok" if all_success else "failed", "payload": results, "timestamp": int(datetime.now(timezone.utc).timestamp())}, status_code + + def register_route(self, path: str, handler: Callable): + raise NotImplementedError + +# ProbeManager is a class that manages the registration of probes and their corresponding framework adapters. +class ProbeManager: + _default_paths = { + ProbeType.LIVENESS: "/_/livez", + ProbeType.READINESS: "/_/readyz", + ProbeType.STARTUP: "/_/healthz" + } + + def __init__(self): + self.groups: Dict[str, ProbeGroup] = {} + self.adapters: Dict[str, FrameworkAdapter] = {} + self._startup_complete = False + + def register_adapter(self, framework: str, adapter: FrameworkAdapter): + self.adapters[framework] = adapter + logging.info(f"Registered probe adapter ({adapter}) for framework: {framework}") + + def register( + self, + type: ProbeType, + check_func: Optional[Callable] = None, + path: Optional[str] = None, + prefix: str = "", + name: Optional[str] = None, + frameworks: Optional[list] = None + ): + path = path or self._default_paths.get(type, "/_/healthz") + if prefix: + path = f"{prefix}{path}" + + if type == ProbeType.STARTUP and check_func is None: + check_func = self._default_startup_check + + probe = Probe(type, path, check_func or (lambda: True), name) + + if path not in self.groups: + self.groups[path] = ProbeGroup(path) + self.groups[path].add_probe(probe) + + for framework in (frameworks or ["default"]): + self._register_route(framework, path) + logging.info(f"Registered {type.value} probe route ({path}) for framework: {framework}") + + def _register_route(self, framework: str, path: str): + if framework not in self.adapters: + return + + adapter = self.adapters[framework] + group = self.groups[path] + + async def handler(): + return await adapter.handle_request(group) + + adapter.register_route(path, handler) + + def _default_startup_check(self) -> bool: + return self._startup_complete + + def mark_startup_complete(self): + self._startup_complete = True \ No newline at end of file diff --git a/apps/helloworld/app/common/probes/adapters.py b/apps/helloworld/app/common/probes/adapters.py new file mode 100644 index 0000000..2ecd38a --- /dev/null +++ b/apps/helloworld/app/common/probes/adapters.py @@ -0,0 +1,15 @@ +from . import FrameworkAdapter +from fastapi.responses import JSONResponse +from typing import Callable + +# FastAPIAdapter is a class that implements the FrameworkAdapter interface for FastAPI. +class FastAPIAdapter(FrameworkAdapter): + def __init__(self, app): + self.app = app + + def register_route(self,path: str, handler: Callable): + async def wrapper(): + data, status_code = await handler() + return JSONResponse(content=data, status_code=status_code) + + self.app.add_api_route(path, wrapper, methods=["GET"]) diff --git a/apps/helloworld/app/envs/alpha.yml b/apps/helloworld/app/envs/alpha.yml new file mode 100644 index 0000000..e69de29 diff --git a/apps/helloworld/app/envs/prod.yml b/apps/helloworld/app/envs/prod.yml new file mode 100644 index 0000000..e69de29 diff --git a/apps/helloworld/app/main.py b/apps/helloworld/app/main.py new file mode 100644 index 0000000..559d7ed --- /dev/null +++ b/apps/helloworld/app/main.py @@ -0,0 +1,16 @@ +from fastapi.responses import RedirectResponse +from app.common.config.site_settings import site_settings +from app.bootstrap.application import create_app + +app = create_app() + +@app.get("/", status_code=301) +async def root(): + """ + TODO: redirect client to /doc# + """ + return RedirectResponse("docs") + +if __name__ == "__main__": + import uvicorn + uvicorn.run("main:app", host=site_settings.SERVER_HOST, port=site_settings.SERVER_PORT, reload=True) \ No newline at end of file diff --git a/apps/helloworld/app/providers/__init__.py b/apps/helloworld/app/providers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/helloworld/app/providers/common.py b/apps/helloworld/app/providers/common.py new file mode 100644 index 0000000..64a9a44 --- /dev/null +++ b/apps/helloworld/app/providers/common.py @@ -0,0 +1,31 @@ +from fastapi.middleware.cors import CORSMiddleware +from app.common.config.site_settings import site_settings + + +def register(app): + app.debug = site_settings.DEBUG + app.title = site_settings.NAME + + add_global_middleware(app) + + # This hook ensures that a connection is opened to handle any queries + # generated by the request. + @app.on_event("startup") + def startup(): + pass + + # This hook ensures that the connection is closed when we've finished + # processing the request. + @app.on_event("shutdown") + def shutdown(): + pass + + +def add_global_middleware(app): + app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) diff --git a/apps/helloworld/app/providers/database.py b/apps/helloworld/app/providers/database.py new file mode 100644 index 0000000..8716b8e --- /dev/null +++ b/apps/helloworld/app/providers/database.py @@ -0,0 +1,34 @@ +import asyncio +from app.common.config.app_settings import app_settings +from beanie import init_beanie +from motor.motor_asyncio import AsyncIOMotorClient +from app.common.models import db_models +from app.common.probes import ProbeResult + +client = AsyncIOMotorClient( + app_settings.APP_MONGODB_URI, + serverSelectionTimeoutMS=60000, + minPoolSize=5, # Minimum number of connections in the pool + maxPoolSize=20, # Maximum number of connections in the pool +) + +def register(app): + app.debug = "auth_mongo_debug" + app.title = "auth_mongo_name" + + @app.on_event("startup") + async def start_database(): + await initiate_database() + +async def check_database_initialized() -> ProbeResult: + try: + await asyncio.wait_for(client.server_info(), timeout=5) + return ProbeResult(success=True, message="service has been initialized and ready to serve") + except Exception: + return ProbeResult(success=False, message="service is not initialized yet", data={"error": "database is not ready"}) + + +async def initiate_database(): + await init_beanie( + database=client[app_settings.APP_MONGODB_NAME], document_models=db_models + ) diff --git a/apps/helloworld/app/providers/exception_handler.py b/apps/helloworld/app/providers/exception_handler.py new file mode 100644 index 0000000..21117a5 --- /dev/null +++ b/apps/helloworld/app/providers/exception_handler.py @@ -0,0 +1,39 @@ +from fastapi import FastAPI, HTTPException +from fastapi.exceptions import RequestValidationError +from starlette.requests import Request +from starlette.responses import JSONResponse +from starlette.status import ( + HTTP_400_BAD_REQUEST, + HTTP_401_UNAUTHORIZED, + HTTP_403_FORBIDDEN, + HTTP_404_NOT_FOUND, + HTTP_422_UNPROCESSABLE_ENTITY, + HTTP_500_INTERNAL_SERVER_ERROR, +) + + +async def custom_http_exception_handler(request: Request, exc: HTTPException): + return JSONResponse( + status_code=exc.status_code, + content={"error": exc.detail}, + ) + + + +async def validation_exception_handler(request: Request, exc: RequestValidationError): + return JSONResponse( + status_code=HTTP_400_BAD_REQUEST, + content={"error": str(exc)}, + ) + +async def exception_handler(request: Request, exc: Exception): + return JSONResponse( + status_code=HTTP_500_INTERNAL_SERVER_ERROR, + content={"error": str(exc)}, + ) + + +def register(app: FastAPI): + app.add_exception_handler(HTTPException, custom_http_exception_handler) + app.add_exception_handler(RequestValidationError, validation_exception_handler) + app.add_exception_handler(Exception, exception_handler) diff --git a/apps/helloworld/app/providers/logger.py b/apps/helloworld/app/providers/logger.py new file mode 100644 index 0000000..2785603 --- /dev/null +++ b/apps/helloworld/app/providers/logger.py @@ -0,0 +1,7 @@ +from app.common.log.base_logger import LoggerBase + + +def register_logger(): + print("πŸ“’ Setting up logging interception...") + LoggerBase.configure_uvicorn_logging() + print("βœ… Logging interception complete. Logs are formatted and deduplicated!") diff --git a/apps/helloworld/app/providers/metrics.py b/apps/helloworld/app/providers/metrics.py new file mode 100644 index 0000000..1ae941a --- /dev/null +++ b/apps/helloworld/app/providers/metrics.py @@ -0,0 +1,16 @@ +import logging +from prometheus_fastapi_instrumentator import Instrumentator +from app.common.config.app_settings import app_settings + +def register(app): + instrumentator = ( + Instrumentator().instrument( + app, + metric_namespace="freeleaps", + metric_subsystem=app_settings.APP_NAME) + ) + + @app.on_event("startup") + async def startup(): + instrumentator.expose(app, endpoint="/api/_/metrics", should_gzip=True) + logging.info("Metrics endpoint exposed at /api/_/metrics") \ No newline at end of file diff --git a/apps/helloworld/app/providers/probes.py b/apps/helloworld/app/providers/probes.py new file mode 100644 index 0000000..883e3d6 --- /dev/null +++ b/apps/helloworld/app/providers/probes.py @@ -0,0 +1,25 @@ +from app.common.probes import ProbeManager, ProbeType +from app.common.probes.adapters import FastAPIAdapter +from .database import check_database_initialized + +def register(app): + probes_manager = ProbeManager() + probes_manager.register_adapter("fastapi", FastAPIAdapter(app)) + + async def readiness_checker(): + return await check_database_initialized() + + probes_manager.register( + name="readiness", + prefix="/api", + type=ProbeType.READINESS, + check_func=readiness_checker, + frameworks=["fastapi"] + ) + + probes_manager.register(name="liveness", prefix="/api", type=ProbeType.LIVENESS, frameworks=["fastapi"]) + probes_manager.register(name="startup", prefix="/api", type=ProbeType.STARTUP, frameworks=["fastapi"]) + + @app.on_event("startup") + async def mark_startup_complete(): + probes_manager.mark_startup_complete() \ No newline at end of file diff --git a/apps/helloworld/app/providers/router.py b/apps/helloworld/app/providers/router.py new file mode 100644 index 0000000..b273eb8 --- /dev/null +++ b/apps/helloworld/app/providers/router.py @@ -0,0 +1,34 @@ +from app.routes import api_router + +from starlette import routing + + +def register(app): + app.include_router( + api_router, + prefix="/api", + tags=["api"], + dependencies=[], + responses={404: {"description": "no page found"}}, + ) + + if app.debug: + for route in app.routes: + if not isinstance(route, routing.WebSocketRoute): + print( + { + "path": route.path, + "endpoint": route.endpoint, + "name": route.name, + "methods": route.methods, + } + ) + else: + print( + { + "path": route.path, + "endpoint": route.endpoint, + "name": route.name, + "type": "web socket route", + } + ) diff --git a/apps/helloworld/app/providers/scheduler.py b/apps/helloworld/app/providers/scheduler.py new file mode 100644 index 0000000..7ea8d6c --- /dev/null +++ b/apps/helloworld/app/providers/scheduler.py @@ -0,0 +1,8 @@ +import asyncio + + +def register(app): + @app.on_event("startup") + async def start_scheduler(): + #create your scheduler here + pass diff --git a/apps/helloworld/app/routes/__init__.py b/apps/helloworld/app/routes/__init__.py new file mode 100644 index 0000000..9644f27 --- /dev/null +++ b/apps/helloworld/app/routes/__init__.py @@ -0,0 +1,8 @@ +from fastapi import APIRouter +from app.routes.hello_world import router as hello_world_router + + +api_router = APIRouter() + +# TODO: add custom routers here +api_router.include_router(hello_world_router, tags=["hello_world"]) diff --git a/apps/helloworld/app/routes/hello_world/__init__.py b/apps/helloworld/app/routes/hello_world/__init__.py new file mode 100644 index 0000000..70ca8e2 --- /dev/null +++ b/apps/helloworld/app/routes/hello_world/__init__.py @@ -0,0 +1,7 @@ +from fastapi import APIRouter +from .apis import router as hello_world_api + + +router = APIRouter(prefix="/hello_world") + +router.include_router(hello_world_api, tags=["hello_world"]) diff --git a/apps/helloworld/app/routes/hello_world/apis.py b/apps/helloworld/app/routes/hello_world/apis.py new file mode 100644 index 0000000..9f4902a --- /dev/null +++ b/apps/helloworld/app/routes/hello_world/apis.py @@ -0,0 +1,25 @@ +from fastapi import APIRouter, Depends +from loguru import logger + +from app.common.daos.hello_world import get_hello_world_dao, HelloWorldDao + +router = APIRouter() + +@router.get("/") +async def hello_world(): + logger.info("Hello, World! endpoint was called") + return {"message": "Hello, World!"} + + +@router.post("/insert") +async def insert_hello_world(msg: str, dao: HelloWorldDao = Depends(get_hello_world_dao)): + """ + Insert a HelloWorld document into the database. + """ + hello_world = await dao.create_hello_world(msg, 1) + return hello_world + + + + + diff --git a/apps/helloworld/app/scripts/mongodb/docker-compose.yml b/apps/helloworld/app/scripts/mongodb/docker-compose.yml new file mode 100644 index 0000000..8ab07c7 --- /dev/null +++ b/apps/helloworld/app/scripts/mongodb/docker-compose.yml @@ -0,0 +1,18 @@ +version: '3.8' + +services: + mongodb: + image: mongo:6.0 # You can change to the desired version + container_name: mongodb + restart: unless-stopped + ports: + - "27017:27017" + environment: + MONGO_INITDB_DATABASE: testdb # <-- This creates the initial database + volumes: + - mongodb_data:/data/db + command: ["mongod", "--noauth"] # <-- Disable authentication + + +volumes: + mongodb_data: \ No newline at end of file diff --git a/apps/helloworld/requirements.txt b/apps/helloworld/requirements.txt new file mode 100644 index 0000000..056543d --- /dev/null +++ b/apps/helloworld/requirements.txt @@ -0,0 +1,10 @@ +beanie==1.29.0 +fastapi==0.115.12 +loguru==0.7.3 +motor==3.7.0 +prometheus_fastapi_instrumentator==7.1.0 +pydantic_settings==2.9.1 +pytest==7.1.2 +starlette==0.46.2 +uvicorn==0.34.2 +httpx==0.24.0 \ No newline at end of file diff --git a/apps/helloworld/tests/__init__.py b/apps/helloworld/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/helloworld/tests/routes/__init__.py b/apps/helloworld/tests/routes/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/helloworld/tests/routes/test_hello_world.py b/apps/helloworld/tests/routes/test_hello_world.py new file mode 100644 index 0000000..638a4b2 --- /dev/null +++ b/apps/helloworld/tests/routes/test_hello_world.py @@ -0,0 +1,27 @@ +from unittest.mock import AsyncMock, patch +from fastapi.testclient import TestClient +from app.main import app +from app.routes.hello_world.apis import get_hello_world_dao + + +def test_hello_world(): + with TestClient(app) as client: + response = client.get("/api/hello_world/") + assert response.status_code == 200 + assert response.json() == {"message": "Hello, World!"} + + +# mock out initiate_database so it doesn’t run during tests +@patch("app.providers.database.initiate_database", new_callable=AsyncMock) +def test_insert_hello_world(mock_db_init): + + class MockHelloWorldDao: + async def create_hello_world(self, msg: str, user_id: int): + return {"message": msg, "user_id": user_id} + + app.dependency_overrides[get_hello_world_dao] = lambda: MockHelloWorldDao() + with TestClient(app) as client: + response = client.post("/api/hello_world/insert", params={"msg": "Test Message"}) + assert response.status_code == 200 + assert response.json() == {"message": "Test Message", "user_id": 1} + app.dependency_overrides.clear() diff --git a/apps/helloworld/tests/test_main.http b/apps/helloworld/tests/test_main.http new file mode 100644 index 0000000..b847198 --- /dev/null +++ b/apps/helloworld/tests/test_main.http @@ -0,0 +1,8 @@ +# Test your FastAPI endpoints + +GET http://localhost:8000/api/hello_world/ +Accept: application/json + +### +POST http://localhost:8000/api/hello_world/insert?msg=Hello%20World +Accept: application/json diff --git a/apps/payment/backend/application/payment_hub.py b/apps/payment/backend/application/payment_hub.py index 07cce96..8f18ba0 100644 --- a/apps/payment/backend/application/payment_hub.py +++ b/apps/payment/backend/application/payment_hub.py @@ -9,9 +9,6 @@ class PaymentHub: self.stripe_manager = StripeManager() return - async def fetch_wechat_qr_code(self, project_id: str) -> Optional[Dict[str, any]]: - return await self.payment_manager.fetch_wechat_qr_code(project_id) - async def fetch_stripe_account_id(self, user_id: str) -> Optional[str]: return await self.payment_manager.fetch_stripe_account_id(user_id) diff --git a/apps/payment/backend/business/payment_manager.py b/apps/payment/backend/business/payment_manager.py index 6a5d6d3..bb2b955 100644 --- a/apps/payment/backend/business/payment_manager.py +++ b/apps/payment/backend/business/payment_manager.py @@ -9,18 +9,6 @@ class PaymentManager: def __init__(self) -> None: self.module_logger = ModuleLogger(sender_id=PaymentManager) - async def fetch_wechat_qr_code(self, project_id: str) -> Optional[Dict[str, any]]: - project = await ProjectDoc.get(project_id) - proposer = project.proposer_id - income_profile = await IncomeProfileDoc.find_one( - IncomeProfileDoc.user_id == proposer - ) - if income_profile: - return income_profile.bank_account.money_collecting_methods[ - 0 - ].wechat_qr_code - return None - async def fetch_stripe_account_id(self, user_id: str) -> Optional[str]: income_profile = await IncomeProfileDoc.find_one(IncomeProfileDoc.user_id == user_id) if income_profile: @@ -44,7 +32,7 @@ class PaymentManager: } }} ) - + if not payment_profile: await self.module_logger.log_warning( warning="No payment profile found for Stripe account", @@ -54,7 +42,7 @@ class PaymentManager: } ) return False - + # Update the stripe method status updated = False # Need to check if money_collecting_methods exists and is not empty @@ -66,7 +54,7 @@ class PaymentManager: method.last_update_time = int(datetime.now().timestamp()) updated = True break # Exit loop once found and updated - + if updated: await payment_profile.save() await self.module_logger.log_info( @@ -79,7 +67,7 @@ class PaymentManager: } ) return True - + # Log warning with more context await self.module_logger.log_warning( warning="Stripe account not found in payment methods", @@ -91,7 +79,7 @@ class PaymentManager: } ) return False - + except Exception as e: await self.module_logger.log_exception( exception=e, diff --git a/apps/payment/backend/business/stripe_manager.py b/apps/payment/backend/business/stripe_manager.py index 656f884..4503370 100644 --- a/apps/payment/backend/business/stripe_manager.py +++ b/apps/payment/backend/business/stripe_manager.py @@ -8,6 +8,7 @@ from stripe.error import SignatureVerificationError from common.log.module_logger import ModuleLogger from decimal import Decimal import json +import httpx stripe.api_key = app_settings.STRIPE_API_KEY @@ -39,7 +40,7 @@ class StripeManager: redirect_url="{}/work".format(self.site_url_root) ) return login_link.url - + # Otherwise show onboarding account_link = stripe.AccountLink.create( account=account_id, @@ -90,6 +91,10 @@ class StripeManager: properties={"session_id": session_id}, ) elif len(transactions) == 0: + await self.module_logger.log_error( + error="No transaction found for session_id: {}".format(session_id), + properties={"session_id": session_id}, + ) return None return transactions[0] @@ -203,19 +208,25 @@ class StripeManager: transaction.stripe_price_id = price.id await transaction.save() - payment_link = stripe.PaymentLink.create( - line_items=[ + # Prepare payment link parameters with conditional application_fee_amount + payment_link_params = { + "line_items": [ { "price": transaction.stripe_price_id, "quantity": 1, } ], - application_fee_amount=transaction.application_fee_amount, - on_behalf_of=transaction.to_stripe_account_id, - transfer_data={ + "on_behalf_of": transaction.to_stripe_account_id, + "transfer_data": { "destination": transaction.to_stripe_account_id, }, - ) + } + + # Only add application_fee_amount if it's greater than 0 + if transaction.application_fee_amount and transaction.application_fee_amount > 0: + payment_link_params["application_fee_amount"] = transaction.application_fee_amount + + payment_link = stripe.PaymentLink.create(**payment_link_params) if payment_link: transaction.stripe_payment_link = payment_link.url @@ -276,27 +287,37 @@ class StripeManager: transaction.stripe_price_id = price.id await transaction.save() - session = stripe.checkout.Session.create( - payment_method_types=["card"], - line_items=[ + # Prepare payment_intent_data with conditional application_fee_amount + payment_intent_data = { + "on_behalf_of": transaction.to_stripe_account_id, + "transfer_data": { + "destination": transaction.to_stripe_account_id, + }, + } + + # Only add application_fee_amount if it's greater than 0 + if transaction.application_fee_amount and transaction.application_fee_amount > 0: + payment_intent_data["application_fee_amount"] = transaction.application_fee_amount + + + + session_params = { + "payment_method_types": ["card"], + "line_items": [ { "price": transaction.stripe_price_id, "quantity": 1, } ], - payment_intent_data={ - "on_behalf_of": transaction.to_stripe_account_id, - "application_fee_amount": transaction.application_fee_amount, - "transfer_data": { - "destination": transaction.to_stripe_account_id, - }, - }, - mode="payment", - success_url="{}/projects".format( - self.site_url_root - ), # needs to be set, local: http://localhost/ - cancel_url="{}/projects".format(self.site_url_root), - ) + "payment_intent_data": payment_intent_data, + "mode": "payment", + "success_url": "{}/projects".format(self.site_url_root), + "cancel_url": "{}/projects".format(self.site_url_root), + } + + + + session = stripe.checkout.Session.create(**session_params) if session: transaction.stripe_checkout_session_id = session.id @@ -332,21 +353,211 @@ class StripeManager: async def invoke_checkout_session_webhook( self, event: dict ) -> Tuple[bool, Optional[str], Optional[str]]: - # Handle the checkout.session.completed event + """ + Handle checkout.session.completed webhook events from Stripe. + Updates transaction status and saves payment method information for future use. + """ if event["type"] == "checkout.session.completed": session = event["data"]["object"] + + # Find and validate the transaction transaction = await self.__fetch_transaction_by_session_id(session["id"]) if not transaction: await self.module_logger.log_error( error="Transaction not found for session_id: {}".format(session["id"]), properties={"session_id": session["id"]}, ) - return False + return False, None, None - transaction.status = TransactionStatus.COMPLETED - transaction.updated_time = datetime.now(timezone.utc) - await transaction.save() + # Update transaction status to completed + await self.__update_transaction_status(transaction) + + # Process and save payment method information + await self.__process_payment_method(session, transaction) return True, transaction.project_id, transaction.milestone_index return False, None, None + + async def __update_transaction_status(self, transaction: StripeTransactionDoc) -> None: + """ + Update transaction status to completed and save to database. + """ + transaction.status = TransactionStatus.COMPLETED + transaction.updated_time = datetime.now(timezone.utc) + await transaction.save() + + async def __process_payment_method(self, session: dict, transaction: StripeTransactionDoc) -> None: + """ + Extract payment method details from Stripe session and save to database. + Creates or finds customer and attaches payment method for future use. + """ + try: + # Get payment method details from Stripe + payment_method_info = await self.__extract_payment_method_info(session) + if not payment_method_info: + return + + payment_method_id, card_details = payment_method_info + + # Get or create Stripe customer for the user + customer_id = await self.__get_or_create_customer(transaction.from_user) + if not customer_id: + return + + # Attach payment method to customer and save to database + await self.__attach_and_save_payment_method( + payment_method_id, card_details, customer_id, transaction.from_user + ) + + except Exception as payment_method_error: + await self.module_logger.log_error( + error=f"Error processing payment method: {payment_method_error}", + properties={"session_id": session["id"], "user_id": transaction.from_user} + ) + + async def __extract_payment_method_info(self, session: dict) -> Optional[Tuple[str, dict]]: + """ + Extract payment method ID and card details from Stripe session. + Returns tuple of (payment_method_id, card_details) or None if not found. + """ + try: + # Get the Stripe session to extract payment method details + stripe_session = stripe.checkout.Session.retrieve(session["id"]) + payment_intent_id = stripe_session.get('payment_intent') + + if not payment_intent_id: + return None + + payment_intent = stripe.PaymentIntent.retrieve(payment_intent_id) + payment_method_id = payment_intent.get('payment_method') + + if not payment_method_id: + return None + + payment_method = stripe.PaymentMethod.retrieve(payment_method_id) + card_details = payment_method.get('card', {}) + + return payment_method_id, card_details + + except Exception as e: + await self.module_logger.log_error( + error=f"Error extracting payment method info: {e}", + properties={"session_id": session["id"]} + ) + return None + + async def __get_or_create_customer(self, user_id: str) -> Optional[str]: + """ + Find existing Stripe customer by email or create new one. + Returns customer ID or None if creation fails. + """ + try: + # Generate email for user (fallback since we don't have access to user profile) + user_email = f"user_{user_id}@freeleaps.com" + + # Search for existing customers by email + customers = stripe.Customer.list(email=user_email, limit=1) + if customers.data: + return customers.data[0].id + + # Create new customer if not found + customer = stripe.Customer.create( + email=user_email, + metadata={"user_id": user_id} + ) + return customer.id + + except Exception as customer_error: + await self.module_logger.log_error( + error=f"Error getting/creating customer: {customer_error}", + properties={"user_id": user_id} + ) + return None + + async def __attach_and_save_payment_method( + self, payment_method_id: str, card_details: dict, customer_id: str, user_id: str + ) -> None: + """ + Attach payment method to Stripe customer and save details to database. + Handles various error scenarios gracefully. + """ + try: + # Check if payment method is already attached to a customer + payment_method_obj = stripe.PaymentMethod.retrieve(payment_method_id) + if payment_method_obj.customer: + # Use the existing customer ID + customer_id = payment_method_obj.customer + else: + # Try to attach payment method to customer in Stripe + stripe.PaymentMethod.attach( + payment_method_id, + customer=customer_id + ) + + # Save to database + await self.__save_payment_method_to_db( + payment_method_id, card_details, customer_id, user_id + ) + + except stripe.error.InvalidRequestError as attach_error: + # Handle specific Stripe attachment errors + await self.__handle_attachment_error( + attach_error, payment_method_id, card_details, customer_id, user_id + ) + except Exception as save_error: + await self.module_logger.log_error( + error=f"Error attaching payment method: {save_error}", + properties={"payment_method_id": payment_method_id, "user_id": user_id} + ) + + async def __save_payment_method_to_db( + self, payment_method_id: str, card_details: dict, customer_id: str, user_id: str + ) -> None: + """ + Save payment method details to database if it doesn't already exist. + """ + from backend.infra.payment.models import StripePaymentMethodDoc + + # Check if payment method already exists in our database + existing_payment_method = await StripePaymentMethodDoc.find_one( + StripePaymentMethodDoc.stripe_payment_method_id == payment_method_id + ) + + if existing_payment_method: + return # Already saved + + # Save to our database + payment_method_doc = StripePaymentMethodDoc( + user_id=user_id, + stripe_customer_id=customer_id, + stripe_payment_method_id=payment_method_id, + card_last4=card_details.get('last4'), + card_brand=card_details.get('brand'), + card_exp_month=card_details.get('exp_month'), + card_exp_year=card_details.get('exp_year'), + created_time=datetime.now(timezone.utc), + updated_time=datetime.now(timezone.utc), + ) + await payment_method_doc.save() + + async def __handle_attachment_error( + self, attach_error: stripe.error.InvalidRequestError, + payment_method_id: str, card_details: dict, customer_id: str, user_id: str + ) -> None: + """ + Handle specific Stripe attachment errors and still save to database when possible. + """ + error_message = str(attach_error).lower() + + if "already attached" in error_message or "may not be used again" in error_message: + # Payment method can't be attached but we can still save to database + await self.__save_payment_method_to_db( + payment_method_id, card_details, customer_id, user_id + ) + else: + # Log other attachment errors + await self.module_logger.log_error( + error=f"Error attaching payment method: {attach_error}", + properties={"payment_method_id": payment_method_id, "user_id": user_id} + ) diff --git a/apps/payment/backend/infra/payment/constants.py b/apps/payment/backend/infra/payment/constants.py index 25d4254..310f8ff 100644 --- a/apps/payment/backend/infra/payment/constants.py +++ b/apps/payment/backend/infra/payment/constants.py @@ -12,7 +12,6 @@ class MoneyCollectionType(IntEnum): UNSPECIFIED = 0 MARKED_AS_PAID = 1 UPLOAD_PROOF = 2 - WECHAT_QR_CODE = 3 STRIPE_CHECKOUT = 4 diff --git a/apps/payment/backend/infra/payment/models.py b/apps/payment/backend/infra/payment/models.py index 8d556dc..e6a2df4 100644 --- a/apps/payment/backend/infra/payment/models.py +++ b/apps/payment/backend/infra/payment/models.py @@ -24,3 +24,18 @@ class StripeTransactionDoc(Document): class Settings: name = "stripe_transaction" + + +class StripePaymentMethodDoc(Document): + user_id: str + stripe_customer_id: str + stripe_payment_method_id: str + card_last4: Optional[str] = None + card_brand: Optional[str] = None + card_exp_month: Optional[int] = None + card_exp_year: Optional[int] = None + created_time: datetime + updated_time: datetime + + class Settings: + name = "stripe_payment_method" diff --git a/apps/payment/backend/models/__init__.py b/apps/payment/backend/models/__init__.py index 077c1f5..b5a31b5 100644 --- a/apps/payment/backend/models/__init__.py +++ b/apps/payment/backend/models/__init__.py @@ -5,9 +5,9 @@ # TODO: Add all models to backend_models from backend.services.payment.models import IncomeProfileDoc, PaymentProfileDoc from backend.services.project.models import ProjectDoc -from backend.infra.payment.models import StripeTransactionDoc +from backend.infra.payment.models import StripeTransactionDoc, StripePaymentMethodDoc -backend_models = [IncomeProfileDoc, PaymentProfileDoc, ProjectDoc, StripeTransactionDoc] +backend_models = [IncomeProfileDoc, PaymentProfileDoc, ProjectDoc, StripeTransactionDoc, StripePaymentMethodDoc] # backend_models.extend(code_models) # backend_models.extend(user_models) # backend_models.extend(profile_models) diff --git a/apps/payment/backend/models/payment/models.py b/apps/payment/backend/models/payment/models.py index 387c59e..5ad97e0 100644 --- a/apps/payment/backend/models/payment/models.py +++ b/apps/payment/backend/models/payment/models.py @@ -2,6 +2,7 @@ from typing import List, Dict, Optional from decimal import Decimal from beanie import Document from pydantic import BaseModel +from datetime import datetime from backend.services.payment.constants import PaymentGateway from backend.infra.payment.constants import MoneyCollectionType, PaymentLocation @@ -23,7 +24,6 @@ class MoneyCollectingMethod(BaseModel): location: Optional[PaymentLocation] priority: int = 0 # less number has high priority to be used. stripe_account_id: Optional[str] - wechat_qr_code: Optional[str] last_update_time: Optional[int] = None @@ -61,3 +61,6 @@ class PaymentProfileDoc(Document): class Settings: name = "payment_profile" + + + diff --git a/apps/payment/backend/services/payment/constants.py b/apps/payment/backend/services/payment/constants.py index 202cba6..1b5d568 100644 --- a/apps/payment/backend/services/payment/constants.py +++ b/apps/payment/backend/services/payment/constants.py @@ -3,4 +3,3 @@ from enum import IntEnum class PaymentGateway(IntEnum): STRIP = 1 - WECHAT = 2 diff --git a/apps/payment/backend/services/payment/models.py b/apps/payment/backend/services/payment/models.py index 606668e..05d79bf 100644 --- a/apps/payment/backend/services/payment/models.py +++ b/apps/payment/backend/services/payment/models.py @@ -27,7 +27,6 @@ class MoneyCollectingMethod(BaseModel): location: Optional[PaymentLocation] priority: int = 0 # less number has high priority to be used. stripe_account_id: Optional[str] - wechat_qr_code: Optional[str] last_update_time: Optional[int] = None diff --git a/apps/payment/common/log/base_logger.py b/apps/payment/common/log/base_logger.py index 24f7bb0..2470ebe 100644 --- a/apps/payment/common/log/base_logger.py +++ b/apps/payment/common/log/base_logger.py @@ -30,7 +30,7 @@ class LoggerBase: guru_logger.remove() file_sink = JsonSink( - log_file_path=log_filename, + log_file_path=log_filename, rotation_size_bytes=rotation_bytes, max_backup_files=log_settings.MAX_BACKUP_FILES ) diff --git a/apps/payment/webapi/routes/payment/payment_manager_controller.py b/apps/payment/webapi/routes/payment/payment_manager_controller.py index e9500ce..eb21bd1 100644 --- a/apps/payment/webapi/routes/payment/payment_manager_controller.py +++ b/apps/payment/webapi/routes/payment/payment_manager_controller.py @@ -6,19 +6,6 @@ from fastapi.encoders import jsonable_encoder router = APIRouter() payment_hub = PaymentHub() -# Web API -# Fetch wechat qr code -@router.get( - "/fetch_wechat_qr_code/{project_id}", - operation_id="fetch_wechat_qr_code", - summary="Fetch wechat qr code", - description="Fetch wechat qr code", -) -async def fetch_wechat_qr_code( - project_id: str -): - return await payment_hub.fetch_wechat_qr_code(project_id) - # Web API # Fetch stripe account id @router.get( diff --git a/apps/payment/webapi/routes/payment/stripe_manager_controller.py b/apps/payment/webapi/routes/payment/stripe_manager_controller.py index ee95dea..c8c353a 100644 --- a/apps/payment/webapi/routes/payment/stripe_manager_controller.py +++ b/apps/payment/webapi/routes/payment/stripe_manager_controller.py @@ -204,8 +204,28 @@ async def handle_account_webhook( details_submitted=session["details_submitted"], payouts_enabled=session["payouts_enabled"], charges_enabled=session["charges_enabled"] - ) + ) except Exception as e: raise HTTPException(status_code=400, detail=str(e)) - + return JSONResponse(content={"status": "success"}) + + +# Web API +# Detach payment method +@router.delete( + "/detach_payment_method/{payment_method_id}", + operation_id="detach_payment_method", + summary="Detach payment method from customer", + description="Detach a payment method from a Stripe customer", +) +async def detach_payment_method(payment_method_id: str): + try: + # Detach the payment method from Stripe + stripe.PaymentMethod.detach(payment_method_id) + return JSONResponse(content={"success": True, "message": "Payment method detached successfully"}) + except Exception as e: + return JSONResponse( + status_code=400, + content={"success": False, "message": f"Failed to detach payment method: {str(e)}"} + )