import uuid from collections import defaultdict from datetime import datetime, timedelta from typing import List import httpx from fastapi import HTTPException, Depends from app.common.config.site_settings import site_settings from app.common.daos.code_depot import get_code_depot_dao, CodeDepotDao from app.common.daos.deployment import DeploymentDao, get_deployment_dao from app.common.models import Deployment from app.common.models.code_depot.code_depot import CodeDepotDoc, DepotStatus from app.common.models.deployment.deployment import InitDeploymentRequest class DeploymentService: def __init__(self): pass async def init_deployment( self, request: InitDeploymentRequest, dao: DeploymentDao = get_deployment_dao() ) -> Deployment: """ """ # TODO validate permission with user_id # currently skip code_depot = await self._get_code_depot_by_product_id(request.product_id) git_url = await self._compose_git_url(code_depot.depot_name) # retrieve project name project_name = "TODO" # retrieve product info, depot name should be the same as product name product_id = request.product_id product_name = code_depot.depot_name deployment = Deployment.model_construct( deployment_id = str(uuid.uuid4()), deployment_stage = "init", deployment_status = "started", deployment_target_env = request.target_env, deployment_ttl_hours = request.ttl_hours, deployment_project_id = "project_id", deployment_project_name = "project_name", deployment_product_id = product_id, deployment_product_name = product_name, deployment_git_url = git_url, deployment_git_sha256 = request.sha256, deployment_reason = request.reason, deployed_by = request.user_id, created_at = datetime.now(), updated_at = datetime.now(), ) await self._start_deployment(deployment) res = await dao.create_deployment(deployment) return res async def check_deployment_status( self, product_id: str, ) -> List[Deployment]: """ Check the deployment status of the application, only check past 48 hours """ # TODO implement this function time_threshold = datetime.now() - timedelta(hours=48) deployment_records = await Deployment.find( Deployment.deployment_product_id == product_id, Deployment.created_at >= time_threshold ).to_list() grouped = defaultdict(list) for deployment in deployment_records: grouped[deployment.deployment_id].append(deployment) for deployment_list in grouped.values(): deployment_list.sort(key=lambda d: (d.created_at, d.updated_at), reverse=True) latest_deployments = [deployments[-1] for deployments in grouped.values()] return latest_deployments async def update_deployment_status( self, deployment: Deployment ) -> bool: latest_record = await Deployment.find_one( Deployment.deployment_id == deployment.deployment_id, sort=[("created_at", -1)] ) if not latest_record: raise HTTPException(status_code=404, detail="No record found, please initiate deployment first") # TODO add more sanity check logic here if deployment.deployment_stage == latest_record.deployment_status: # update existing record latest_record.deployment_status = deployment.deployment_status latest_record.updated_at = deployment.updated_at or datetime.now() await latest_record.save() else: # create new record deployment.deployment_id = latest_record.deployment_id deployment.created_at = latest_record.created_at deployment.updated_at = datetime.now() await deployment.insert() return True async def _get_code_depot_by_product_id( self, product_id: str, code_depot_dao: CodeDepotDao = get_code_depot_dao() ) -> CodeDepotDoc: """ Retrieve code depot by product id """ code_depot = await code_depot_dao.get_code_depot_by_product_id(product_id) if not code_depot: raise HTTPException(status_code=404, detail="Code depot not found for the given product id, " "please initialize the product first" ) return code_depot async def _compose_git_url( self, code_depot_name: str, gitea_base_url: str = site_settings.BASE_GITEA_URL ) -> str: """ Retrieve git url by product id """ return f"{gitea_base_url}/prodcuts/{code_depot_name.lower()}.git" async def _start_deployment( self, deployment: Deployment, reconsile_base_url: str = site_settings.BASE_RECONSILE_URL, ) -> bool: """ Start the deployment Return true atm, modify calling reconsile service later """ # async with httpx.AsyncClient() as client: # response = await client.post( # f"{reconsile_base_url}/api/devops/reconcile", # json=deployment.model_dump() # ) # if response.status_code != 200: # raise HTTPException(status_code=response.status_code, detail=response.text) return True # TODO: dummy test code, remove later async def create_dummy_code_depot( self, code_depot_dao: CodeDepotDao = Depends(get_code_depot_dao) ) -> CodeDepotDoc: """ Create a dummy code depot for testing purposes. """ depot_name = f"dummy-depot-{uuid.uuid4()}" code_depot = CodeDepotDoc( depot_name=depot_name, product_id="dummy-product-id", depot_status=DepotStatus.CREATED ) return await code_depot.insert_one(code_depot) deployment_service = DeploymentService() def get_deployment_service() -> DeploymentService: return deployment_service