Merge branch 'dev' into feature/icecheng/role_management
commit 6132f98283
3 .gitignore vendored
@@ -7,4 +7,5 @@
 *.pyc
 freedev.code-workspace
 .idea/
 .pytest_cache/
+CLAUDE.md

31 apps/devops/Dockerfile Normal file
@@ -0,0 +1,31 @@
FROM python:3.12-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Set environment variables (path matched to the directory created below)
ENV LOG_BASE_PATH=/app/log/devops

# Create necessary directories
RUN mkdir -p /app/log/devops

# Expose port
EXPOSE 8014

# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8014/api/_/healthz || exit 1

# Start the application
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8014", "--reload"]

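For local verification, the image can be built and run along these lines. A sketch only: the image tag and the environment overrides are illustrative, not part of this commit.

```bash
# Build from the service directory (Dockerfile lives at apps/devops/Dockerfile)
docker build -t devops-svc apps/devops

# Run, mapping the exposed port; the Mongo/RabbitMQ endpoints are assumptions
docker run -p 8014:8014 \
  -e APP_MONGODB_URI=mongodb://host.docker.internal:27017 \
  -e RABBITMQ_HOST=host.docker.internal \
  devops-svc
```
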
20 apps/devops/README.md Normal file
@@ -0,0 +1,20 @@
This is a template backend service based on FastAPI + MongoDB.

To start local development, go to the root directory of the project, YOUR_WORKSPACE_PATH/helloworld/, and bring up MongoDB:
```bash
docker compose -f app/scripts/mongodb/docker-compose.yml up -d
```

Then run the app:
```bash
uvicorn app.main:app --reload
```

If a new dependency is added, run the following commands to update the requirements.txt file:
```bash
# optional: install pipreqs if you have not already
pip3 install pipreqs

# generate requirements.txt
pipreqs . --force
```

0 apps/devops/app/__init__.py Normal file

1 apps/devops/app/backend/infra/rabbitmq/__init__.py Normal file
@@ -0,0 +1 @@
# RabbitMQ infrastructure for DevOps service

66 apps/devops/app/backend/infra/rabbitmq/async_client.py Normal file
@@ -0,0 +1,66 @@
from typing import Optional

from app.common.config.app_settings import app_settings
from app.common.log.module_logger import ModuleLogger
import asyncio
from asyncio import AbstractEventLoop
import aio_pika


class AsyncMQClient:
    exchange_name_format = "freeleaps.devops.exchange.{}"
    exchange_type = "direct"

    def __init__(self, channel_name: str) -> None:
        self.exchange_name_format = AsyncMQClient.exchange_name_format
        self.channel_name = channel_name
        self.exchange_type = AsyncMQClient.exchange_type
        self.exchange_name = self.exchange_name_format.format(self.channel_name)
        self.process_callable = None
        self.routing_key = self.channel_name
        self.module_logger = ModuleLogger(sender_id="AsyncMQClient")

    async def bind(self, max_retries=10, event_loop: Optional[AbstractEventLoop] = None):
        retry_count = 0
        retry_interval = 1

        while retry_count < max_retries:
            try:
                self.connection = await aio_pika.connect_robust(
                    host=app_settings.RABBITMQ_HOST,
                    port=int(app_settings.RABBITMQ_PORT),
                    login=app_settings.RABBITMQ_USERNAME,
                    password=app_settings.RABBITMQ_PASSWORD,
                    virtualhost=app_settings.RABBITMQ_VIRTUAL_HOST,
                    loop=event_loop,
                )
                self.channel = await self.connection.channel()
                self.exchange = await self.channel.declare_exchange(
                    name=self.exchange_name, type=self.exchange_type, auto_delete=False
                )

                # Connect to the existing named queue instead of creating an anonymous one;
                # channel_name already contains the full queue name from the environment variable.
                self.queue = await self.channel.declare_queue(
                    name=self.channel_name, exclusive=False, auto_delete=False, durable=True
                )
                await self.queue.bind(
                    exchange=self.exchange, routing_key=self.routing_key
                )
                break
            except Exception as e:
                await self.module_logger.log_exception(
                    exception=e,
                    text=f"Connection attempt {retry_count + 1}/{max_retries} failed: {e}",
                )
                await asyncio.sleep(retry_interval)
                retry_interval = min(retry_interval * 2, 60)  # exponential backoff, capped at 60s
                retry_count += 1

        if retry_count >= max_retries:
            raise ConnectionError(
                "Unable to connect to RabbitMQ after multiple retries."
            )

    async def close(self):
        """Unbind the queue and close the connection gracefully."""
        await self.queue.unbind(self.exchange, self.routing_key)
        await self.connection.close()

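A minimal sketch of the client lifecycle, using the queue name that message_queue.py defaults to; the standalone `main` wrapper is illustrative, not part of the service:

```python
import asyncio
from app.backend.infra.rabbitmq.async_client import AsyncMQClient

async def main():
    # Queue name taken from the RABBITMQ_OUTPUT_QUEUE_NAME default used below
    client = AsyncMQClient(channel_name="freeleaps.devops.reconciler.output")
    await client.bind(max_retries=3)  # declares the exchange + durable queue and binds them
    await client.close()              # unbinds the queue and closes the robust connection

asyncio.run(main())
```
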
84 apps/devops/app/backend/infra/rabbitmq/async_subscriber.py Normal file
@@ -0,0 +1,84 @@
from asyncio import AbstractEventLoop
from typing import Optional

from app.common.log.module_logger import ModuleLogger
import json
import asyncio
from .async_client import AsyncMQClient


class AsyncMQSubscriber(AsyncMQClient):
    def __init__(self, channel_name: str) -> None:
        super().__init__(channel_name=channel_name)
        self.process_callable = None
        self.routing_key = self.channel_name
        self.consumer_callbacks = {}
        self.consumer_callbacks_lock = asyncio.Lock()
        self.module_logger = ModuleLogger(sender_id="AsyncMQSubscriber")

    async def process_incoming_message(self, message):
        """Process an incoming message from RabbitMQ by fanning it out to all registered callbacks."""
        await message.ack()
        body = message.body
        if body:
            async with self.consumer_callbacks_lock:
                for registry_key, callback_info in self.consumer_callbacks.items():
                    try:
                        await callback_info["method"](
                            registry_key, json.loads(body), callback_info["args"]
                        )
                    except Exception as err:
                        await self.module_logger.log_exception(
                            exception=err,
                            text=f"Error processing message for consumer '{registry_key}'",
                        )

    async def subscribe(self, max_retries=10, event_loop: Optional[AbstractEventLoop] = None):
        """Attempt to bind and consume messages, with a retry mechanism."""
        retries = 0
        while retries < max_retries:
            try:
                await self.bind(max_retries=5, event_loop=event_loop)
                await self.queue.consume(
                    no_ack=False, exclusive=True, callback=self.process_incoming_message
                )
                break
            except Exception as e:
                await self.module_logger.log_exception(
                    exception=e,
                    text=f"Subscribe attempt {retries + 1} failed, will retry",
                )
                retries += 1
                await asyncio.sleep(5)
        else:
            # while/else: reached only when the loop exhausts its retries without a break
            await self.module_logger.log_exception(
                exception=ConnectionError(
                    f"Exceeded max retries ({max_retries}) for subscription."
                ),
                text=f"Subscription failed for {self.channel_name} after {max_retries} attempts.",
            )

    async def register_consumer(
        self,
        registry_key: str,
        callback_method,
        args: dict,
    ):
        """Register a consumer callback with a unique key."""
        async with self.consumer_callbacks_lock:
            self.consumer_callbacks[registry_key] = {
                "method": callback_method,
                "args": args,
            }

    async def unregister_consumer(
        self,
        registry_key: str,
    ):
        """Unregister a consumer callback by its key."""
        async with self.consumer_callbacks_lock:
            if registry_key in self.consumer_callbacks:
                del self.consumer_callbacks[registry_key]

    async def clear_all_consumers(self):
        """Unregister all consumer callbacks."""
        async with self.consumer_callbacks_lock:
            self.consumer_callbacks.clear()

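How a callback plugs into the subscriber, as a sketch; the `print_consumer` helper and the standalone runner are hypothetical (in the service itself, subscribe() runs as a background task from the startup hook in message_queue.py):

```python
import asyncio
from app.backend.infra.rabbitmq.async_subscriber import AsyncMQSubscriber

async def print_consumer(registry_key: str, body: dict, args: dict):
    # Callbacks receive (registry_key, decoded JSON body, registration args)
    print(registry_key, body, args)

async def main():
    subscriber = AsyncMQSubscriber("freeleaps.devops.reconciler.output")
    await subscriber.register_consumer("printer", print_consumer, args={})
    await subscriber.subscribe(max_retries=3, event_loop=asyncio.get_running_loop())

asyncio.run(main())
```
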
100 apps/devops/app/backend/services/deployment_status_update_service.py Normal file
@@ -0,0 +1,100 @@
from datetime import datetime
from typing import Dict, Literal

from app.common.log.module_logger import ModuleLogger
from app.common.models.deployment.deployment import Deployment
from app.common.models.deployment.heartbeat import DevOpsReconcileJobHeartbeatMessage


class DeploymentStatusUpdateService:
    def __init__(self):
        self.module_logger = ModuleLogger(sender_id="DeploymentStatusUpdateService")

        # Status mapping from heartbeat to deployment model
        self.status_mapping: Dict[str, Literal["started", "failed", "succeeded", "aborted"]] = {
            "running": "started",
            "success": "succeeded",
            "failed": "failed",
            "terminated": "aborted"
        }

        # Phase to stage mapping for more detailed tracking
        self.phase_to_stage_mapping: Dict[str, str] = {
            "initializing": "initialization",
            "jenkins_build": "build",
            "building": "build",
            "deploying": "deployment",
            "finished": "completed"
        }

    async def process_heartbeat_message(self, registry_key: str, message_data: dict, args: dict):
        """Process an incoming heartbeat message and update the deployment status."""
        # registry_key and args are provided by the message queue framework but not used here
        _ = registry_key, args
        try:
            # Parse the message using the Pydantic model
            heartbeat_message = DevOpsReconcileJobHeartbeatMessage(**message_data)
            payload = heartbeat_message.payload

            await self.module_logger.log_info(
                text=f"Processing heartbeat for deployment {payload.id}: {payload.status} - {payload.phase}",
                data={"deployment_id": payload.id, "status": payload.status, "phase": payload.phase}
            )

            # Find the deployment by ID
            deployment = await Deployment.find_one(Deployment.deployment_id == payload.id)
            if not deployment:
                await self.module_logger.log_warning(
                    text=f"Deployment not found: {payload.id}",
                    data={"deployment_id": payload.id}
                )
                return

            # Map heartbeat status to deployment status
            if payload.status in self.status_mapping:
                deployment.deployment_status = self.status_mapping[payload.status]
            else:
                await self.module_logger.log_warning(
                    text=f"Unknown status received: {payload.status}",
                    data={"deployment_id": payload.id, "status": payload.status}
                )
                return

            # Map phase to deployment stage, falling back to the raw phase name
            if payload.phase in self.phase_to_stage_mapping:
                deployment.deployment_stage = self.phase_to_stage_mapping[payload.phase]
            else:
                deployment.deployment_stage = payload.phase

            # Update the app URL if provided and the deployment succeeded
            if payload.url and payload.status == "success":
                deployment.deployment_app_url = payload.url

            # Update timestamp
            deployment.updated_at = datetime.now()

            # Save the updated deployment
            await deployment.save()

            await self.module_logger.log_info(
                text=f"Updated deployment {payload.id}: status={deployment.deployment_status}, stage={deployment.deployment_stage}",
                data={
                    "deployment_id": payload.id,
                    "status": deployment.deployment_status,
                    "stage": deployment.deployment_stage,
                    "app_url": deployment.deployment_app_url,
                    "error": payload.error
                }
            )

            # Log errors if present
            if payload.error:
                await self.module_logger.log_error(
                    text=f"Deployment {payload.id} failed: {payload.error}",
                    data={"deployment_id": payload.id, "error": payload.error, "phase": payload.phase}
                )

        except Exception as e:
            await self.module_logger.log_exception(
                exception=e,
                text=f"Error processing heartbeat message: {message_data}",
            )

0 apps/devops/app/bootstrap/__init__.py Normal file

83 apps/devops/app/bootstrap/application.py Normal file
@@ -0,0 +1,83 @@
import logging
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi

from app.providers import common
from app.providers.logger import register_logger
from app.providers import router
from app.providers import database
from app.providers import metrics
from app.providers import probes
from app.providers import exception_handler
from app.providers import message_queue
from app.common.config.app_settings import app_settings


def create_app() -> FastAPI:
    logging.info("App initializing")

    app = FreeleapsApp()

    register_logger()
    register(app, exception_handler)
    register(app, database)
    register(app, router)
    register(app, common)
    register(app, message_queue)

    # Customize the OpenAPI schema (adds the global bearer-token security scheme)
    customize_openapi_security(app)

    # Register probe APIs if enabled
    if app_settings.PROBES_ENABLED:
        register(app, probes)

    # Register metrics APIs if enabled
    if app_settings.METRICS_ENABLED:
        register(app, metrics)
    return app


# Overrides the generated OpenAPI schema to add a global bearer-token security scheme
def customize_openapi_security(app: FastAPI) -> None:

    def custom_openapi():
        if app.openapi_schema:
            return app.openapi_schema

        # Generate OpenAPI schema
        openapi_schema = get_openapi(
            title="FreeLeaps API",
            version="3.1.0",
            description="FreeLeaps API Documentation",
            routes=app.routes,
        )

        # Ensure the components section exists in the OpenAPI schema
        if "components" not in openapi_schema:
            openapi_schema["components"] = {}

        # Add security scheme to components
        openapi_schema["components"]["securitySchemes"] = {
            "bearerAuth": {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"}
        }

        # Add security requirement globally
        openapi_schema["security"] = [{"bearerAuth": []}]

        app.openapi_schema = openapi_schema
        return app.openapi_schema

    app.openapi = custom_openapi


def register(app, provider):
    logging.info(provider.__name__ + " registering")
    provider.register(app)


def boot(app, provider):
    logging.info(provider.__name__ + " booting")
    provider.boot(app)


class FreeleapsApp(FastAPI):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

0 apps/devops/app/common/__init__.py Normal file
0 apps/devops/app/common/config/__init__.py Normal file

35 apps/devops/app/common/config/app_settings.py Normal file
@@ -0,0 +1,35 @@
from pydantic_settings import BaseSettings


# NOTE: Values fall back to your environment variables when not set here
class AppSettings(BaseSettings):
    NAME: str = "YOUR_APP_NAME"
    APP_NAME: str = NAME
    APP_ENV: str = "alpha"

    JWT_SECRET_KEY: str = ""
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 3600
    REFRESH_TOKEN_EXPIRE_DAYS: int = 1

    METRICS_ENABLED: bool = False
    PROBES_ENABLED: bool = True

    APP_MONGODB_URI: str = "mongodb://localhost:27017"
    APP_MONGODB_NAME: str = "testdb"

    LOG_BASE_PATH: str = "./log"
    BACKEND_LOG_FILE_NAME: str = APP_NAME
    APPLICATION_ACTIVITY_LOG: str = APP_NAME + "-application-activity"

    RABBITMQ_HOST: str = "localhost"
    RABBITMQ_PORT: int = 5672
    RABBITMQ_USERNAME: str = "guest"
    RABBITMQ_PASSWORD: str = "guest"
    RABBITMQ_VIRTUAL_HOST: str = "/"

    class Config:
        env_file = ".myapp.env"
        env_file_encoding = "utf-8"


app_settings = AppSettings()

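Since Config points at .myapp.env, the defaults above can be overridden with a file like the following; all values are illustrative:

```
APP_NAME=devops
APP_ENV=alpha
APP_MONGODB_URI=mongodb://localhost:27017
APP_MONGODB_NAME=devops
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
LOG_BASE_PATH=/app/log/devops
```
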
16 apps/devops/app/common/config/log_settings.py Normal file
@@ -0,0 +1,16 @@
import os
from dataclasses import dataclass
from .app_settings import app_settings


@dataclass
class LogSettings:
    LOG_PATH_BASE: str = app_settings.LOG_BASE_PATH
    LOG_RETENTION: str = os.environ.get("LOG_RETENTION", "30 days")
    LOG_ROTATION: str = os.environ.get("LOG_ROTATION", "00:00")  # midnight
    MAX_BACKUP_FILES: int = int(os.environ.get("LOG_BACKUP_FILES", 5))
    LOG_ROTATION_BYTES: int = int(os.environ.get("LOG_ROTATION_BYTES", 10 * 1024 * 1024))  # 10 MB
    APP_NAME: str = app_settings.APP_NAME
    ENVIRONMENT: str = app_settings.APP_ENV


log_settings = LogSettings()

35 apps/devops/app/common/config/site_settings.py Normal file
@@ -0,0 +1,35 @@
import os

from pydantic_settings import BaseSettings


# NOTE: Values fall back to your environment variables when not set here
class SiteSettings(BaseSettings):
    NAME: str = "appname"
    DEBUG: bool = True

    ENV: str = "dev"

    SERVER_HOST: str = "localhost"
    SERVER_PORT: int = 8000

    URL: str = "http://localhost"
    TIME_ZONE: str = "UTC"

    BASE_PATH: str = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

    BASE_GITEA_URL: str = "https://gitea.freeleaps.mathmast.com"

    # TODO: confirm with Zhenyu
    BASE_RECONCILE_URL: str = "https://reconcile.freeleaps.mathmast.com"

    # TODO: modify this with the actual Loki URL
    BASE_LOKI_URL: str = "http://localhost:3100"

    class Config:
        env_file = ".devbase-webapi.env"
        env_file_encoding = "utf-8"


site_settings = SiteSettings()

0 apps/devops/app/common/log/__init__.py Normal file

12 apps/devops/app/common/log/application_logger.py Normal file
@@ -0,0 +1,12 @@
from typing import Any

from .base_logger import LoggerBase
from app.common.config.app_settings import app_settings


class ApplicationLogger(LoggerBase):
    def __init__(self, application_activities: dict[str, Any] = {}) -> None:
        extra_fields = {}
        if application_activities:
            extra_fields.update(application_activities)
        super().__init__(
            logger_name=app_settings.APPLICATION_ACTIVITY_LOG,
            extra_fields=extra_fields,
        )

136 apps/devops/app/common/log/base_logger.py Normal file
@@ -0,0 +1,136 @@
from loguru import logger as guru_logger
from app.common.config.log_settings import log_settings
from typing import Dict, Any, Optional
import socket
import json
import threading
import os
import sys
import inspect
import logging

from app.common.log.json_sink import JsonSink


class LoggerBase:
    binded_loggers = {}
    logger_lock = threading.Lock()

    def __init__(self, logger_name: str, extra_fields: dict[str, Any]) -> None:
        self.__logger_name = logger_name
        self.extra_fields = extra_fields
        with LoggerBase.logger_lock:
            if self.__logger_name in LoggerBase.binded_loggers:
                self.logger = LoggerBase.binded_loggers[self.__logger_name]
                return

        log_filename = f"{log_settings.LOG_PATH_BASE}/{self.__logger_name}.log"
        log_level = "INFO"
        rotation_bytes = int(log_settings.LOG_ROTATION_BYTES or 10 * 1024 * 1024)

        guru_logger.remove()

        file_sink = JsonSink(
            log_file_path=log_filename,
            rotation_size_bytes=rotation_bytes,
            max_backup_files=log_settings.MAX_BACKUP_FILES
        )
        guru_logger.add(
            sink=file_sink,
            level=log_level,
            filter=lambda record: record["extra"].get("topic") == self.__logger_name,
        )

        guru_logger.add(
            sink=sys.stderr,
            level=log_level,
            format="{level} - {time:YYYY-MM-DD HH:mm:ss} - <{extra[log_file]}:{extra[log_line]}> - {extra[properties_str]} - {message}",
            filter=lambda record: record["extra"].get("topic") == self.__logger_name,
        )

        host_name = socket.gethostname()
        host_ip = socket.gethostbyname(host_name)
        self.logger = guru_logger.bind(
            topic=self.__logger_name,
            host_ip=host_ip,
            host_name=host_name,
            app=log_settings.APP_NAME,
            env=log_settings.ENVIRONMENT,
        )
        with LoggerBase.logger_lock:
            LoggerBase.binded_loggers[self.__logger_name] = self.logger

    def _get_log_context(self) -> dict:
        # Walk two frames up to report the caller of the public log_* method
        frame = inspect.currentframe().f_back.f_back
        filename = os.path.basename(frame.f_code.co_filename)
        lineno = frame.f_lineno
        return {"log_file": filename, "log_line": lineno}

    def _prepare_properties(self, properties: Optional[Dict[str, Any]]) -> tuple[Dict[str, Any], str]:
        props = {} if properties is None else properties.copy()
        props_str = json.dumps(props, ensure_ascii=False) if props else "{}"
        return props, props_str

    async def log_event(self, sender_id: str, receiver_id: str, subject: str, event: str, properties: dict[str, Any], text: str = "") -> None:
        props, props_str = self._prepare_properties(properties)
        context = self._get_log_context()
        local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event=event, properties=props, properties_str=props_str, **context)
        local_logger.info(text)

    async def log_exception(self, sender_id: str, receiver_id: str, subject: str, exception: Exception, text: str = "", properties: dict[str, Any] = None) -> None:
        props, props_str = self._prepare_properties(properties)
        context = self._get_log_context()
        local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="exception", properties=props, properties_str=props_str, exception=exception, **context)
        local_logger.exception(text)

    async def log_info(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, Any] = None) -> None:
        props, props_str = self._prepare_properties(properties)
        context = self._get_log_context()
        local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="information", properties=props, properties_str=props_str, **context)
        local_logger.info(text)

    async def log_warning(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, Any] = None) -> None:
        props, props_str = self._prepare_properties(properties)
        context = self._get_log_context()
        local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="warning", properties=props, properties_str=props_str, **context)
        local_logger.warning(text)

    async def log_error(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, Any] = None) -> None:
        props, props_str = self._prepare_properties(properties)
        context = self._get_log_context()
        local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="error", properties=props, properties_str=props_str, **context)
        local_logger.error(text)

    @staticmethod
    def configure_uvicorn_logging():
        print("📢 Setting up uvicorn logging interception...")

        # Intercept logs from these loggers
        intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"]

        class InterceptHandler(logging.Handler):
            def emit(self, record):
                level = (
                    guru_logger.level(record.levelname).name
                    if guru_logger.level(record.levelname, None)
                    else record.levelno
                )
                frame, depth = logging.currentframe(), 2
                while frame.f_code.co_filename == logging.__file__:
                    frame = frame.f_back
                    depth += 1

                guru_logger.opt(depth=depth, exception=record.exc_info).log(
                    level,
                    f"[{record.name}] {record.getMessage()}",
                )

        # Replace default handlers
        logging.root.handlers.clear()
        logging.root.setLevel(logging.INFO)
        logging.root.handlers = [InterceptHandler()]

        # Configure specific uvicorn loggers
        for logger_name in intercept_loggers:
            logging_logger = logging.getLogger(logger_name)
            logging_logger.handlers.clear()  # Remove default handlers
            logging_logger.propagate = True  # Ensure propagation through Loguru

85 apps/devops/app/common/log/json_sink.py Normal file
@@ -0,0 +1,85 @@
import json
import datetime
import traceback
from pathlib import Path


class JsonSink:
    def __init__(
        self,
        log_file_path: str,
        rotation_size_bytes: int = 10 * 1024 * 1024,
        max_backup_files: int = 5,
    ):
        self.log_file_path = Path(log_file_path)
        self.rotation_size = rotation_size_bytes
        self.max_backup_files = max_backup_files
        self._open_log_file()

    def _open_log_file(self):
        # Ensure the parent directory exists
        parent_dir = self.log_file_path.parent
        if not parent_dir.exists():
            parent_dir.mkdir(parents=True, exist_ok=True)
        self.log_file = self.log_file_path.open("a", encoding="utf-8")

    def _should_rotate(self) -> bool:
        return self.log_file_path.exists() and self.log_file_path.stat().st_size >= self.rotation_size

    def _rotate(self):
        self.log_file.close()
        timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        rotated_path = self.log_file_path.with_name(f"{self.log_file_path.stem}_{timestamp}{self.log_file_path.suffix}")
        self.log_file_path.rename(rotated_path)
        self._cleanup_old_backups()
        self._open_log_file()

    def _cleanup_old_backups(self):
        parent = self.log_file_path.parent
        stem = self.log_file_path.stem
        suffix = self.log_file_path.suffix

        backup_files = sorted(
            parent.glob(f"{stem}_*{suffix}"),
            key=lambda p: p.stat().st_mtime,
            reverse=True,
        )

        # Keep the newest max_backup_files backups, delete the rest
        for old_file in backup_files[self.max_backup_files:]:
            try:
                old_file.unlink()
            except Exception as e:
                print(f"Failed to delete old backup {old_file}: {e}")

    def __call__(self, message):
        record = message.record
        if self._should_rotate():
            self._rotate()

        log_entry = {
            "level": record["level"].name.lower(),
            "timestamp": int(record["time"].timestamp() * 1000),
            "text": record["message"],
            "fields": record["extra"].get("properties", {}),
            "context": {
                "app": record["extra"].get("app"),
                "env": record["extra"].get("env"),
                "log_file": record["extra"].get("log_file"),
                "log_line": record["extra"].get("log_line"),
                "topic": record["extra"].get("topic"),
                "sender_id": record["extra"].get("sender_id"),
                "receiver_id": record["extra"].get("receiver_id"),
                "subject": record["extra"].get("subject"),
                "event": record["extra"].get("event"),
                "host_ip": record["extra"].get("host_ip"),
                "host_name": record["extra"].get("host_name"),
            },
            "stacktrace": None
        }

        if record["exception"]:
            exc_type, exc_value, exc_tb = record["exception"]
            log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb)

        self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
        self.log_file.flush()

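For reference, a single sink invocation writes one JSON line shaped roughly like this; all values are illustrative:

```json
{"level": "info", "timestamp": 1718000000000, "text": "work started",
 "fields": {"job_id": "123"},
 "context": {"app": "devops", "env": "alpha", "log_file": "service.py", "log_line": 42,
             "topic": "devops-application-activity", "sender_id": "MyService",
             "receiver_id": "ModuleLogger", "subject": "module", "event": "information",
             "host_ip": "10.0.0.1", "host_name": "devops-pod"},
 "stacktrace": null}
```
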
46 apps/devops/app/common/log/module_logger.py Normal file
@@ -0,0 +1,46 @@
from typing import Any

from .application_logger import ApplicationLogger


class ModuleLogger(ApplicationLogger):
    def __init__(self, sender_id: str) -> None:
        super().__init__()
        self.event_sender_id = sender_id
        self.event_receiver_id = "ModuleLogger"
        self.event_subject = "module"

    async def log_exception(self, exception: Exception, text: str = "Exception", properties: dict[str, Any] = None) -> None:
        return await super().log_exception(
            sender_id=self.event_sender_id,
            receiver_id=self.event_receiver_id,
            subject=self.event_subject,
            exception=exception,
            text=text,
            properties=properties,
        )

    async def log_info(self, text: str, data: dict[str, Any] = None) -> None:
        return await super().log_info(
            sender_id=self.event_sender_id,
            receiver_id=self.event_receiver_id,
            subject=self.event_subject,
            text=text,
            properties=data,
        )

    async def log_warning(self, text: str, data: dict[str, Any] = None) -> None:
        return await super().log_warning(
            sender_id=self.event_sender_id,
            receiver_id=self.event_receiver_id,
            subject=self.event_subject,
            text=text,
            properties=data,
        )

    async def log_error(self, text: str, data: dict[str, Any] = None) -> None:
        return await super().log_error(
            sender_id=self.event_sender_id,
            receiver_id=self.event_receiver_id,
            subject=self.event_subject,
            text=text,
            properties=data,
        )

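A minimal usage sketch; "MyService" and the `do_work` coroutine are hypothetical, and ModuleLogger pre-fills sender/receiver/subject so call sites only pass text and data:

```python
from app.common.log.module_logger import ModuleLogger

logger = ModuleLogger(sender_id="MyService")

async def do_work():
    await logger.log_info(text="work started", data={"job_id": "123"})
    try:
        1 / 0
    except Exception as e:
        # Records the exception event plus a stacktrace via the JSON sink
        await logger.log_exception(exception=e, text="work failed")
```
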
7 apps/devops/app/common/models/__init__.py Normal file
@@ -0,0 +1,7 @@
from app.common.models.code_depot.code_depot import CodeDepotDoc
from app.common.models.deployment.deployment import Deployment
from app.common.models.deployment.deployment import DevOpsReconcileRequest, DevOpsReconcileOperationType

# List of beanie document models;
# a model must be added here so that its MongoDB collection is created automatically.
db_models = [Deployment, CodeDepotDoc]

39 apps/devops/app/common/models/code_depot/code_depot.py Normal file
@@ -0,0 +1,39 @@
from datetime import datetime, timezone
from typing import List, Optional, Dict

from beanie import Document
from enum import IntEnum

from pydantic import Field
from pymongo import IndexModel


class DepotStatus(IntEnum):
    TO_BE_CREATED = 0
    CREATED = 1
    DELETED = 2


class UserAccountStatus(IntEnum):
    TO_BE_CREATED = 0
    CREATED = 1
    DELETED = 2
    DEACTIVATED = 3


class CodeDepotDoc(Document):
    depot_name: str
    product_id: str
    depot_status: DepotStatus
    collaborators: List[str] = []
    total_commits: Optional[int] = 0
    last_commiter: Optional[str] = ""
    # default_factory so the timestamp is evaluated per document, not once at import time
    last_update: Optional[datetime] = Field(default_factory=lambda: datetime.now(timezone.utc))
    weekly_commits: Optional[Dict[str, int]] = {}

    class Settings:
        name = "code_depot"
        indexes = [
            IndexModel([("product_id", 1)])
        ]

88 apps/devops/app/common/models/deployment/deployment.py Normal file
@@ -0,0 +1,88 @@
from datetime import datetime, timedelta
from typing import Literal, List, Optional
from enum import Enum

from beanie import Document
from pydantic import Field
from pydantic import BaseModel
from pymongo import IndexModel


class Deployment(Document):
    deployment_id: str

    deployment_stage: str
    deployment_status: Literal["started", "failed", "succeeded", "aborted"]

    deployment_target_env: Literal["alpha", "prod"]
    deployment_ttl_hours: int = 2

    deployment_project_id: str
    deployment_project_name: str
    deployment_product_id: str
    deployment_product_name: str
    deployment_git_url: str
    deployment_git_sha256: str
    deployment_reason: str
    deployment_app_url: str = ""  # URL to access the deployed application; left empty to be filled later

    deployed_by: str
    # default_factory so timestamps are evaluated per document, not once at import time
    created_at: datetime = Field(default_factory=datetime.now)
    updated_at: datetime = Field(default_factory=datetime.now)

    class Settings:
        name = "deployment"
        indexes = [
            IndexModel([("deployment_product_id", 1), ("created_at", 1)]),
            IndexModel([("deployment_id", 1), ("deployment_status", 1)]),
            IndexModel([("deployment_id", 1), ("deployment_stage", 1)], unique=True)
        ]


class InitDeploymentRequest(BaseModel):
    product_id: str
    sha256: str = ""
    target_env: Literal["alpha", "prod"]
    user_id: str
    reason: str = "not provided"
    ttl_hours: int = 3


class CheckDeploymentStatusRequest(BaseModel):
    product_id: str
    target_env: str
    user_id: str


class CheckApplicationLogsRequest(BaseModel):
    product_id: str
    target_env: Literal["alpha", "prod"]
    user_id: str = ''
    log_level: List[Literal["info", "error", "debug"]] = Field(default_factory=lambda: ["info"])
    start_time: datetime = Field(default_factory=lambda: datetime.now() - timedelta(minutes=5))
    end_time: datetime = Field(default_factory=datetime.now)
    limit: int = 1000


class CheckApplicationLogsResponse(BaseModel):
    product_id: str
    target_env: Literal["alpha", "prod"]
    user_id: str = ''
    log_level: List[Literal["info", "error", "debug"]]
    start_time: datetime
    end_time: datetime
    limit: int
    logs: list[str]


class DevOpsReconcileOperationType(str, Enum):
    START = "start"
    TERMINATE = "terminate"
    RESTART = "restart"


class DevOpsReconcileRequest(BaseModel):
    operation: DevOpsReconcileOperationType
    id: str
    devops_proj_id: str
    triggered_user_id: str
    causes: str
    commit_sha256: Optional[str] = None
    target_env: Literal["alpha", "prod"]
    ttl_control: bool = False
    ttl: int = 10800

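An InitDeploymentRequest body as posted to /api/deployment/initDeployment might look like this; the IDs and reason are illustrative:

```json
{
  "product_id": "prod-123",
  "sha256": "",
  "target_env": "alpha",
  "user_id": "user-1",
  "reason": "manual redeploy",
  "ttl_hours": 3
}
```
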
17 apps/devops/app/common/models/deployment/heartbeat.py Normal file
@@ -0,0 +1,17 @@
from typing import Literal, Optional
from pydantic import BaseModel


class DevOpsReconcileJobHeartbeatPayload(BaseModel):
    operation: Literal["heartbeat"] = "heartbeat"
    id: str
    status: Literal["running", "success", "failed", "terminated"]
    phase: Literal["initializing", "jenkins_build", "building", "deploying", "finished"]
    phase_message: str
    error: Optional[str] = None
    url: Optional[str] = None


class DevOpsReconcileJobHeartbeatMessage(BaseModel):
    event_type: Literal["DevOpsReconcileJobHeartbeat"]
    payload: DevOpsReconcileJobHeartbeatPayload

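A heartbeat message consumed by DeploymentStatusUpdateService therefore looks like this on the wire; the id and phase_message are illustrative:

```json
{
  "event_type": "DevOpsReconcileJobHeartbeat",
  "payload": {
    "operation": "heartbeat",
    "id": "c0ffee-1234",
    "status": "running",
    "phase": "building",
    "phase_message": "building image",
    "error": null,
    "url": null
  }
}
```
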
140 apps/devops/app/common/probes/__init__.py Normal file
@@ -0,0 +1,140 @@
import logging
from enum import Enum
from typing import Optional, Callable, Tuple, Dict
import inspect
from datetime import datetime, timezone


# ProbeType defines the types of probes that can be registered.
class ProbeType(Enum):
    LIVENESS = "liveness"
    READINESS = "readiness"
    STARTUP = "startup"


# ProbeResult represents the result of a probe check.
class ProbeResult:
    def __init__(self, success: bool, message: str = "ok", data: Optional[dict] = None):
        self.success = success
        self.message = message
        self.data = data or {}

    def to_dict(self) -> dict:
        return {
            "success": self.success,
            "message": self.message,
            "data": self.data
        }


# Probe represents a single registered probe.
class Probe:
    def __init__(self, type: ProbeType, path: str, check_fn: Callable, name: Optional[str] = None):
        self.type = type
        self.path = path
        self.check_fn = check_fn
        self.name = name or f"{type.value}-{id(self)}"

    async def execute(self) -> ProbeResult:
        try:
            result = self.check_fn()
            if inspect.isawaitable(result):
                result = await result

            if isinstance(result, ProbeResult):
                return result
            elif isinstance(result, bool):
                return ProbeResult(result, "ok" if result else "failed")
            else:
                return ProbeResult(True, "ok")
        except Exception as e:
            return ProbeResult(False, str(e))


# ProbeGroup represents a group of probes that are checked together.
class ProbeGroup:
    def __init__(self, path: str):
        self.path = path
        self.probes: Dict[str, Probe] = {}

    def add_probe(self, probe: Probe):
        self.probes[probe.name] = probe

    async def check_all(self) -> Tuple[bool, dict]:
        results = {}
        all_success = True

        for name, probe in self.probes.items():
            result = await probe.execute()
            results[name] = result.to_dict()
            if not result.success:
                all_success = False

        return all_success, results


# FrameworkAdapter is an abstract class that defines the interface for framework-specific probe adapters.
class FrameworkAdapter:
    async def handle_request(self, group: ProbeGroup):
        all_success, results = await group.check_all()
        status_code = 200 if all_success else 503
        return {"status": "ok" if all_success else "failed", "payload": results, "timestamp": int(datetime.now(timezone.utc).timestamp())}, status_code

    def register_route(self, path: str, handler: Callable):
        raise NotImplementedError


# ProbeManager manages the registration of probes and their corresponding framework adapters.
class ProbeManager:
    _default_paths = {
        ProbeType.LIVENESS: "/_/livez",
        ProbeType.READINESS: "/_/readyz",
        ProbeType.STARTUP: "/_/healthz"
    }

    def __init__(self):
        self.groups: Dict[str, ProbeGroup] = {}
        self.adapters: Dict[str, FrameworkAdapter] = {}
        self._startup_complete = False

    def register_adapter(self, framework: str, adapter: FrameworkAdapter):
        self.adapters[framework] = adapter
        logging.info(f"Registered probe adapter ({adapter}) for framework: {framework}")

    def register(
        self,
        type: ProbeType,
        check_func: Optional[Callable] = None,
        path: Optional[str] = None,
        prefix: str = "",
        name: Optional[str] = None,
        frameworks: Optional[list] = None
    ):
        path = path or self._default_paths.get(type, "/_/healthz")
        if prefix:
            path = f"{prefix}{path}"

        if type == ProbeType.STARTUP and check_func is None:
            check_func = self._default_startup_check

        probe = Probe(type, path, check_func or (lambda: True), name)

        if path not in self.groups:
            self.groups[path] = ProbeGroup(path)
        self.groups[path].add_probe(probe)

        for framework in (frameworks or ["default"]):
            self._register_route(framework, path)
            logging.info(f"Registered {type.value} probe route ({path}) for framework: {framework}")

    def _register_route(self, framework: str, path: str):
        if framework not in self.adapters:
            return

        adapter = self.adapters[framework]
        group = self.groups[path]

        async def handler():
            return await adapter.handle_request(group)

        adapter.register_route(path, handler)

    def _default_startup_check(self) -> bool:
        return self._startup_complete

    def mark_startup_complete(self):
        self._startup_complete = True

15 apps/devops/app/common/probes/adapters.py Normal file
@@ -0,0 +1,15 @@
from . import FrameworkAdapter
from fastapi.responses import JSONResponse
from typing import Callable


# FastAPIAdapter implements the FrameworkAdapter interface for FastAPI.
class FastAPIAdapter(FrameworkAdapter):
    def __init__(self, app):
        self.app = app

    def register_route(self, path: str, handler: Callable):
        async def wrapper():
            data, status_code = await handler()
            return JSONResponse(content=data, status_code=status_code)

        self.app.add_api_route(path, wrapper, methods=["GET"])

0 apps/devops/app/envs/alpha.yml Normal file
0 apps/devops/app/envs/prod.yml Normal file

17 apps/devops/app/main.py Normal file
@@ -0,0 +1,17 @@
from fastapi.responses import RedirectResponse
from app.common.config.site_settings import site_settings
from app.bootstrap.application import create_app

app = create_app()

@app.get("/")
async def root():
    """Redirect clients to the interactive API docs at /docs."""
    # Set the status on the response itself; a decorator status_code is ignored
    # when a Response object is returned directly.
    return RedirectResponse(url="/docs", status_code=301)

if __name__ == "__main__":
    import uvicorn
    print("Starting FastAPI server...")
    uvicorn.run("main:app", host=site_settings.SERVER_HOST, port=site_settings.SERVER_PORT, reload=True)

0 apps/devops/app/providers/__init__.py Normal file

31 apps/devops/app/providers/common.py Normal file
@@ -0,0 +1,31 @@
from fastapi.middleware.cors import CORSMiddleware
from app.common.config.site_settings import site_settings


def register(app):
    app.debug = site_settings.DEBUG
    app.title = site_settings.NAME

    add_global_middleware(app)

    # Placeholder startup hook: open connections or warm caches here if needed.
    @app.on_event("startup")
    async def startup():
        pass

    # Placeholder shutdown hook: release any resources acquired at startup.
    @app.on_event("shutdown")
    async def shutdown():
        pass


def add_global_middleware(app):
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

34 apps/devops/app/providers/database.py Normal file
@@ -0,0 +1,34 @@
import asyncio
from app.common.config.app_settings import app_settings
from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
from app.common.models import db_models
from app.common.probes import ProbeResult

client = AsyncIOMotorClient(
    app_settings.APP_MONGODB_URI,
    serverSelectionTimeoutMS=60000,
    minPoolSize=5,   # Minimum number of connections in the pool
    maxPoolSize=20,  # Maximum number of connections in the pool
)

def register(app):
    # NOTE: template leftovers that assigned placeholder strings to app.debug and
    # app.title were removed here; those values are set from site settings in common.py.
    @app.on_event("startup")
    async def start_database():
        await initiate_database()

async def check_database_initialized() -> ProbeResult:
    try:
        await asyncio.wait_for(client.server_info(), timeout=5)
        return ProbeResult(success=True, message="service has been initialized and ready to serve")
    except Exception:
        return ProbeResult(success=False, message="service is not initialized yet", data={"error": "database is not ready"})


async def initiate_database():
    await init_beanie(
        database=client[app_settings.APP_MONGODB_NAME], document_models=db_models
    )

39 apps/devops/app/providers/exception_handler.py Normal file
@@ -0,0 +1,39 @@
from fastapi import FastAPI, HTTPException
from fastapi.exceptions import RequestValidationError
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.status import (
    HTTP_400_BAD_REQUEST,
    HTTP_500_INTERNAL_SERVER_ERROR,
)


async def custom_http_exception_handler(request: Request, exc: HTTPException):
    return JSONResponse(
        status_code=exc.status_code,
        content={"error": exc.detail},
    )


async def validation_exception_handler(request: Request, exc: RequestValidationError):
    return JSONResponse(
        status_code=HTTP_400_BAD_REQUEST,
        content={"error": str(exc)},
    )


async def exception_handler(request: Request, exc: Exception):
    return JSONResponse(
        status_code=HTTP_500_INTERNAL_SERVER_ERROR,
        content={"error": str(exc)},
    )


def register(app: FastAPI):
    app.add_exception_handler(HTTPException, custom_http_exception_handler)
    app.add_exception_handler(RequestValidationError, validation_exception_handler)
    app.add_exception_handler(Exception, exception_handler)

7 apps/devops/app/providers/logger.py Normal file
@@ -0,0 +1,7 @@
from app.common.log.base_logger import LoggerBase


def register_logger():
    print("📢 Setting up logging interception...")
    LoggerBase.configure_uvicorn_logging()
    print("✅ Logging interception complete. Logs are formatted and deduplicated!")

60 apps/devops/app/providers/message_queue.py Normal file
@@ -0,0 +1,60 @@
import asyncio
import os
from app.backend.infra.rabbitmq.async_subscriber import AsyncMQSubscriber
from app.backend.services.deployment_status_update_service import DeploymentStatusUpdateService


def register(app):
    # Initialize the message subscriber and status update service lazily to avoid blocking startup
    app.deployment_heartbeat_subscriber = None
    app.deployment_status_service = None

    @app.on_event("startup")
    async def start_message_consumers():
        print("🚀 Starting message consumers...")

        try:
            # Initialize services during startup to avoid blocking app initialization
            print("🔧 Initializing services...")
            output_queue_name = os.getenv("RABBITMQ_OUTPUT_QUEUE_NAME", "freeleaps.devops.reconciler.output")
            print(f"Using output queue: {output_queue_name}")
            app.deployment_heartbeat_subscriber = AsyncMQSubscriber(output_queue_name)
            app.deployment_status_service = DeploymentStatusUpdateService()
            print("✅ Services initialized")

            # Register the heartbeat processor
            print("📝 Registering deployment heartbeat processor...")
            await app.deployment_heartbeat_subscriber.register_consumer(
                registry_key="deployment_heartbeat_processor",
                callback_method=app.deployment_status_service.process_heartbeat_message,
                args={}
            )
            print("✅ Registered deployment heartbeat processor")

            # Start the subscriber in the background
            print("🔄 Starting subscriber in background...")
            loop = asyncio.get_running_loop()
            loop.create_task(
                app.deployment_heartbeat_subscriber.subscribe(max_retries=5, event_loop=loop)
            )
            print("✅ Started deployment heartbeat subscriber")
            print("🎉 Message consumers startup complete!")
        except Exception as e:
            print(f"❌ Error in message consumer startup: {e}")
            # Don't raise the exception, to avoid failing app startup
            print("⚠️ App will continue without message queue functionality")

    @app.on_event("shutdown")
    async def stop_message_consumers():
        # Clear consumers and close the connection
        print("Stopping message consumers")
        if app.deployment_heartbeat_subscriber:
            try:
                await app.deployment_heartbeat_subscriber.clear_all_consumers()
                print("Cleared all consumers")
                await app.deployment_heartbeat_subscriber.close()
                print("Closed deployment heartbeat subscriber")
            except Exception as e:
                print(f"Error during shutdown: {e}")
        else:
            print("No message consumers to stop")

16 apps/devops/app/providers/metrics.py Normal file
@@ -0,0 +1,16 @@
import logging
from prometheus_fastapi_instrumentator import Instrumentator
from app.common.config.app_settings import app_settings

def register(app):
    instrumentator = (
        Instrumentator().instrument(
            app,
            metric_namespace="freeleaps",
            metric_subsystem=app_settings.APP_NAME)
    )

    @app.on_event("startup")
    async def startup():
        instrumentator.expose(app, endpoint="/api/_/metrics", should_gzip=True)
        logging.info("Metrics endpoint exposed at /api/_/metrics")

25 apps/devops/app/providers/probes.py Normal file
@@ -0,0 +1,25 @@
from app.common.probes import ProbeManager, ProbeType
from app.common.probes.adapters import FastAPIAdapter
from .database import check_database_initialized

def register(app):
    probes_manager = ProbeManager()
    probes_manager.register_adapter("fastapi", FastAPIAdapter(app))

    async def readiness_checker():
        return await check_database_initialized()

    probes_manager.register(
        name="readiness",
        prefix="/api",
        type=ProbeType.READINESS,
        check_func=readiness_checker,
        frameworks=["fastapi"]
    )

    probes_manager.register(name="liveness", prefix="/api", type=ProbeType.LIVENESS, frameworks=["fastapi"])
    probes_manager.register(name="startup", prefix="/api", type=ProbeType.STARTUP, frameworks=["fastapi"])

    @app.on_event("startup")
    async def mark_startup_complete():
        probes_manager.mark_startup_complete()

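With the probes registered above, the endpoints can be exercised like this; port 8000 assumes the local uvicorn defaults from site_settings (the container exposes 8014 instead):

```bash
curl -i http://localhost:8000/api/_/livez
curl -i http://localhost:8000/api/_/readyz    # wraps the MongoDB readiness check
curl -i http://localhost:8000/api/_/healthz   # startup probe; the Dockerfile HEALTHCHECK hits this path
```
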
34 apps/devops/app/providers/router.py Normal file
@@ -0,0 +1,34 @@
from app.routes import api_router

from starlette import routing


def register(app):
    app.include_router(
        api_router,
        prefix="/api",
        tags=["api"],
        dependencies=[],
        responses={404: {"description": "no page found"}},
    )

    if app.debug:
        for route in app.routes:
            if not isinstance(route, routing.WebSocketRoute):
                print(
                    {
                        "path": route.path,
                        "endpoint": route.endpoint,
                        "name": route.name,
                        "methods": route.methods,
                    }
                )
            else:
                print(
                    {
                        "path": route.path,
                        "endpoint": route.endpoint,
                        "name": route.name,
                        "type": "web socket route",
                    }
                )

8 apps/devops/app/providers/scheduler.py Normal file
@@ -0,0 +1,8 @@
import asyncio


def register(app):
    @app.on_event("startup")
    async def start_scheduler():
        # Create your scheduler here
        pass

7 apps/devops/app/routes/__init__.py Normal file
@@ -0,0 +1,7 @@
from fastapi import APIRouter
from app.routes.deployment.apis import router as deployment_api

api_router = APIRouter()

# TODO: add custom routers here
api_router.include_router(deployment_api, tags=["deployment"])

0 apps/devops/app/routes/deployment/__init__.py Normal file

75 apps/devops/app/routes/deployment/apis.py Normal file
@@ -0,0 +1,75 @@
from typing import List, Optional

from fastapi import APIRouter, Depends
from loguru import logger

from app.common.models import CodeDepotDoc
from app.common.models.deployment.deployment import Deployment, InitDeploymentRequest, CheckDeploymentStatusRequest, \
    CheckApplicationLogsRequest, CheckApplicationLogsResponse
from app.routes.deployment.service import DeploymentService, get_deployment_service

router = APIRouter(prefix="/deployment")

@router.post("/initDeployment")
async def init_deployment(
    request: InitDeploymentRequest,
    service: DeploymentService = Depends(get_deployment_service)
) -> Deployment:
    """Insert a new Deployment object into the database."""
    return await service.init_deployment(request)

@router.get('/getLatestDeployment')
async def get_latest_deployment(
    product_id: str,
    target_env: str = "alpha",
    service: DeploymentService = Depends(get_deployment_service)
) -> Optional[Deployment]:
    """
    Get the latest deployment for a given product ID.
    """
    return await service.get_latest_deployment(product_id, target_env)

@router.post("/updateDeploymentStatus")
async def update_deployment(
    request: Deployment,
    service: DeploymentService = Depends(get_deployment_service)
) -> Deployment:
    return await service.update_deployment_status(request)

@router.get("/checkDeploymentStatus")
async def check_deployment_status(
    product_id: str,
    service: DeploymentService = Depends(get_deployment_service)
) -> List[Deployment]:
    return await service.check_deployment_status(product_id)

@router.post("/createDummyCodeDepot")
async def create_dummy_code_depot(
    service: DeploymentService = Depends(get_deployment_service)
) -> CodeDepotDoc:
    """
    Create a dummy code depot for testing purposes.
    """
    try:
        depot_name = await service.create_dummy_code_depot()
        return depot_name
    except Exception as e:
        logger.error(f"Failed to create dummy code depot: {e}")
        raise e

@router.post("/checkApplicationLogs")
async def check_application_logs(
    request: CheckApplicationLogsRequest,
    service: DeploymentService = Depends(get_deployment_service)
) -> CheckApplicationLogsResponse:
    """
    Check application logs for a given deployment.
    """
    try:
        res = await service.check_application_logs(request)
        return res
    except Exception as e:
        logger.error(f"Failed to check application logs: {e}")
        raise e

262 apps/devops/app/routes/deployment/service.py Normal file
@@ -0,0 +1,262 @@
import uuid
from collections import defaultdict
from datetime import datetime, timedelta
from typing import List

import httpx
import requests
from fastapi import HTTPException, Depends

from app.common.config.site_settings import site_settings
from app.common.models import Deployment, DevOpsReconcileRequest, DevOpsReconcileOperationType
from app.common.models.code_depot.code_depot import CodeDepotDoc, DepotStatus
from app.common.models.deployment.deployment import InitDeploymentRequest, CheckApplicationLogsRequest, \
    CheckApplicationLogsResponse

from loguru import logger


class DeploymentService:

    def __init__(self):
        pass

    async def init_deployment(
        self,
        request: InitDeploymentRequest,
    ) -> Deployment:
        """Create and persist a new Deployment record, then trigger the deployment."""
        # TODO: validate permission with user_id
        # currently skipped

        code_depot = await self._get_code_depot_by_product_id(request.product_id)

        git_url = await self._compose_git_url(code_depot.depot_name)

        # retrieve project name
        project_name = "TODO"

        # retrieve product info; the depot name should be the same as the product name
        product_id = request.product_id
        product_name = code_depot.depot_name

        deployment = Deployment.model_construct(
            deployment_id=str(uuid.uuid4()),
            deployment_stage="init",
            deployment_status="started",
            deployment_target_env=request.target_env,
            deployment_ttl_hours=request.ttl_hours,
            deployment_project_id="project_id",
            deployment_project_name="project_name",
            deployment_product_id=product_id,
            deployment_product_name=product_name,
            deployment_git_url=git_url,
            deployment_git_sha256=request.sha256,
            deployment_reason=request.reason,
            deployed_by=request.user_id,
            created_at=datetime.now(),
            updated_at=datetime.now(),
        )

        await self._start_deployment(deployment)
        res = await Deployment.insert(deployment)

        return res

    async def get_latest_deployment(
        self,
        product_id: str,
        target_env: str,
    ) -> Deployment:
        time_threshold = datetime.now() - timedelta(hours=168)  # 7 days
        deployment_records = await Deployment.find(
            Deployment.deployment_product_id == product_id,
            Deployment.deployment_target_env == target_env,
            Deployment.updated_at >= time_threshold
        ).to_list()

        if not deployment_records:
            logger.warning(f"No deployment records found for product ID: {product_id} in the last 7 days")
            return None

        latest_deployment = max(deployment_records, key=lambda d: (d.updated_at, d.created_at))
        return latest_deployment

    async def check_deployment_status(
        self,
        product_id: str,
    ) -> List[Deployment]:
        """
        Check the deployment status of the application; only checks the past 48 hours.
        """
        # TODO: implement this function
        time_threshold = datetime.now() - timedelta(hours=48)
        deployment_records = await Deployment.find(
            Deployment.deployment_product_id == product_id,
            Deployment.created_at >= time_threshold
        ).to_list()
        grouped = defaultdict(list)
        for deployment in deployment_records:
            grouped[deployment.deployment_id].append(deployment)
        for deployment_list in grouped.values():
||||
deployment_list.sort(key=lambda d: (d.created_at, d.updated_at))
|
||||
latest_deployments = [deployments[-1] for deployments in grouped.values()]
|
||||
|
||||
return latest_deployments
|
||||
|
||||
async def update_deployment_status(
|
||||
self,
|
||||
deployment: Deployment
|
||||
) -> Deployment:
|
||||
latest_record = await Deployment.find_one(
|
||||
Deployment.deployment_id == deployment.deployment_id,
|
||||
sort=[("created_at", -1)]
|
||||
)
|
||||
|
||||
if not latest_record:
|
||||
raise HTTPException(status_code=404, detail="No record found, please initiate deployment first")
|
||||
|
||||
# TODO add more sanity check logic here
|
||||
|
||||
# if updating the same stage, just update the status and timestamp
|
||||
# else, create a new record with the same deployment_id
|
||||
res = None
|
||||
if deployment.deployment_stage == latest_record.deployment_stage:
|
||||
# update existing record
|
||||
latest_record.deployment_status = deployment.deployment_status
|
||||
latest_record.updated_at = deployment.updated_at or datetime.now()
|
||||
res = await latest_record.save()
|
||||
else:
|
||||
# create new record
|
||||
deployment.deployment_id = latest_record.deployment_id
|
||||
deployment.created_at = datetime.now()
|
||||
deployment.updated_at = datetime.now()
|
||||
res = await deployment.insert()
|
||||
|
||||
return res
|
||||
|
||||
async def _get_code_depot_by_product_id(
|
||||
self,
|
||||
product_id: str,
|
||||
) -> CodeDepotDoc:
|
||||
"""
|
||||
Retrieve code depot by product id
|
||||
"""
|
||||
code_depot = await CodeDepotDoc.find_one(CodeDepotDoc.product_id == product_id)
|
||||
if not code_depot:
|
||||
raise HTTPException(status_code=404,
|
||||
detail="Code depot not found for the given product id, "
|
||||
"please initialize the product first"
|
||||
)
|
||||
return code_depot
|
||||
|
||||
async def _compose_git_url(
|
||||
self,
|
||||
code_depot_name: str,
|
||||
gitea_base_url: str = site_settings.BASE_GITEA_URL
|
||||
) -> str:
|
||||
"""
|
||||
Retrieve git url by product id
|
||||
"""
|
||||
return f"{gitea_base_url}/prodcuts/{code_depot_name.lower()}.git"
|
||||
|
||||
async def _start_deployment(
|
||||
self,
|
||||
deployment: Deployment,
|
||||
reconsile_base_url: str = site_settings.BASE_RECONCILE_URL,
|
||||
) -> bool:
|
||||
"""
|
||||
Start the deployment
|
||||
Return true atm, modify calling reconcile service later
|
||||
"""
|
||||
# construct request body
|
||||
request = DevOpsReconcileRequest(
|
||||
operation=DevOpsReconcileOperationType.START,
|
||||
id=deployment.deployment_id,
|
||||
devops_proj_id=deployment.deployment_product_id,
|
||||
triggered_user_id=deployment.deployed_by,
|
||||
causes=deployment.deployment_reason,
|
||||
target_env=deployment.deployment_target_env,
|
||||
ttl_control=True,
|
||||
ttl=deployment.deployment_ttl_hours * 60 * 60,
|
||||
commit_sha256=deployment.deployment_git_sha256,
|
||||
)
|
||||
# send request to reoncile service
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(
|
||||
f"{reconsile_base_url}/api/devops/reconcile",
|
||||
json=request.model_dump()
|
||||
)
|
||||
if response.status_code != 200:
|
||||
raise HTTPException(status_code=response.status_code, detail=response.text)
|
||||
return True
|
||||
|
||||
async def check_application_logs(
|
||||
self,
|
||||
request: CheckApplicationLogsRequest,
|
||||
loki_url: str = site_settings.BASE_LOKI_URL,
|
||||
) -> CheckApplicationLogsResponse:
|
||||
# Convert to nanoseconds since epoch
|
||||
start_ns = int(request.start_time.timestamp() * 1e9)
|
||||
end_ns = int(request.end_time.timestamp() * 1e9)
|
||||
|
||||
# TODO: convert product_id to application name if needed
|
||||
base_query = f'{{application="{request.product_id}", environment="{request.target_env}"}}'
|
||||
log_level = '|'.join(request.log_level) if request.log_level else ''
|
||||
loki_query = f'{base_query} |~ "{log_level}"'
|
||||
|
||||
params = {
|
||||
"query": loki_query,
|
||||
"limit": request.limit,
|
||||
"start": start_ns,
|
||||
"end": end_ns,
|
||||
}
|
||||
|
||||
url = f"{loki_url}/loki/api/v1/query_range"
|
||||
response = requests.get(url, params=params)
|
||||
|
||||
if response.status_code != 200:
|
||||
raise Exception(f"Query failed: {response.status_code} - {response.text}")
|
||||
|
||||
data = response.json()
|
||||
streams = data.get("data", {}).get("result", [])
|
||||
|
||||
logs = []
|
||||
for stream in streams:
|
||||
for ts, log in stream.get("values", []):
|
||||
timestamp = datetime.fromtimestamp(int(ts) / 1e9)
|
||||
logs.append(f"[{timestamp}] {log.strip()}")
|
||||
|
||||
return CheckApplicationLogsResponse(
|
||||
product_id=request.product_id,
|
||||
target_env=request.target_env,
|
||||
user_id=request.user_id,
|
||||
log_level=request.log_level,
|
||||
start_time=request.start_time,
|
||||
end_time=request.end_time,
|
||||
limit=request.limit,
|
||||
logs=logs
|
||||
)
|
||||
|
||||
# TODO: dummy test code, remove later
|
||||
async def create_dummy_code_depot(
|
||||
self,
|
||||
) -> CodeDepotDoc:
|
||||
"""
|
||||
Create a dummy code depot for testing purposes.
|
||||
"""
|
||||
depot_name = f"dummy-depot-{uuid.uuid4()}"
|
||||
code_depot = CodeDepotDoc(
|
||||
depot_name=depot_name,
|
||||
product_id="dummy-product-id",
|
||||
depot_status=DepotStatus.CREATED
|
||||
)
|
||||
|
||||
return await CodeDepotDoc.insert_one(code_depot)
|
||||
|
||||
deployment_service = DeploymentService()
|
||||
|
||||
def get_deployment_service() -> DeploymentService:
|
||||
return deployment_service
|
||||
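The `check_application_logs` method above builds a Loki `query_range` request: timestamps are nanoseconds since epoch, and the query is a LogQL stream selector plus a `|~` regex line filter. A standalone sketch of the same call, assuming a local Loki at `http://localhost:3100` and the label names used above:

```python
# Minimal sketch of the Loki query_range call made by check_application_logs.
# Assumes a local Loki and streams labeled {application=..., environment=...}.
from datetime import datetime, timedelta

import httpx

now = datetime.now()
params = {
    # LogQL: stream selector plus a regex line filter for the wanted levels
    "query": '{application="dummy-product-id", environment="alpha"} |~ "ERROR|WARN"',
    "limit": 100,
    # Loki expects nanoseconds since epoch
    "start": int((now - timedelta(hours=1)).timestamp() * 1e9),
    "end": int(now.timestamp() * 1e9),
}

resp = httpx.get("http://localhost:3100/loki/api/v1/query_range", params=params)
resp.raise_for_status()
for stream in resp.json()["data"]["result"]:
    for ts_ns, line in stream["values"]:
        print(datetime.fromtimestamp(int(ts_ns) / 1e9), line.strip())
```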
18
apps/devops/app/scripts/mongodb/docker-compose.yml
Normal file
@ -0,0 +1,18 @@
version: '3.8'

services:
  mongodb:
    image: mongo:6.0  # change to the desired version if needed
    container_name: mongodb
    restart: unless-stopped
    ports:
      - "27017:27017"
    environment:
      MONGO_INITDB_DATABASE: testdb  # creates the initial database
    volumes:
      - mongodb_data:/data/db
    command: ["mongod", "--noauth"]  # disable authentication (local dev only)


volumes:
  mongodb_data:
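Once the container is up (`docker compose -f apps/devops/app/scripts/mongodb/docker-compose.yml up -d`), a quick way to verify the database is reachable; a sketch assuming the compose defaults above (localhost:27017, no auth):

```python
# Quick connectivity check for the local MongoDB started by the compose file above.
import asyncio

from motor.motor_asyncio import AsyncIOMotorClient


async def main():
    client = AsyncIOMotorClient("mongodb://localhost:27017", serverSelectionTimeoutMS=5000)
    info = await client.server_info()  # raises if the server is unreachable
    print("MongoDB is up, version:", info["version"])
    print("Databases:", await client.list_database_names())

asyncio.run(main())
```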
14
apps/devops/requirements.txt
Normal file
@ -0,0 +1,14 @@
beanie==1.29.0
fastapi==0.115.12
loguru==0.7.3
motor==3.7.0
prometheus_fastapi_instrumentator==7.1.0
pydantic_settings==2.9.1
pytest==7.1.2
starlette==0.46.2
uvicorn==0.34.2
httpx==0.24.0
pymongo~=4.12.1
pydantic~=2.11.4
requests~=2.32.3
aio-pika==9.4.3
0
apps/devops/tests/__init__.py
Normal file
0
apps/devops/tests/routes/__init__.py
Normal file
27
apps/devops/tests/routes/test_hello_world.py
Normal file
@ -0,0 +1,27 @@
from unittest.mock import AsyncMock, patch
from fastapi.testclient import TestClient
from app.main import app
from app.routes.hello_world.apis import get_hello_world_dao


def test_hello_world():
    with TestClient(app) as client:
        response = client.get("/api/hello_world/")
        assert response.status_code == 200
        assert response.json() == {"message": "Hello, World!"}


# mock out initiate_database so it doesn't run during tests
@patch("app.providers.database.initiate_database", new_callable=AsyncMock)
def test_insert_hello_world(mock_db_init):

    class MockHelloWorldDao:
        async def create_hello_world(self, msg: str, user_id: int):
            return {"message": msg, "user_id": user_id}

    app.dependency_overrides[get_hello_world_dao] = lambda: MockHelloWorldDao()
    with TestClient(app) as client:
        response = client.post("/api/hello_world/insert", params={"msg": "Test Message"})
        assert response.status_code == 200
        assert response.json() == {"message": "Test Message", "user_id": 1}
    app.dependency_overrides.clear()
@ -0,0 +1,216 @@
import pytest
from unittest.mock import AsyncMock
from datetime import datetime
from app.backend.services.deployment_status_update_service import DeploymentStatusUpdateService


@pytest.fixture
def status_update_service():
    return DeploymentStatusUpdateService()


@pytest.fixture
def sample_heartbeat_message():
    return {
        "event_type": "DevOpsReconcileJobHeartbeat",
        "payload": {
            "operation": "heartbeat",
            "id": "deployment-123-abc",
            "status": "running",
            "phase": "building",
            "phase_message": "Building container image",
            "error": None,
            "url": None
        }
    }


@pytest.fixture
def sample_success_message():
    return {
        "event_type": "DevOpsReconcileJobHeartbeat",
        "payload": {
            "operation": "heartbeat",
            "id": "deployment-789-ghi",
            "status": "success",
            "phase": "finished",
            "phase_message": "Deployment completed successfully",
            "error": None,
            "url": "https://my-app-alpha.freeleaps.com"
        }
    }


@pytest.fixture
def sample_failed_message():
    return {
        "event_type": "DevOpsReconcileJobHeartbeat",
        "payload": {
            "operation": "heartbeat",
            "id": "deployment-456-def",
            "status": "failed",
            "phase": "jenkins_build",
            "phase_message": "Build failed due to compilation errors",
            "error": "Build step 'Invoke top-level Maven targets' marked build as failure",
            "url": None
        }
    }


@pytest.fixture
def mock_deployment():
    class MockDeployment:
        def __init__(self):
            self.deployment_id = "deployment-123-abc"
            self.deployment_status = "started"
            self.deployment_stage = "initialization"
            self.deployment_app_url = ""
            self.updated_at = datetime.now()
            self.save = AsyncMock()

    return MockDeployment()


class TestDeploymentStatusUpdateService:

    @pytest.mark.asyncio
    async def test_status_mapping(self, status_update_service):
        """Test that the status mapping works correctly"""
        assert status_update_service.status_mapping["running"] == "started"
        assert status_update_service.status_mapping["success"] == "succeeded"
        assert status_update_service.status_mapping["failed"] == "failed"
        assert status_update_service.status_mapping["terminated"] == "aborted"

    @pytest.mark.asyncio
    async def test_phase_to_stage_mapping(self, status_update_service):
        """Test that the phase-to-stage mapping works correctly"""
        assert status_update_service.phase_to_stage_mapping["initializing"] == "initialization"
        assert status_update_service.phase_to_stage_mapping["jenkins_build"] == "build"
        assert status_update_service.phase_to_stage_mapping["building"] == "build"
        assert status_update_service.phase_to_stage_mapping["deploying"] == "deployment"
        assert status_update_service.phase_to_stage_mapping["finished"] == "completed"

    @pytest.mark.asyncio
    async def test_process_running_heartbeat_message(self, status_update_service, sample_heartbeat_message, mock_deployment, monkeypatch):
        """Test processing a running status heartbeat"""
        # Mock Deployment.find_one to return our mock deployment
        async def mock_find_one(query):
            _ = query  # parameter required by the interface but not used in the mock
            return mock_deployment

        # Mock the logger methods to avoid actual logging during tests
        status_update_service.module_logger.log_info = AsyncMock()
        status_update_service.module_logger.log_warning = AsyncMock()
        status_update_service.module_logger.log_error = AsyncMock()
        status_update_service.module_logger.log_exception = AsyncMock()

        # Mock the Beanie query mechanism
        mock_deployment_class = AsyncMock()
        mock_deployment_class.find_one = mock_find_one
        monkeypatch.setattr("app.backend.services.deployment_status_update_service.Deployment", mock_deployment_class)

        await status_update_service.process_heartbeat_message(
            "test_key", sample_heartbeat_message, {}
        )

        # Verify the deployment was updated correctly
        assert mock_deployment.deployment_status == "started"
        assert mock_deployment.deployment_stage == "build"
        mock_deployment.save.assert_called_once()

    @pytest.mark.asyncio
    async def test_process_success_heartbeat_message(self, status_update_service, sample_success_message, mock_deployment, monkeypatch):
        """Test processing a success status heartbeat with URL"""
        async def mock_find_one(query):
            _ = query  # parameter required by the interface but not used in the mock
            return mock_deployment

        # Mock the logger methods
        status_update_service.module_logger.log_info = AsyncMock()
        status_update_service.module_logger.log_warning = AsyncMock()
        status_update_service.module_logger.log_error = AsyncMock()
        status_update_service.module_logger.log_exception = AsyncMock()

        # Mock the Beanie query mechanism
        mock_deployment_class = AsyncMock()
        mock_deployment_class.find_one = mock_find_one
        monkeypatch.setattr("app.backend.services.deployment_status_update_service.Deployment", mock_deployment_class)

        await status_update_service.process_heartbeat_message(
            "test_key", sample_success_message, {}
        )

        # Verify the deployment was updated correctly
        assert mock_deployment.deployment_status == "succeeded"
        assert mock_deployment.deployment_stage == "completed"
        assert mock_deployment.deployment_app_url == "https://my-app-alpha.freeleaps.com"
        mock_deployment.save.assert_called_once()

    @pytest.mark.asyncio
    async def test_process_failed_heartbeat_message(self, status_update_service, sample_failed_message, mock_deployment, monkeypatch):
        """Test processing a failed status heartbeat"""
        async def mock_find_one(query):
            _ = query  # parameter required by the interface but not used in the mock
            return mock_deployment

        # Mock the logger methods
        status_update_service.module_logger.log_info = AsyncMock()
        status_update_service.module_logger.log_warning = AsyncMock()
        status_update_service.module_logger.log_error = AsyncMock()
        status_update_service.module_logger.log_exception = AsyncMock()

        # Mock the Beanie query mechanism
        mock_deployment_class = AsyncMock()
        mock_deployment_class.find_one = mock_find_one
        monkeypatch.setattr("app.backend.services.deployment_status_update_service.Deployment", mock_deployment_class)

        await status_update_service.process_heartbeat_message(
            "test_key", sample_failed_message, {}
        )

        # Verify the deployment was updated correctly
        assert mock_deployment.deployment_status == "failed"
        assert mock_deployment.deployment_stage == "build"
        mock_deployment.save.assert_called_once()

    @pytest.mark.asyncio
    async def test_deployment_not_found(self, status_update_service, sample_heartbeat_message, monkeypatch):
        """Test handling when the deployment is not found"""
        async def mock_find_one(query):
            _ = query  # parameter required by the interface but not used in the mock
            return None

        # Mock the logger methods
        status_update_service.module_logger.log_info = AsyncMock()
        status_update_service.module_logger.log_warning = AsyncMock()
        status_update_service.module_logger.log_error = AsyncMock()
        status_update_service.module_logger.log_exception = AsyncMock()

        # Mock the Beanie query mechanism
        mock_deployment_class = AsyncMock()
        mock_deployment_class.find_one = mock_find_one
        monkeypatch.setattr("app.backend.services.deployment_status_update_service.Deployment", mock_deployment_class)

        # Should not raise an exception
        await status_update_service.process_heartbeat_message(
            "test_key", sample_heartbeat_message, {}
        )

    @pytest.mark.asyncio
    async def test_invalid_message_format(self, status_update_service):
        """Test handling an invalid message format"""
        invalid_message = {"invalid": "format"}

        # Mock the logger methods
        status_update_service.module_logger.log_info = AsyncMock()
        status_update_service.module_logger.log_warning = AsyncMock()
        status_update_service.module_logger.log_error = AsyncMock()
        status_update_service.module_logger.log_exception = AsyncMock()

        # Should not raise an exception due to the try/except in the method
        await status_update_service.process_heartbeat_message(
            "test_key", invalid_message, {}
        )
8
apps/devops/tests/test_main.http
Normal file
@ -0,0 +1,8 @@
# Test your FastAPI endpoints

GET http://localhost:8000/api/hello_world/
Accept: application/json

###
POST http://localhost:8000/api/hello_world/insert?msg=Hello%20World
Accept: application/json
20
apps/helloworld/README.md
Normal file
@ -0,0 +1,20 @@
This is a template backend service based on FastAPI + MongoDB.

To start local development, go to the root directory of the project, YOUR_WORKSPACE_PATH/helloworld/
```bash
docker compose -f app/scripts/mongodb/docker-compose.yml up -d
```

Then run the app
```bash
uvicorn app.main:app --reload
```

In case a new dependency is added, run the following commands to update the requirements.txt file
```bash
# optional: if you have not installed pipreqs
pip3 install pipreqs

# generate requirements.txt
pipreqs . --force
```
0
apps/helloworld/app/__init__.py
Normal file
0
apps/helloworld/app/bootstrap/__init__.py
Normal file
82
apps/helloworld/app/bootstrap/application.py
Normal file
@ -0,0 +1,82 @@
import logging
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi

from app.providers import common
from app.providers.logger import register_logger
from app.providers import router
from app.providers import database
from app.providers import metrics
from app.providers import probes
from app.providers import exception_handler
from app.common.config.app_settings import app_settings


def create_app() -> FastAPI:
    logging.info("App initializing")

    app = FreeleapsApp()

    register_logger()
    register(app, exception_handler)
    register(app, database)
    register(app, router)
    # register(app, scheduler)
    register(app, common)

    # Customize the OpenAPI schema (adds the global bearer-auth security scheme)
    customize_openapi_security(app)
    # Register probe APIs if enabled
    if app_settings.PROBES_ENABLED:
        register(app, probes)

    # Register metrics APIs if enabled
    if app_settings.METRICS_ENABLED:
        register(app, metrics)
    return app


# Overrides the generated OpenAPI schema to add a global JWT bearer security scheme
def customize_openapi_security(app: FastAPI) -> None:

    def custom_openapi():
        if app.openapi_schema:
            return app.openapi_schema

        # Generate the OpenAPI schema
        openapi_schema = get_openapi(
            title="FreeLeaps API",
            version="3.1.0",
            description="FreeLeaps API Documentation",
            routes=app.routes,
        )

        # Ensure the components section exists in the OpenAPI schema
        if "components" not in openapi_schema:
            openapi_schema["components"] = {}

        # Add the security scheme to components
        openapi_schema["components"]["securitySchemes"] = {
            "bearerAuth": {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"}
        }

        # Add the security requirement globally
        openapi_schema["security"] = [{"bearerAuth": []}]

        app.openapi_schema = openapi_schema
        return app.openapi_schema

    app.openapi = custom_openapi


def register(app, provider):
    logging.info(provider.__name__ + " registering")
    provider.register(app)


def boot(app, provider):
    logging.info(provider.__name__ + " booting")
    provider.boot(app)


class FreeleapsApp(FastAPI):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
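Each module handed to `register(app, provider)` above only needs to expose a `register(app)` function. A sketch of that contract, using a hypothetical provider (not part of the repo) that stamps responses with a request id:

```python
# Minimal sketch of the provider contract used by create_app():
# any module passed to register(app, provider) just needs a register(app) function.
import uuid

from fastapi import FastAPI, Request


def register(app: FastAPI):
    @app.middleware("http")
    async def add_request_id(request: Request, call_next):
        # attach a fresh request id to every response (illustrative behavior)
        response = await call_next(request)
        response.headers["X-Request-ID"] = str(uuid.uuid4())
        return response
```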
0
apps/helloworld/app/common/__init__.py
Normal file
0
apps/helloworld/app/common/config/__init__.py
Normal file
29
apps/helloworld/app/common/config/app_settings.py
Normal file
@ -0,0 +1,29 @@
from pydantic_settings import BaseSettings


# NOTE: values fall back to your environment variables when not set here
class AppSettings(BaseSettings):
    NAME: str = "YOUR_APP_NAME"
    APP_NAME: str = NAME
    APP_ENV: str = "alpha"

    JWT_SECRET_KEY: str = ""
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 3600
    REFRESH_TOKEN_EXPIRE_DAYS: int = 1

    METRICS_ENABLED: bool = False
    PROBES_ENABLED: bool = True

    APP_MONGODB_URI: str = "mongodb://localhost:27017"
    APP_MONGODB_NAME: str = "testdb"

    LOG_BASE_PATH: str = "./log"
    BACKEND_LOG_FILE_NAME: str = APP_NAME
    APPLICATION_ACTIVITY_LOG: str = APP_NAME + "-application-activity"


    class Config:
        env_file = ".myapp.env"
        env_file_encoding = "utf-8"


app_settings = AppSettings()
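Because `AppSettings` extends pydantic-settings' `BaseSettings`, every field can be overridden via `.myapp.env` or plain environment variables without code changes. A sketch (the values are illustrative):

```python
# Sketch: pydantic-settings resolves fields from the environment (or .myapp.env).
import os

os.environ["APP_MONGODB_URI"] = "mongodb://mongo.internal:27017"
os.environ["METRICS_ENABLED"] = "true"

from app.common.config.app_settings import AppSettings

settings = AppSettings()  # instantiate after setting the env vars to pick up the overrides
print(settings.APP_MONGODB_URI)  # mongodb://mongo.internal:27017
print(settings.METRICS_ENABLED)  # True (parsed from the string "true")
```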
16
apps/helloworld/app/common/config/log_settings.py
Normal file
@ -0,0 +1,16 @@
import os
from dataclasses import dataclass
from .app_settings import app_settings


@dataclass
class LogSettings:
    LOG_PATH_BASE: str = app_settings.LOG_BASE_PATH
    LOG_RETENTION: str = os.environ.get("LOG_RETENTION", "30 days")
    LOG_ROTATION: str = os.environ.get("LOG_ROTATION", "00:00")  # midnight
    MAX_BACKUP_FILES: int = int(os.environ.get("LOG_BACKUP_FILES", 5))
    LOG_ROTATION_BYTES: int = int(os.environ.get("LOG_ROTATION_BYTES", 10 * 1024 * 1024))  # 10 MB
    APP_NAME: str = app_settings.APP_NAME
    ENVIRONMENT: str = app_settings.APP_ENV


log_settings = LogSettings()
27
apps/helloworld/app/common/config/site_settings.py
Normal file
@ -0,0 +1,27 @@
import os

from pydantic_settings import BaseSettings


# NOTE: values fall back to your environment variables when not set here
class SiteSettings(BaseSettings):
    NAME: str = "appname"
    DEBUG: bool = True

    ENV: str = "dev"

    SERVER_HOST: str = "localhost"
    SERVER_PORT: int = 8000

    URL: str = "http://localhost"
    TIME_ZONE: str = "UTC"

    BASE_PATH: str = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

    class Config:
        env_file = ".devbase-webapi.env"
        env_file_encoding = "utf-8"


site_settings = SiteSettings()
0
apps/helloworld/app/common/daos/__init__.py
Normal file
6
apps/helloworld/app/common/daos/hello_world/__init__.py
Normal file
@ -0,0 +1,6 @@
from app.common.daos.hello_world.hello_world_dao import HelloWorldDao

hello_world_dao = HelloWorldDao()

def get_hello_world_dao() -> HelloWorldDao:
    return hello_world_dao
30
apps/helloworld/app/common/daos/hello_world/hello_world_dao.py
Normal file
@ -0,0 +1,30 @@
from app.common.models.hello_world.hello_world import HelloWorld


class HelloWorldDao:
    def __init__(self):
        pass

    async def create_hello_world(self, message: str, count: int):
        hello_world = HelloWorld(message=message, count=count)
        await hello_world.insert()
        return hello_world

    async def get_hello_world(self, id: str):
        hello_world = await HelloWorld.get(id)
        return hello_world

    async def update_hello_world(self, id: str, message: str, count: int):
        hello_world = await HelloWorld.get(id)
        if hello_world:
            hello_world.message = message
            hello_world.count = count
            await hello_world.save()
            return hello_world
        return None

    async def delete_hello_world(self, id: str):
        hello_world = await HelloWorld.get(id)
        if hello_world:
            await hello_world.delete()
            return True
        return False
0
apps/helloworld/app/common/log/__init__.py
Normal file
12
apps/helloworld/app/common/log/application_logger.py
Normal file
@ -0,0 +1,12 @@
from typing import Any, Optional

from .base_logger import LoggerBase
from app.common.config.app_settings import app_settings


class ApplicationLogger(LoggerBase):
    def __init__(self, application_activities: Optional[dict[str, Any]] = None) -> None:
        extra_fields = {}
        if application_activities:
            extra_fields.update(application_activities)
        super().__init__(
            logger_name=app_settings.APPLICATION_ACTIVITY_LOG,
            extra_fields=extra_fields,
        )
136
apps/helloworld/app/common/log/base_logger.py
Normal file
@ -0,0 +1,136 @@
from loguru import logger as guru_logger
from app.common.config.log_settings import log_settings
from typing import Dict, Any, Optional
import socket
import json
import threading
import os
import sys
import inspect
import logging

from app.common.log.json_sink import JsonSink


class LoggerBase:
    binded_loggers = {}
    logger_lock = threading.Lock()

    def __init__(self, logger_name: str, extra_fields: dict[str, Any]) -> None:
        self.__logger_name = logger_name
        self.extra_fields = extra_fields
        with LoggerBase.logger_lock:
            if self.__logger_name in LoggerBase.binded_loggers:
                self.logger = LoggerBase.binded_loggers[self.__logger_name]
                return

            # drop loguru's default stderr handler once, before the first sink is added;
            # calling remove() on every init would also delete sinks of other topics
            if not LoggerBase.binded_loggers:
                guru_logger.remove()

            log_filename = f"{log_settings.LOG_PATH_BASE}/{self.__logger_name}.log"
            log_level = "INFO"
            rotation_bytes = int(log_settings.LOG_ROTATION_BYTES or 10 * 1024 * 1024)

            file_sink = JsonSink(
                log_file_path=log_filename,
                rotation_size_bytes=rotation_bytes,
                max_backup_files=log_settings.MAX_BACKUP_FILES
            )
            guru_logger.add(
                sink=file_sink,
                level=log_level,
                filter=lambda record: record["extra"].get("topic") == self.__logger_name,
            )

            guru_logger.add(
                sink=sys.stderr,
                level=log_level,
                format="{level} - {time:YYYY-MM-DD HH:mm:ss} - <{extra[log_file]}:{extra[log_line]}> - {extra[properties_str]} - {message}",
                filter=lambda record: record["extra"].get("topic") == self.__logger_name,
            )

            host_name = socket.gethostname()
            host_ip = socket.gethostbyname(host_name)
            self.logger = guru_logger.bind(
                topic=self.__logger_name,
                host_ip=host_ip,
                host_name=host_name,
                app=log_settings.APP_NAME,
                env=log_settings.ENVIRONMENT,
            )
            LoggerBase.binded_loggers[self.__logger_name] = self.logger

    def _get_log_context(self) -> dict:
        frame = inspect.currentframe().f_back.f_back
        filename = os.path.basename(frame.f_code.co_filename)
        lineno = frame.f_lineno
        return {"log_file": filename, "log_line": lineno}

    def _prepare_properties(self, properties: Optional[Dict[str, Any]]) -> tuple:
        props = {} if properties is None else properties.copy()
        props_str = json.dumps(props, ensure_ascii=False) if props else "{}"
        return props, props_str

    async def log_event(self, sender_id: str, receiver_id: str, subject: str, event: str, properties: dict[str, Any], text: str = "") -> None:
        props, props_str = self._prepare_properties(properties)
        context = self._get_log_context()
        local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event=event, properties=props, properties_str=props_str, **context)
        local_logger.info(text)

    async def log_exception(self, sender_id: str, receiver_id: str, subject: str, exception: Exception, text: str = "", properties: dict[str, Any] = None) -> None:
        props, props_str = self._prepare_properties(properties)
        context = self._get_log_context()
        local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="exception", properties=props, properties_str=props_str, exception=exception, **context)
        local_logger.exception(text)

    async def log_info(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, Any] = None) -> None:
        props, props_str = self._prepare_properties(properties)
        context = self._get_log_context()
        local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="information", properties=props, properties_str=props_str, **context)
        local_logger.info(text)

    async def log_warning(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, Any] = None) -> None:
        props, props_str = self._prepare_properties(properties)
        context = self._get_log_context()
        local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="warning", properties=props, properties_str=props_str, **context)
        local_logger.warning(text)

    async def log_error(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, Any] = None) -> None:
        props, props_str = self._prepare_properties(properties)
        context = self._get_log_context()
        local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="error", properties=props, properties_str=props_str, **context)
        local_logger.error(text)

    @staticmethod
    def configure_uvicorn_logging():
        print("📢 Setting up uvicorn logging interception...")

        # Intercept logs from these loggers
        intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"]

        class InterceptHandler(logging.Handler):
            def emit(self, record):
                # map the stdlib level name to loguru's, falling back to the numeric level
                try:
                    level = guru_logger.level(record.levelname).name
                except ValueError:
                    level = record.levelno
                frame, depth = logging.currentframe(), 2
                while frame.f_code.co_filename == logging.__file__:
                    frame = frame.f_back
                    depth += 1

                guru_logger.opt(depth=depth, exception=record.exc_info).log(
                    level,
                    f"[{record.name}] {record.getMessage()}",
                )

        # Replace the default handlers
        logging.root.handlers.clear()
        logging.root.setLevel(logging.INFO)
        logging.root.handlers = [InterceptHandler()]

        # Configure the specific uvicorn loggers
        for logger_name in intercept_loggers:
            logging_logger = logging.getLogger(logger_name)
            logging_logger.handlers.clear()  # remove default handlers
            logging_logger.propagate = True  # propagate up to the root InterceptHandler
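A sketch of emitting a structured event through this logger via `ApplicationLogger`; the ids, subject, and properties are illustrative placeholders:

```python
# Sketch: using the async logging API defined in LoggerBase above.
import asyncio

from app.common.log.application_logger import ApplicationLogger


async def main():
    app_logger = ApplicationLogger()
    await app_logger.log_info(
        sender_id="hello_world_api",   # placeholder ids
        receiver_id="ops",
        subject="startup",
        text="service booted",
        properties={"version": "0.1.0"},
    )

asyncio.run(main())
```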
85
apps/helloworld/app/common/log/json_sink.py
Normal file
@ -0,0 +1,85 @@
import json
import datetime
import traceback
from pathlib import Path


class JsonSink:
    def __init__(
        self,
        log_file_path: str,
        rotation_size_bytes: int = 10 * 1024 * 1024,
        max_backup_files: int = 5,
    ):
        self.log_file_path = Path(log_file_path)
        self.rotation_size = rotation_size_bytes
        self.max_backup_files = max_backup_files
        self._open_log_file()

    def _open_log_file(self):
        # ensure the parent directory exists
        parent_dir = self.log_file_path.parent
        if not parent_dir.exists():
            parent_dir.mkdir(parents=True, exist_ok=True)
        self.log_file = self.log_file_path.open("a", encoding="utf-8")

    def _should_rotate(self) -> bool:
        return self.log_file_path.exists() and self.log_file_path.stat().st_size >= self.rotation_size

    def _rotate(self):
        self.log_file.close()
        timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        rotated_path = self.log_file_path.with_name(f"{self.log_file_path.stem}_{timestamp}{self.log_file_path.suffix}")
        self.log_file_path.rename(rotated_path)
        self._cleanup_old_backups()
        self._open_log_file()

    def _cleanup_old_backups(self):
        parent = self.log_file_path.parent
        stem = self.log_file_path.stem
        suffix = self.log_file_path.suffix

        backup_files = sorted(
            parent.glob(f"{stem}_*{suffix}"),
            key=lambda p: p.stat().st_mtime,
            reverse=True,
        )

        for old_file in backup_files[self.max_backup_files:]:
            try:
                old_file.unlink()
            except Exception as e:
                print(f"Failed to delete old backup {old_file}: {e}")

    def __call__(self, message):
        record = message.record
        if self._should_rotate():
            self._rotate()

        log_entry = {
            "level": record["level"].name.lower(),
            "timestamp": int(record["time"].timestamp() * 1000),
            "text": record["message"],
            "fields": record["extra"].get("properties", {}),
            "context": {
                "app": record["extra"].get("app"),
                "env": record["extra"].get("env"),
                "log_file": record["extra"].get("log_file"),
                "log_line": record["extra"].get("log_line"),
                "topic": record["extra"].get("topic"),
                "sender_id": record["extra"].get("sender_id"),
                "receiver_id": record["extra"].get("receiver_id"),
                "subject": record["extra"].get("subject"),
                "event": record["extra"].get("event"),
                "host_ip": record["extra"].get("host_ip"),
                "host_name": record["extra"].get("host_name"),
            },
            "stacktrace": None
        }

        if record["exception"]:
            exc_type, exc_value, exc_tb = record["exception"]
            log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb)

        self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
        self.log_file.flush()
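For reference, the shape of a single line this sink writes (the values are made up; the fields mirror the `log_entry` construction above):

```python
# Illustrative shape of one JSON line emitted by JsonSink:
sample_entry = {
    "level": "info",
    "timestamp": 1716200000000,  # milliseconds since epoch
    "text": "service booted",
    "fields": {"version": "0.1.0"},
    "context": {
        "app": "YOUR_APP_NAME", "env": "alpha",
        "log_file": "application_logger.py", "log_line": 12,
        "topic": "YOUR_APP_NAME-application-activity",
        "sender_id": "hello_world_api", "receiver_id": "ops",
        "subject": "startup", "event": "information",
        "host_ip": "127.0.0.1", "host_name": "localhost",
    },
    "stacktrace": None,
}
```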
4
apps/helloworld/app/common/models/__init__.py
Normal file
@ -0,0 +1,4 @@
from app.common.models.hello_world.hello_world import HelloWorld

# list of beanie document models
db_models = [HelloWorld]
17
apps/helloworld/app/common/models/hello_world/hello_world.py
Normal file
@ -0,0 +1,17 @@
from datetime import datetime

from beanie import Document
from pydantic import Field


class HelloWorld(Document):
    message: str
    count: int = 0
    # use a default factory so the timestamp is taken at insert time,
    # not once at class-definition time
    created_time: datetime = Field(default_factory=datetime.now)

    class Settings:
        name = "hello_world"
        indexes = [
            [("message", 1), ("count", 1)]
        ]
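A sketch of basic operations with this document; it assumes Beanie has been initialized first via `initiate_database()` from `app/providers/database.py`:

```python
# Sketch: basic Beanie operations with the HelloWorld document.
import asyncio

from app.common.models.hello_world.hello_world import HelloWorld
from app.providers.database import initiate_database


async def main():
    await initiate_database()  # wires Beanie to the configured MongoDB

    doc = HelloWorld(message="hi", count=1)
    await doc.insert()

    # the compound (message, count) index defined in Settings backs this query
    found = await HelloWorld.find_one(HelloWorld.message == "hi", HelloWorld.count == 1)
    print(found.id if found else "not found")

asyncio.run(main())
```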
140
apps/helloworld/app/common/probes/__init__.py
Normal file
@ -0,0 +1,140 @@
import logging
from enum import Enum
from typing import Optional, Callable, Tuple, Dict
import inspect
from datetime import datetime, timezone


# ProbeType is an Enum that defines the types of probes that can be registered.
class ProbeType(Enum):
    LIVENESS = "liveness"
    READINESS = "readiness"
    STARTUP = "startup"


# ProbeResult represents the result of a probe check.
class ProbeResult:
    def __init__(self, success: bool, message: str = "ok", data: Optional[dict] = None):
        self.success = success
        self.message = message
        self.data = data or {}

    def to_dict(self) -> dict:
        return {
            "success": self.success,
            "message": self.message,
            "data": self.data
        }


# Probe represents a single registered probe.
class Probe:
    def __init__(self, type: ProbeType, path: str, check_fn: Callable, name: Optional[str] = None):
        self.type = type
        self.path = path
        self.check_fn = check_fn
        self.name = name or f"{type.value}-{id(self)}"

    async def execute(self) -> ProbeResult:
        try:
            result = self.check_fn()
            if inspect.isawaitable(result):
                result = await result

            if isinstance(result, ProbeResult):
                return result
            elif isinstance(result, bool):
                return ProbeResult(result, "ok" if result else "failed")
            else:
                return ProbeResult(True, "ok")
        except Exception as e:
            return ProbeResult(False, str(e))


# ProbeGroup is a group of probes that are checked together under one path.
class ProbeGroup:
    def __init__(self, path: str):
        self.path = path
        self.probes: Dict[str, Probe] = {}

    def add_probe(self, probe: Probe):
        self.probes[probe.name] = probe

    async def check_all(self) -> Tuple[bool, dict]:
        results = {}
        all_success = True

        for name, probe in self.probes.items():
            result = await probe.execute()
            results[name] = result.to_dict()
            if not result.success:
                all_success = False

        return all_success, results


# FrameworkAdapter is an abstract class that defines the interface for framework-specific probe adapters.
class FrameworkAdapter:
    async def handle_request(self, group: ProbeGroup):
        all_success, results = await group.check_all()
        status_code = 200 if all_success else 503
        return {"status": "ok" if all_success else "failed", "payload": results, "timestamp": int(datetime.now(timezone.utc).timestamp())}, status_code

    def register_route(self, path: str, handler: Callable):
        raise NotImplementedError


# ProbeManager manages the registration of probes and their corresponding framework adapters.
class ProbeManager:
    _default_paths = {
        ProbeType.LIVENESS: "/_/livez",
        ProbeType.READINESS: "/_/readyz",
        ProbeType.STARTUP: "/_/healthz"
    }

    def __init__(self):
        self.groups: Dict[str, ProbeGroup] = {}
        self.adapters: Dict[str, FrameworkAdapter] = {}
        self._startup_complete = False

    def register_adapter(self, framework: str, adapter: FrameworkAdapter):
        self.adapters[framework] = adapter
        logging.info(f"Registered probe adapter ({adapter}) for framework: {framework}")

    def register(
        self,
        type: ProbeType,
        check_func: Optional[Callable] = None,
        path: Optional[str] = None,
        prefix: str = "",
        name: Optional[str] = None,
        frameworks: Optional[list] = None
    ):
        path = path or self._default_paths.get(type, "/_/healthz")
        if prefix:
            path = f"{prefix}{path}"

        if type == ProbeType.STARTUP and check_func is None:
            check_func = self._default_startup_check

        probe = Probe(type, path, check_func or (lambda: True), name)

        if path not in self.groups:
            self.groups[path] = ProbeGroup(path)
        self.groups[path].add_probe(probe)

        for framework in (frameworks or ["default"]):
            self._register_route(framework, path)
            logging.info(f"Registered {type.value} probe route ({path}) for framework: {framework}")

    def _register_route(self, framework: str, path: str):
        if framework not in self.adapters:
            return

        adapter = self.adapters[framework]
        group = self.groups[path]

        async def handler():
            return await adapter.handle_request(group)

        adapter.register_route(path, handler)

    def _default_startup_check(self) -> bool:
        return self._startup_complete

    def mark_startup_complete(self):
        self._startup_complete = True
15
apps/helloworld/app/common/probes/adapters.py
Normal file
@ -0,0 +1,15 @@
from . import FrameworkAdapter
from fastapi.responses import JSONResponse
from typing import Callable


# FastAPIAdapter implements the FrameworkAdapter interface for FastAPI.
class FastAPIAdapter(FrameworkAdapter):
    def __init__(self, app):
        self.app = app

    def register_route(self, path: str, handler: Callable):
        async def wrapper():
            data, status_code = await handler()
            return JSONResponse(content=data, status_code=status_code)

        self.app.add_api_route(path, wrapper, methods=["GET"])
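The probe routes registered through this adapter respond with the envelope built in `FrameworkAdapter.handle_request`. A sketch querying the readiness endpoint as wired in `app/providers/probes.py`, assuming the app runs locally on port 8000:

```python
# Sketch: querying the readiness probe registered under the /api prefix.
import httpx

resp = httpx.get("http://localhost:8000/api/_/readyz")
body = resp.json()
print(resp.status_code)   # 200 when all probes pass, 503 otherwise
print(body["status"])     # "ok" or "failed"
print(body["payload"])    # per-probe results, e.g. {"readiness": {"success": ..., ...}}
```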
0
apps/helloworld/app/envs/alpha.yml
Normal file
0
apps/helloworld/app/envs/prod.yml
Normal file
16
apps/helloworld/app/main.py
Normal file
@ -0,0 +1,16 @@
from fastapi.responses import RedirectResponse
from app.common.config.site_settings import site_settings
from app.bootstrap.application import create_app

app = create_app()

@app.get("/")
async def root():
    """
    Redirect clients to the interactive API docs.
    """
    return RedirectResponse(url="/docs", status_code=301)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host=site_settings.SERVER_HOST, port=site_settings.SERVER_PORT, reload=True)
0
apps/helloworld/app/providers/__init__.py
Normal file
31
apps/helloworld/app/providers/common.py
Normal file
@ -0,0 +1,31 @@
from fastapi.middleware.cors import CORSMiddleware
from app.common.config.site_settings import site_settings


def register(app):
    app.debug = site_settings.DEBUG
    app.title = site_settings.NAME

    add_global_middleware(app)

    # This hook ensures that a connection is opened to handle any queries
    # generated by the request.
    @app.on_event("startup")
    def startup():
        pass

    # This hook ensures that the connection is closed when we've finished
    # processing the request.
    @app.on_event("shutdown")
    def shutdown():
        pass


def add_global_middleware(app):
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
34
apps/helloworld/app/providers/database.py
Normal file
@ -0,0 +1,34 @@
import asyncio
from app.common.config.app_settings import app_settings
from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
from app.common.models import db_models
from app.common.probes import ProbeResult

client = AsyncIOMotorClient(
    app_settings.APP_MONGODB_URI,
    serverSelectionTimeoutMS=60000,
    minPoolSize=5,   # minimum number of connections in the pool
    maxPoolSize=20,  # maximum number of connections in the pool
)

def register(app):
    @app.on_event("startup")
    async def start_database():
        await initiate_database()

async def check_database_initialized() -> ProbeResult:
    try:
        await asyncio.wait_for(client.server_info(), timeout=5)
        return ProbeResult(success=True, message="service has been initialized and ready to serve")
    except Exception:
        return ProbeResult(success=False, message="service is not initialized yet", data={"error": "database is not ready"})


async def initiate_database():
    await init_beanie(
        database=client[app_settings.APP_MONGODB_NAME], document_models=db_models
    )
39
apps/helloworld/app/providers/exception_handler.py
Normal file
@ -0,0 +1,39 @@
from fastapi import FastAPI, HTTPException
from fastapi.exceptions import RequestValidationError
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.status import (
    HTTP_400_BAD_REQUEST,
    HTTP_500_INTERNAL_SERVER_ERROR,
)


async def custom_http_exception_handler(request: Request, exc: HTTPException):
    return JSONResponse(
        status_code=exc.status_code,
        content={"error": exc.detail},
    )


async def validation_exception_handler(request: Request, exc: RequestValidationError):
    return JSONResponse(
        status_code=HTTP_400_BAD_REQUEST,
        content={"error": str(exc)},
    )

async def exception_handler(request: Request, exc: Exception):
    return JSONResponse(
        status_code=HTTP_500_INTERNAL_SERVER_ERROR,
        content={"error": str(exc)},
    )


def register(app: FastAPI):
    app.add_exception_handler(HTTPException, custom_http_exception_handler)
    app.add_exception_handler(RequestValidationError, validation_exception_handler)
    app.add_exception_handler(Exception, exception_handler)
7
apps/helloworld/app/providers/logger.py
Normal file
@ -0,0 +1,7 @@
from app.common.log.base_logger import LoggerBase


def register_logger():
    print("📢 Setting up logging interception...")
    LoggerBase.configure_uvicorn_logging()
    print("✅ Logging interception complete. Logs are formatted and deduplicated!")
16
apps/helloworld/app/providers/metrics.py
Normal file
@ -0,0 +1,16 @@
import logging
from prometheus_fastapi_instrumentator import Instrumentator
from app.common.config.app_settings import app_settings

def register(app):
    instrumentator = (
        Instrumentator().instrument(
            app,
            metric_namespace="freeleaps",
            metric_subsystem=app_settings.APP_NAME)
    )

    @app.on_event("startup")
    async def startup():
        instrumentator.expose(app, endpoint="/api/_/metrics", should_gzip=True)
        logging.info("Metrics endpoint exposed at /api/_/metrics")
25
apps/helloworld/app/providers/probes.py
Normal file
@ -0,0 +1,25 @@
from app.common.probes import ProbeManager, ProbeType
from app.common.probes.adapters import FastAPIAdapter
from .database import check_database_initialized

def register(app):
    probes_manager = ProbeManager()
    probes_manager.register_adapter("fastapi", FastAPIAdapter(app))

    async def readiness_checker():
        return await check_database_initialized()

    probes_manager.register(
        name="readiness",
        prefix="/api",
        type=ProbeType.READINESS,
        check_func=readiness_checker,
        frameworks=["fastapi"]
    )

    probes_manager.register(name="liveness", prefix="/api", type=ProbeType.LIVENESS, frameworks=["fastapi"])
    probes_manager.register(name="startup", prefix="/api", type=ProbeType.STARTUP, frameworks=["fastapi"])

    @app.on_event("startup")
    async def mark_startup_complete():
        probes_manager.mark_startup_complete()
34
apps/helloworld/app/providers/router.py
Normal file
@ -0,0 +1,34 @@
from app.routes import api_router

from starlette import routing


def register(app):
    app.include_router(
        api_router,
        prefix="/api",
        tags=["api"],
        dependencies=[],
        responses={404: {"description": "no page found"}},
    )

    if app.debug:
        for route in app.routes:
            if not isinstance(route, routing.WebSocketRoute):
                print(
                    {
                        "path": route.path,
                        "endpoint": route.endpoint,
                        "name": route.name,
                        "methods": route.methods,
                    }
                )
            else:
                print(
                    {
                        "path": route.path,
                        "endpoint": route.endpoint,
                        "name": route.name,
                        "type": "web socket route",
                    }
                )
8
apps/helloworld/app/providers/scheduler.py
Normal file
@ -0,0 +1,8 @@
def register(app):
    @app.on_event("startup")
    async def start_scheduler():
        # create your scheduler here
        pass
8
apps/helloworld/app/routes/__init__.py
Normal file
@ -0,0 +1,8 @@
from fastapi import APIRouter
from app.routes.hello_world import router as hello_world_router


api_router = APIRouter()

# TODO: add custom routers here
api_router.include_router(hello_world_router, tags=["hello_world"])
7
apps/helloworld/app/routes/hello_world/__init__.py
Normal file
@ -0,0 +1,7 @@
from fastapi import APIRouter
from .apis import router as hello_world_api


router = APIRouter(prefix="/hello_world")

router.include_router(hello_world_api, tags=["hello_world"])
25
apps/helloworld/app/routes/hello_world/apis.py
Normal file
@ -0,0 +1,25 @@
from fastapi import APIRouter, Depends
from loguru import logger

from app.common.daos.hello_world import get_hello_world_dao, HelloWorldDao

router = APIRouter()

@router.get("/")
async def hello_world():
    logger.info("Hello, World! endpoint was called")
    return {"message": "Hello, World!"}


@router.post("/insert")
async def insert_hello_world(msg: str, dao: HelloWorldDao = Depends(get_hello_world_dao)):
    """
    Insert a HelloWorld document into the database.
    """
    hello_world = await dao.create_hello_world(msg, 1)
    return hello_world
18
apps/helloworld/app/scripts/mongodb/docker-compose.yml
Normal file
@ -0,0 +1,18 @@
version: '3.8'

services:
  mongodb:
    image: mongo:6.0 # You can change to the desired version
    container_name: mongodb
    restart: unless-stopped
    ports:
      - "27017:27017"
    environment:
      MONGO_INITDB_DATABASE: testdb # <-- This creates the initial database
    volumes:
      - mongodb_data:/data/db
    command: ["mongod", "--noauth"] # <-- Disable authentication

volumes:
  mongodb_data:
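
Once the compose file is up, connectivity can be smoke-tested directly with motor (the testdb name comes from MONGO_INITDB_DATABASE above; the smoke_test collection is illustrative):

```python
import asyncio

from motor.motor_asyncio import AsyncIOMotorClient


async def main():
    # The compose file runs mongod with --noauth, so no credentials are needed.
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client["testdb"]

    # Round-trip one document to confirm the connection works.
    result = await db["smoke_test"].insert_one({"message": "hello"})
    print(await db["smoke_test"].find_one({"_id": result.inserted_id}))


asyncio.run(main())
```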
10
apps/helloworld/requirements.txt
Normal file
@ -0,0 +1,10 @@
beanie==1.29.0
fastapi==0.115.12
loguru==0.7.3
motor==3.7.0
prometheus_fastapi_instrumentator==7.1.0
pydantic_settings==2.9.1
pytest==7.1.2
starlette==0.46.2
uvicorn==0.34.2
httpx==0.24.0
0
apps/helloworld/tests/__init__.py
Normal file
0
apps/helloworld/tests/routes/__init__.py
Normal file
27
apps/helloworld/tests/routes/test_hello_world.py
Normal file
@ -0,0 +1,27 @@
from unittest.mock import AsyncMock, patch
from fastapi.testclient import TestClient
from app.main import app
from app.routes.hello_world.apis import get_hello_world_dao


def test_hello_world():
    with TestClient(app) as client:
        response = client.get("/api/hello_world/")
        assert response.status_code == 200
        assert response.json() == {"message": "Hello, World!"}


# mock out initiate_database so it doesn't run during tests
@patch("app.providers.database.initiate_database", new_callable=AsyncMock)
def test_insert_hello_world(mock_db_init):

    class MockHelloWorldDao:
        async def create_hello_world(self, msg: str, user_id: int):
            return {"message": msg, "user_id": user_id}

    app.dependency_overrides[get_hello_world_dao] = lambda: MockHelloWorldDao()
    with TestClient(app) as client:
        response = client.post("/api/hello_world/insert", params={"msg": "Test Message"})
        assert response.status_code == 200
        assert response.json() == {"message": "Test Message", "user_id": 1}
    app.dependency_overrides.clear()
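
Clearing dependency_overrides by hand leaks the override if an assertion fails first; a sketch of a fixture-based alternative that always cleans up (names are illustrative):

```python
import pytest

from app.main import app
from app.routes.hello_world.apis import get_hello_world_dao


class MockHelloWorldDao:
    async def create_hello_world(self, msg: str, user_id: int):
        return {"message": msg, "user_id": user_id}


@pytest.fixture
def override_dao():
    # Install the override, and guarantee cleanup even if the test body raises.
    app.dependency_overrides[get_hello_world_dao] = lambda: MockHelloWorldDao()
    yield
    app.dependency_overrides.clear()
```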
8
apps/helloworld/tests/test_main.http
Normal file
@ -0,0 +1,8 @@
# Test your FastAPI endpoints

GET http://localhost:8000/api/hello_world/
Accept: application/json

###
POST http://localhost:8000/api/hello_world/insert?msg=Hello%20World
Accept: application/json
@ -9,9 +9,6 @@ class PaymentHub:
        self.stripe_manager = StripeManager()
        return

    async def fetch_wechat_qr_code(self, project_id: str) -> Optional[Dict[str, any]]:
        return await self.payment_manager.fetch_wechat_qr_code(project_id)

    async def fetch_stripe_account_id(self, user_id: str) -> Optional[str]:
        return await self.payment_manager.fetch_stripe_account_id(user_id)
@ -9,18 +9,6 @@ class PaymentManager:
    def __init__(self) -> None:
        self.module_logger = ModuleLogger(sender_id="PaymentManager")

    async def fetch_wechat_qr_code(self, project_id: str) -> Optional[Dict[str, any]]:
        project = await ProjectDoc.get(project_id)
        proposer = project.proposer_id
        income_profile = await IncomeProfileDoc.find_one(
            IncomeProfileDoc.user_id == proposer
        )
        if income_profile:
            return income_profile.bank_account.money_collecting_methods[
                0
            ].wechat_qr_code
        return None

    async def fetch_stripe_account_id(self, user_id: str) -> Optional[str]:
        income_profile = await IncomeProfileDoc.find_one(IncomeProfileDoc.user_id == user_id)
        if income_profile:
@ -44,7 +32,7 @@ class PaymentManager:
                    }
                }}
            )

            if not payment_profile:
                await self.module_logger.log_warning(
                    warning="No payment profile found for Stripe account",
@ -54,7 +42,7 @@ class PaymentManager:
                    }
                )
                return False

            # Update the stripe method status
            updated = False
            # Need to check if money_collecting_methods exists and is not empty
@ -66,7 +54,7 @@ class PaymentManager:
                        method.last_update_time = int(datetime.now().timestamp())
                        updated = True
                        break  # Exit loop once found and updated

            if updated:
                await payment_profile.save()
                await self.module_logger.log_info(
@ -79,7 +67,7 @@ class PaymentManager:
                    }
                )
                return True

            # Log warning with more context
            await self.module_logger.log_warning(
                warning="Stripe account not found in payment methods",
@ -91,7 +79,7 @@ class PaymentManager:
                }
            )
            return False

        except Exception as e:
            await self.module_logger.log_exception(
                exception=e,
@ -8,6 +8,7 @@ from stripe.error import SignatureVerificationError
from common.log.module_logger import ModuleLogger
from decimal import Decimal
import json
import httpx


stripe.api_key = app_settings.STRIPE_API_KEY
@ -39,7 +40,7 @@ class StripeManager:
                redirect_url="{}/work".format(self.site_url_root)
            )
            return login_link.url

        # Otherwise show onboarding
        account_link = stripe.AccountLink.create(
            account=account_id,
@ -90,6 +91,10 @@ class StripeManager:
                properties={"session_id": session_id},
            )
        elif len(transactions) == 0:
            await self.module_logger.log_error(
                error="No transaction found for session_id: {}".format(session_id),
                properties={"session_id": session_id},
            )
            return None

        return transactions[0]
@ -203,19 +208,25 @@ class StripeManager:
        transaction.stripe_price_id = price.id
        await transaction.save()

-       payment_link = stripe.PaymentLink.create(
-           line_items=[
+       # Prepare payment link parameters with conditional application_fee_amount
+       payment_link_params = {
+           "line_items": [
                {
                    "price": transaction.stripe_price_id,
                    "quantity": 1,
                }
            ],
-           application_fee_amount=transaction.application_fee_amount,
-           on_behalf_of=transaction.to_stripe_account_id,
-           transfer_data={
+           "on_behalf_of": transaction.to_stripe_account_id,
+           "transfer_data": {
                "destination": transaction.to_stripe_account_id,
            },
-       )
+       }
+
+       # Only add application_fee_amount if it's greater than 0
+       if transaction.application_fee_amount and transaction.application_fee_amount > 0:
+           payment_link_params["application_fee_amount"] = transaction.application_fee_amount
+
+       payment_link = stripe.PaymentLink.create(**payment_link_params)

        if payment_link:
            transaction.stripe_payment_link = payment_link.url
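
The refactor builds the PaymentLink arguments as a plain dict so that application_fee_amount is only sent when it is positive, which the in-diff comment suggests Stripe requires. The same pattern in isolation (identifiers hardcoded for illustration):

```python
import stripe


def build_payment_link_params(price_id: str, destination_account: str, fee_amount: int) -> dict:
    # Base parameters are always present.
    params = {
        "line_items": [{"price": price_id, "quantity": 1}],
        "on_behalf_of": destination_account,
        "transfer_data": {"destination": destination_account},
    }
    # Include the fee only when positive; a zero fee is simply omitted.
    if fee_amount and fee_amount > 0:
        params["application_fee_amount"] = fee_amount
    return params


# payment_link = stripe.PaymentLink.create(**build_payment_link_params("price_123", "acct_123", 250))
```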
@ -276,27 +287,37 @@ class StripeManager:
        transaction.stripe_price_id = price.id
        await transaction.save()

-       session = stripe.checkout.Session.create(
-           payment_method_types=["card"],
-           line_items=[
+       # Prepare payment_intent_data with conditional application_fee_amount
+       payment_intent_data = {
+           "on_behalf_of": transaction.to_stripe_account_id,
+           "transfer_data": {
+               "destination": transaction.to_stripe_account_id,
+           },
+       }
+
+       # Only add application_fee_amount if it's greater than 0
+       if transaction.application_fee_amount and transaction.application_fee_amount > 0:
+           payment_intent_data["application_fee_amount"] = transaction.application_fee_amount
+
+       session_params = {
+           "payment_method_types": ["card"],
+           "line_items": [
                {
                    "price": transaction.stripe_price_id,
                    "quantity": 1,
                }
            ],
-           payment_intent_data={
-               "on_behalf_of": transaction.to_stripe_account_id,
-               "application_fee_amount": transaction.application_fee_amount,
-               "transfer_data": {
-                   "destination": transaction.to_stripe_account_id,
-               },
-           },
-           mode="payment",
-           success_url="{}/projects".format(
-               self.site_url_root
-           ),  # needs to be set, local: http://localhost/
-           cancel_url="{}/projects".format(self.site_url_root),
-       )
+           "payment_intent_data": payment_intent_data,
+           "mode": "payment",
+           "success_url": "{}/projects".format(self.site_url_root),
+           "cancel_url": "{}/projects".format(self.site_url_root),
+       }
+
+       session = stripe.checkout.Session.create(**session_params)

        if session:
            transaction.stripe_checkout_session_id = session.id
@ -332,21 +353,211 @@ class StripeManager:
    async def invoke_checkout_session_webhook(
        self, event: dict
    ) -> Tuple[bool, Optional[str], Optional[str]]:
-       # Handle the checkout.session.completed event
+       """
+       Handle checkout.session.completed webhook events from Stripe.
+       Updates transaction status and saves payment method information for future use.
+       """
        if event["type"] == "checkout.session.completed":
            session = event["data"]["object"]

+           # Find and validate the transaction
            transaction = await self.__fetch_transaction_by_session_id(session["id"])
            if not transaction:
                await self.module_logger.log_error(
                    error="Transaction not found for session_id: {}".format(session["id"]),
                    properties={"session_id": session["id"]},
                )
-               return False
+               return False, None, None

-           transaction.status = TransactionStatus.COMPLETED
-           transaction.updated_time = datetime.now(timezone.utc)
-           await transaction.save()
+           # Update transaction status to completed
+           await self.__update_transaction_status(transaction)
+
+           # Process and save payment method information
+           await self.__process_payment_method(session, transaction)
+
+           return True, transaction.project_id, transaction.milestone_index
+
+       return False, None, None
+
+   async def __update_transaction_status(self, transaction: StripeTransactionDoc) -> None:
+       """
+       Update transaction status to completed and save to database.
+       """
+       transaction.status = TransactionStatus.COMPLETED
+       transaction.updated_time = datetime.now(timezone.utc)
+       await transaction.save()
+
+   async def __process_payment_method(self, session: dict, transaction: StripeTransactionDoc) -> None:
+       """
+       Extract payment method details from Stripe session and save to database.
+       Creates or finds customer and attaches payment method for future use.
+       """
+       try:
+           # Get payment method details from Stripe
+           payment_method_info = await self.__extract_payment_method_info(session)
+           if not payment_method_info:
+               return
+
+           payment_method_id, card_details = payment_method_info
+
+           # Get or create Stripe customer for the user
+           customer_id = await self.__get_or_create_customer(transaction.from_user)
+           if not customer_id:
+               return
+
+           # Attach payment method to customer and save to database
+           await self.__attach_and_save_payment_method(
+               payment_method_id, card_details, customer_id, transaction.from_user
+           )
+
+       except Exception as payment_method_error:
+           await self.module_logger.log_error(
+               error=f"Error processing payment method: {payment_method_error}",
+               properties={"session_id": session["id"], "user_id": transaction.from_user}
+           )
+
+   async def __extract_payment_method_info(self, session: dict) -> Optional[Tuple[str, dict]]:
+       """
+       Extract payment method ID and card details from Stripe session.
+       Returns tuple of (payment_method_id, card_details) or None if not found.
+       """
+       try:
+           # Get the Stripe session to extract payment method details
+           stripe_session = stripe.checkout.Session.retrieve(session["id"])
+           payment_intent_id = stripe_session.get('payment_intent')
+
+           if not payment_intent_id:
+               return None
+
+           payment_intent = stripe.PaymentIntent.retrieve(payment_intent_id)
+           payment_method_id = payment_intent.get('payment_method')
+
+           if not payment_method_id:
+               return None
+
+           payment_method = stripe.PaymentMethod.retrieve(payment_method_id)
+           card_details = payment_method.get('card', {})
+
+           return payment_method_id, card_details
+
+       except Exception as e:
+           await self.module_logger.log_error(
+               error=f"Error extracting payment method info: {e}",
+               properties={"session_id": session["id"]}
+           )
+           return None
+
+   async def __get_or_create_customer(self, user_id: str) -> Optional[str]:
+       """
+       Find existing Stripe customer by email or create new one.
+       Returns customer ID or None if creation fails.
+       """
+       try:
+           # Generate email for user (fallback since we don't have access to user profile)
+           user_email = f"user_{user_id}@freeleaps.com"
+
+           # Search for existing customers by email
+           customers = stripe.Customer.list(email=user_email, limit=1)
+           if customers.data:
+               return customers.data[0].id
+
+           # Create new customer if not found
+           customer = stripe.Customer.create(
+               email=user_email,
+               metadata={"user_id": user_id}
+           )
+           return customer.id
+
+       except Exception as customer_error:
+           await self.module_logger.log_error(
+               error=f"Error getting/creating customer: {customer_error}",
+               properties={"user_id": user_id}
+           )
+           return None
+
+   async def __attach_and_save_payment_method(
+       self, payment_method_id: str, card_details: dict, customer_id: str, user_id: str
+   ) -> None:
+       """
+       Attach payment method to Stripe customer and save details to database.
+       Handles various error scenarios gracefully.
+       """
+       try:
+           # Check if payment method is already attached to a customer
+           payment_method_obj = stripe.PaymentMethod.retrieve(payment_method_id)
+           if payment_method_obj.customer:
+               # Use the existing customer ID
+               customer_id = payment_method_obj.customer
+           else:
+               # Try to attach payment method to customer in Stripe
+               stripe.PaymentMethod.attach(
+                   payment_method_id,
+                   customer=customer_id
+               )
+
+           # Save to database
+           await self.__save_payment_method_to_db(
+               payment_method_id, card_details, customer_id, user_id
+           )
+
+       except stripe.error.InvalidRequestError as attach_error:
+           # Handle specific Stripe attachment errors
+           await self.__handle_attachment_error(
+               attach_error, payment_method_id, card_details, customer_id, user_id
+           )
+       except Exception as save_error:
+           await self.module_logger.log_error(
+               error=f"Error attaching payment method: {save_error}",
+               properties={"payment_method_id": payment_method_id, "user_id": user_id}
+           )
+
+   async def __save_payment_method_to_db(
+       self, payment_method_id: str, card_details: dict, customer_id: str, user_id: str
+   ) -> None:
+       """
+       Save payment method details to database if it doesn't already exist.
+       """
+       from backend.infra.payment.models import StripePaymentMethodDoc
+
+       # Check if payment method already exists in our database
+       existing_payment_method = await StripePaymentMethodDoc.find_one(
+           StripePaymentMethodDoc.stripe_payment_method_id == payment_method_id
+       )
+
+       if existing_payment_method:
+           return  # Already saved
+
+       # Save to our database
+       payment_method_doc = StripePaymentMethodDoc(
+           user_id=user_id,
+           stripe_customer_id=customer_id,
+           stripe_payment_method_id=payment_method_id,
+           card_last4=card_details.get('last4'),
+           card_brand=card_details.get('brand'),
+           card_exp_month=card_details.get('exp_month'),
+           card_exp_year=card_details.get('exp_year'),
+           created_time=datetime.now(timezone.utc),
+           updated_time=datetime.now(timezone.utc),
+       )
+       await payment_method_doc.save()
+
+   async def __handle_attachment_error(
+       self, attach_error: stripe.error.InvalidRequestError,
+       payment_method_id: str, card_details: dict, customer_id: str, user_id: str
+   ) -> None:
+       """
+       Handle specific Stripe attachment errors and still save to database when possible.
+       """
+       error_message = str(attach_error).lower()
+
+       if "already attached" in error_message or "may not be used again" in error_message:
+           # Payment method can't be attached but we can still save to database
+           await self.__save_payment_method_to_db(
+               payment_method_id, card_details, customer_id, user_id
+           )
+       else:
+           # Log other attachment errors
+           await self.module_logger.log_error(
+               error=f"Error attaching payment method: {attach_error}",
+               properties={"payment_method_id": payment_method_id, "user_id": user_id}
+           )
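
A sketch of how the reworked handler might be driven, showing the minimal event shape it inspects and the three-tuple it returns (the session id and manager wiring are illustrative):

```python
import asyncio


async def handle_stripe_event(manager) -> None:
    # Minimal shape of a checkout.session.completed event; ids are illustrative.
    event = {
        "type": "checkout.session.completed",
        "data": {"object": {"id": "cs_test_123"}},
    }
    ok, project_id, milestone_index = await manager.invoke_checkout_session_webhook(event)
    if ok:
        print(f"payment completed for project {project_id}, milestone {milestone_index}")


# asyncio.run(handle_stripe_event(StripeManager()))
```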
@ -12,7 +12,6 @@ class MoneyCollectionType(IntEnum):
    UNSPECIFIED = 0
    MARKED_AS_PAID = 1
    UPLOAD_PROOF = 2
    WECHAT_QR_CODE = 3
    STRIPE_CHECKOUT = 4

@ -24,3 +24,18 @@ class StripeTransactionDoc(Document):

    class Settings:
        name = "stripe_transaction"


class StripePaymentMethodDoc(Document):
    user_id: str
    stripe_customer_id: str
    stripe_payment_method_id: str
    card_last4: Optional[str] = None
    card_brand: Optional[str] = None
    card_exp_month: Optional[int] = None
    card_exp_year: Optional[int] = None
    created_time: datetime
    updated_time: datetime

    class Settings:
        name = "stripe_payment_method"
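
With the new collection registered, saved cards can be queried through Beanie as usual; a sketch (the helper name and sort order are illustrative):

```python
from typing import List

from backend.infra.payment.models import StripePaymentMethodDoc


async def list_saved_cards(user_id: str) -> List[StripePaymentMethodDoc]:
    # Newest-first list of the user's stored cards.
    query = StripePaymentMethodDoc.find(StripePaymentMethodDoc.user_id == user_id)
    return await query.sort(-StripePaymentMethodDoc.created_time).to_list()
```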
@ -5,9 +5,9 @@
# TODO: Add all models to backend_models
from backend.services.payment.models import IncomeProfileDoc, PaymentProfileDoc
from backend.services.project.models import ProjectDoc
-from backend.infra.payment.models import StripeTransactionDoc
+from backend.infra.payment.models import StripeTransactionDoc, StripePaymentMethodDoc

-backend_models = [IncomeProfileDoc, PaymentProfileDoc, ProjectDoc, StripeTransactionDoc]
+backend_models = [IncomeProfileDoc, PaymentProfileDoc, ProjectDoc, StripeTransactionDoc, StripePaymentMethodDoc]
# backend_models.extend(code_models)
# backend_models.extend(user_models)
# backend_models.extend(profile_models)
Some files were not shown because too many files have changed in this diff.