import uuid from collections import defaultdict from datetime import datetime, timedelta from typing import List from fastapi import HTTPException from app.common.models import Deployment from app.common.models.deployment.deployment import InitDeploymentRequest class DeploymentService: def __init__(self): pass async def init_deployment( self, request: InitDeploymentRequest ) -> Deployment: """ """ # TODO validate permission with user_id # currently skip git_url = await self._retrieve_git_url_by_product_id(request.product_id) product_initialized = await self._check_if_project_initialized(git_url, request.product_id) if not product_initialized: await self._init_product(git_url, request.product_id) # retrieve project name project_name = "TODO" # retrieve product info product_id = request.product_id product_name = "TODO" deployment = Deployment.model_construct( deployment_id = str(uuid.uuid4()), deployment_stage = "init", deployment_status = "started", deployment_target_env = request.target_env, deployment_ttl_hours = request.ttl_hours, deployment_project_id = "project_id", deployment_project_name = "project_name", deployment_product_id = product_id, deployment_product_name = product_name, deployment_git_url = git_url, deployment_git_sha256 = request.sha256, deployment_reason = request.reason, deployed_by = request.user_id, created_at = datetime.now(), updated_at = datetime.now(), ) await self._start_deployment(deployment) res = await deployment.insert() return res async def check_deployment_status( self, product_id: str, ) -> List[Deployment]: """ Check the deployment status of the application, only check past 48 hours """ # TODO implement this function time_threshold = datetime.now() - timedelta(hours=48) deployment_records = await Deployment.find( Deployment.deployment_product_id == product_id, Deployment.created_at >= time_threshold ).to_list() grouped = defaultdict(list) for deployment in deployment_records: grouped[deployment.deployment_id].append(deployment) for deployment_list in grouped.values(): deployment_list.sort(key=lambda d: (d.created_at, d.updated_at), reverse=True) latest_deployments = [deployments[-1] for deployments in grouped.values()] return latest_deployments async def update_deployment_status( self, deployment: Deployment ) -> bool: latest_record = await Deployment.find_one( Deployment.deployment_id == deployment.deployment_id, sort=[("created_at", -1)] ) if not latest_record: raise HTTPException(status_code=404, detail="No record found, please initiate deployment first") # TODO add more sanity check logic here if deployment.deployment_stage == latest_record.deployment_status: # update existing record latest_record.deployment_status = deployment.deployment_status latest_record.updated_at = deployment.updated_at or datetime.now() await latest_record.save() else: # create new record deployment.deployment_id = latest_record.deployment_id deployment.created_at = latest_record.created_at deployment.updated_at = datetime.now() await deployment.insert() return True async def _retrieve_git_url_by_product_id( self, product_id: str ) -> str: """ Retrieve git url by product id """ # TODO implement this function return "TODO-git_url" async def _check_if_project_initialized( self, git_url: str, product_id: str ) -> bool: """ Check if the project has been initialized """ # TODO implement this function return True async def _init_product( self, git_url: str, product_id: str ) -> bool: """ Initialize the product """ # TODO implement this function pass async def _start_deployment( self, deployment: Deployment ) -> bool: """ Start the deployment """ # TODO implement this function pass deployment_service = DeploymentService() def get_deployment_service() -> DeploymentService: return deployment_service