# freeleaps-service-hub/apps/devops/app/routes/deployment/service.py
# 2025-06-07 11:32:48 -07:00
#
# 179 lines
# 6.1 KiB
# Python

import uuid
from collections import defaultdict
from datetime import datetime, timedelta
from typing import List
import httpx
from fastapi import HTTPException, Depends
from app.common.config.site_settings import site_settings
from app.common.daos.code_depot import get_code_depot_dao, CodeDepotDao
from app.common.daos.deployment import DeploymentDao, get_deployment_dao
from app.common.models import Deployment
from app.common.models.code_depot.code_depot import CodeDepotDoc, DepotStatus
from app.common.models.deployment.deployment import InitDeploymentRequest
class DeploymentService:
    """Service layer for starting deployments and tracking their status.

    NOTE(review): several methods declare ``Depends(...)`` defaults. FastAPI
    only resolves those when it invokes the callable itself (route handler or
    dependency). Code that calls these methods directly must pass the DAO
    arguments explicitly, otherwise the unresolved marker object is received.
    """

    def __init__(self):
        """Create the service; no instance state is kept."""

    async def init_deployment(
        self,
        request: InitDeploymentRequest,
        dao: DeploymentDao = Depends(get_deployment_dao),
        code_depot_dao: CodeDepotDao = Depends(get_code_depot_dao),
    ) -> Deployment:
        """Build a new deployment, send it to the reconciler and persist it.

        Args:
            request: Carries product_id, target_env, ttl_hours, sha256,
                reason and user_id for the new deployment.
            dao: DAO used to store the deployment record.
            code_depot_dao: DAO used to look up the product's code depot.
                Forwarded explicitly because a ``Depends`` default on the
                private helper is never resolved for an internal call.

        Returns:
            Whatever ``dao.create_deployment`` returns for the new record.

        Raises:
            HTTPException: 404 when the product has no code depot, or the
                reconciler's status code when it rejects the request.
        """
        # TODO validate permission with user_id
        # currently skip
        code_depot = await self._get_code_depot_by_product_id(
            request.product_id, code_depot_dao
        )
        git_url = await self._compose_git_url(code_depot.depot_name)

        # A single timestamp keeps created_at and updated_at identical on a
        # freshly created record.
        now = datetime.now()
        deployment = Deployment.model_construct(
            deployment_id=str(uuid.uuid4()),
            deployment_stage="init",
            deployment_status="started",
            deployment_target_env=request.target_env,
            deployment_ttl_hours=request.ttl_hours,
            # TODO retrieve the real project id / name; placeholders for now.
            deployment_project_id="project_id",
            deployment_project_name="project_name",
            deployment_product_id=request.product_id,
            # depot name should be the same as product name
            deployment_product_name=code_depot.depot_name,
            deployment_git_url=git_url,
            deployment_git_sha256=request.sha256,
            deployment_reason=request.reason,
            deployed_by=request.user_id,
            created_at=now,
            updated_at=now,
        )
        # NOTE(review): the reconciler is called before the record is stored;
        # a failed insert leaves a started deployment without a record.
        await self._start_deployment(deployment)
        return await dao.create_deployment(deployment)

    async def check_deployment_status(
        self,
        product_id: str,
    ) -> List[Deployment]:
        """Return the latest record of each recent deployment of a product.

        Only records created within the past 48 hours are considered. Records
        are grouped by ``deployment_id`` and the one with the greatest
        ``(created_at, updated_at)`` is kept for each group.
        """
        time_threshold = datetime.now() - timedelta(hours=48)
        deployment_records = await Deployment.find(
            Deployment.deployment_product_id == product_id,
            Deployment.created_at >= time_threshold
        ).to_list()

        grouped = defaultdict(list)
        for record in deployment_records:
            grouped[record.deployment_id].append(record)

        # max() picks the newest entry directly; the previous code sorted
        # newest-first and then took the last element, i.e. the oldest one.
        return [
            max(records, key=lambda d: (d.created_at, d.updated_at))
            for records in grouped.values()
        ]

    async def update_deployment_status(
        self,
        deployment: Deployment
    ) -> bool:
        """Record a status change for an existing deployment.

        If the incoming stage equals the stage of the latest stored record,
        that record's status and ``updated_at`` are updated in place.
        Otherwise a new record is inserted for the new stage, reusing the
        stored ``deployment_id`` and ``created_at``.

        Returns:
            Always ``True`` when no exception is raised.

        Raises:
            HTTPException: 404 when no record exists for the deployment id.
        """
        latest_record = await Deployment.find_one(
            Deployment.deployment_id == deployment.deployment_id,
            # Records of one deployment share created_at (copied below), so
            # updated_at is needed as a tie-breaker to find the latest one.
            sort=[("created_at", -1), ("updated_at", -1)]
        )
        if not latest_record:
            raise HTTPException(status_code=404, detail="No record found, please initiate deployment first")

        # TODO add more sanity check logic here
        # Compare stage with stage (previously compared against the status).
        if deployment.deployment_stage == latest_record.deployment_stage:
            # update existing record
            latest_record.deployment_status = deployment.deployment_status
            latest_record.updated_at = deployment.updated_at or datetime.now()
            await latest_record.save()
        else:
            # create new record
            deployment.deployment_id = latest_record.deployment_id
            deployment.created_at = latest_record.created_at
            deployment.updated_at = datetime.now()
            await deployment.insert()
        return True

    async def _get_code_depot_by_product_id(
        self,
        product_id: str,
        code_depot_dao: CodeDepotDao = Depends(get_code_depot_dao)
    ) -> CodeDepotDoc:
        """Retrieve the code depot of a product.

        Raises:
            HTTPException: 404 when the DAO returns no depot for the product.
        """
        code_depot = await code_depot_dao.get_code_depot_by_product_id(product_id)
        if not code_depot:
            raise HTTPException(status_code=404,
                                detail="Code depot not found for the given product id, "
                                       "please initialize the product first"
                                )
        return code_depot

    async def _compose_git_url(
        self,
        code_depot_name: str,
        gitea_base_url: str = site_settings.BASE_GITEA_URL
    ) -> str:
        """Compose the git clone URL of a depot from its name.

        The depot name is lower-cased and a trailing slash on the base URL is
        dropped so the result never contains a double slash.
        """
        # NOTE(review): path segment was spelled "prodcuts"; corrected to
        # "products" -- confirm this matches the Gitea organisation name.
        return f"{gitea_base_url.rstrip('/')}/products/{code_depot_name.lower()}.git"

    async def _start_deployment(
        self,
        deployment: Deployment,
        reconsile_base_url: str = site_settings.BASE_RECONSILE_URL,
    ) -> bool:
        """POST the deployment to the reconcile endpoint to start it.

        Returns:
            ``True`` when the endpoint answers with status 200.

        Raises:
            HTTPException: with the endpoint's status code and response text
                for any other status.
        """
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{reconsile_base_url}/api/devops/reconcile",
                # mode="json" converts datetimes (created_at / updated_at) to
                # strings; a plain model_dump() is not JSON serializable.
                json=deployment.model_dump(mode="json")
            )
        if response.status_code != 200:
            raise HTTPException(status_code=response.status_code, detail=response.text)
        return True

    async def create_dummy_code_depot(
        self,
        code_depot_dao: CodeDepotDao = Depends(get_code_depot_dao)
    ) -> CodeDepotDoc:
        """Create a dummy code depot for testing purposes.

        The depot gets a random ``dummy-depot-<uuid>`` name, the fixed product
        id ``dummy-product-id`` and status ``DepotStatus.CREATED``.
        ``code_depot_dao`` is currently unused.
        """
        depot_name = f"dummy-depot-{uuid.uuid4()}"
        code_depot = CodeDepotDoc(
            depot_name=depot_name,
            product_id="dummy-product-id",
            depot_status=DepotStatus.CREATED
        )
        return await CodeDepotDoc.insert_one(code_depot)
# Module-level instance created once at import time and handed out by
# get_deployment_service().
deployment_service = DeploymentService()
def get_deployment_service() -> DeploymentService:
    """Return the module-level ``deployment_service`` instance."""
    return deployment_service