import uuid
from collections import defaultdict
from datetime import datetime, timedelta
from typing import List

import httpx
import requests
from fastapi import HTTPException, Depends

from app.common.config.site_settings import site_settings
from app.common.models import Deployment
from app.common.models.code_depot.code_depot import CodeDepotDoc, DepotStatus
from app.common.models.deployment.deployment import (
    InitDeploymentRequest,
    CheckApplicationLogsRequest,
    CheckApplicationLogsResponse,
)

# Upper bound for a single Loki range query. Set explicitly so a slow log
# query neither hangs forever nor is cut off by the HTTP client's default.
_LOKI_QUERY_TIMEOUT_SECONDS = 30.0


class DeploymentService:
    """Create, track and inspect deployment records for products."""

    def __init__(self):
        pass

    async def init_deployment(
        self,
        request: InitDeploymentRequest,
    ) -> Deployment:
        """Create the initial ``init``/``started`` record for a deployment.

        Looks up the code depot registered for ``request.product_id``,
        derives the git URL from the depot name, triggers the deployment
        and then persists the record.

        Args:
            request: Deployment parameters (product, target env, TTL,
                commit hash, reason and requesting user).

        Returns:
            The result of inserting the new ``Deployment`` document.

        Raises:
            HTTPException: 404 when no code depot exists for the product.
        """
        # TODO validate permission with user_id (currently skipped)
        code_depot = await self._get_code_depot_by_product_id(request.product_id)
        git_url = await self._compose_git_url(code_depot.depot_name)

        # One timestamp for both fields so a fresh record has
        # created_at == updated_at.
        now = datetime.now()

        # TODO: resolve the real project id/name; literal placeholders are
        # stored for now.
        deployment = Deployment.model_construct(
            deployment_id=str(uuid.uuid4()),
            deployment_stage="init",
            deployment_status="started",
            deployment_target_env=request.target_env,
            deployment_ttl_hours=request.ttl_hours,
            deployment_project_id="project_id",
            deployment_project_name="project_name",
            deployment_product_id=request.product_id,
            # The depot name is expected to be the same as the product name.
            deployment_product_name=code_depot.depot_name,
            deployment_git_url=git_url,
            deployment_git_sha256=request.sha256,
            deployment_reason=request.reason,
            deployed_by=request.user_id,
            created_at=now,
            updated_at=now,
        )

        await self._start_deployment(deployment)
        return await Deployment.insert(deployment)

    async def check_deployment_status(
        self,
        product_id: str,
    ) -> List[Deployment]:
        """Return the latest record of each recent deployment of a product.

        Only records created within the past 48 hours are considered.
        Records are grouped by ``deployment_id`` and, for every group, the
        one with the greatest ``(created_at, updated_at)`` is returned.

        Args:
            product_id: Product whose deployments are inspected.

        Returns:
            One ``Deployment`` per deployment id, in the order the ids
            first appear in the query result.
        """
        time_threshold = datetime.now() - timedelta(hours=48)
        deployment_records = await Deployment.find(
            Deployment.deployment_product_id == product_id,
            Deployment.created_at >= time_threshold,
        ).to_list()

        grouped = defaultdict(list)
        for record in deployment_records:
            grouped[record.deployment_id].append(record)

        # max() selects the newest record directly; the previous
        # sort-descending-then-take-last combination returned the oldest.
        return [
            max(records, key=lambda d: (d.created_at, d.updated_at))
            for records in grouped.values()
        ]

    async def update_deployment_status(
        self,
        deployment: Deployment,
    ) -> Deployment:
        """Record a status change for an existing deployment.

        If the incoming stage equals the stage of the most recently created
        record, that record's status and ``updated_at`` are updated in
        place. Otherwise ``deployment`` is inserted as a new record with
        the same ``deployment_id``.

        Args:
            deployment: Carries the deployment id, stage and status to
                record.

        Returns:
            The result of saving the existing record or inserting the
            new one.

        Raises:
            HTTPException: 404 when the deployment id has no record yet.
        """
        latest_record = await Deployment.find_one(
            Deployment.deployment_id == deployment.deployment_id,
            sort=[("created_at", -1)],
        )
        if not latest_record:
            raise HTTPException(status_code=404, detail="No record found, please initiate deployment first")

        # TODO add more sanity check logic here
        if deployment.deployment_stage == latest_record.deployment_stage:
            # Same stage: update the existing record.
            latest_record.deployment_status = deployment.deployment_status
            latest_record.updated_at = deployment.updated_at or datetime.now()
            return await latest_record.save()

        # New stage: create a new record under the same deployment id.
        now = datetime.now()
        deployment.deployment_id = latest_record.deployment_id
        deployment.created_at = now
        deployment.updated_at = now
        return await deployment.insert()

    async def _get_code_depot_by_product_id(
        self,
        product_id: str,
    ) -> CodeDepotDoc:
        """Fetch the code depot document registered for ``product_id``.

        Raises:
            HTTPException: 404 when no depot matches the product id.
        """
        code_depot = await CodeDepotDoc.find_one(CodeDepotDoc.product_id == product_id)
        if not code_depot:
            raise HTTPException(
                status_code=404,
                detail="Code depot not found for the given product id, "
                       "please initialize the product first"
            )
        return code_depot

    async def _compose_git_url(
        self,
        code_depot_name: str,
        gitea_base_url: str = site_settings.BASE_GITEA_URL,
    ) -> str:
        """Build the git clone URL for a depot from its lower-cased name.

        Args:
            code_depot_name: Depot name; lower-cased before use.
            gitea_base_url: Base URL of the Gitea server.
        """
        # NOTE(review): "prodcuts" looks like a typo for "products". It is
        # left unchanged because the path must match the organisation that
        # actually exists on the Gitea server -- confirm before renaming.
        return f"{gitea_base_url}/prodcuts/{code_depot_name.lower()}.git"

    async def _start_deployment(
        self,
        deployment: Deployment,
        reconsile_base_url: str = site_settings.BASE_RECONSILE_URL,
    ) -> bool:
        """Start the deployment.

        Placeholder: currently returns ``True`` without contacting any
        service. The planned call to the reconcile service is sketched
        below.
        """
        # async with httpx.AsyncClient() as client:
        #     response = await client.post(
        #         f"{reconsile_base_url}/api/devops/reconcile",
        #         json=deployment.model_dump()
        #     )
        #     if response.status_code != 200:
        #         raise HTTPException(status_code=response.status_code, detail=response.text)
        return True

    async def check_application_logs(
        self,
        request: CheckApplicationLogsRequest,
        loki_url: str = site_settings.BASE_LOKI_URL,
    ) -> CheckApplicationLogsResponse:
        """Query Loki for application logs within a time range.

        The stream selector matches ``application`` against
        ``request.product_id`` and ``environment`` against
        ``request.target_env``. When log levels are given, lines are
        filtered with a regex that matches any of them.

        Args:
            request: Product, environment, time range, limit and optional
                log levels.
            loki_url: Base URL of the Loki server.

        Returns:
            The request parameters echoed back, plus the log lines
            formatted as ``"[timestamp] message"``.

        Raises:
            Exception: Loki answered with a non-200 status code.
            httpx.HTTPError: The request could not be completed
                (connection failure, timeout, ...).
        """
        # Loki expects nanoseconds since the epoch.
        start_ns = int(request.start_time.timestamp() * 1e9)
        end_ns = int(request.end_time.timestamp() * 1e9)

        # TODO: convert product_id to application name if needed
        # NOTE(review): the values are interpolated into the query unescaped;
        # confirm they are validated upstream.
        loki_query = f'{{application="{request.product_id}", environment="{request.target_env}"}}'
        if request.log_level:
            # Add the line filter only when there is something to filter on,
            # instead of sending an empty regex.
            level_pattern = '|'.join(request.log_level)
            loki_query = f'{loki_query} |~ "{level_pattern}"'

        params = {
            "query": loki_query,
            "limit": request.limit,
            "start": start_ns,
            "end": end_ns,
        }
        url = f"{loki_url}/loki/api/v1/query_range"

        # Use the async client so the event loop is not blocked while
        # waiting for Loki.
        async with httpx.AsyncClient(timeout=_LOKI_QUERY_TIMEOUT_SECONDS) as client:
            response = await client.get(url, params=params)

        if response.status_code != 200:
            raise Exception(f"Query failed: {response.status_code} - {response.text}")

        streams = response.json().get("data", {}).get("result", [])

        logs = []
        for stream in streams:
            for ts, log in stream.get("values", []):
                # Each value is a (nanosecond timestamp, log line) pair.
                timestamp = datetime.fromtimestamp(int(ts) / 1e9)
                logs.append(f"[{timestamp}] {log.strip()}")

        return CheckApplicationLogsResponse(
            product_id=request.product_id,
            target_env=request.target_env,
            user_id=request.user_id,
            log_level=request.log_level,
            start_time=request.start_time,
            end_time=request.end_time,
            limit=request.limit,
            logs=logs
        )

    # TODO: dummy test code, remove later
    async def create_dummy_code_depot(
        self,
    ) -> CodeDepotDoc:
        """Insert a dummy code depot for testing purposes.

        The depot gets a random ``dummy-depot-<uuid>`` name, the fixed
        product id ``"dummy-product-id"`` and status ``CREATED``.
        """
        depot_name = f"dummy-depot-{uuid.uuid4()}"
        code_depot = CodeDepotDoc(
            depot_name=depot_name,
            product_id="dummy-product-id",
            depot_status=DepotStatus.CREATED
        )
        return await CodeDepotDoc.insert_one(code_depot)


# Module-level instance returned by get_deployment_service().
deployment_service = DeploymentService()


def get_deployment_service() -> DeploymentService:
    """Return the module-level ``DeploymentService`` instance."""
    return deployment_service