Implement get_latest_deployment
This commit is contained in:
parent
dd642994d4
commit
921633dc04
@ -1,5 +1,5 @@
|
||||
from datetime import datetime
|
||||
from typing import List
|
||||
from typing import List, Optional
|
||||
|
||||
from fastapi import APIRouter, Depends
|
||||
from loguru import logger
|
||||
@ -22,14 +22,13 @@ async def init_deployment(
|
||||
@router.get('/getLatestDeployment')
|
||||
async def get_latest_deployment(
|
||||
product_id: str,
|
||||
target_env: str = "alpha",
|
||||
service: DeploymentService = Depends(get_deployment_service)
|
||||
) -> Deployment:
|
||||
) -> Optional[Deployment]:
|
||||
"""
|
||||
Get the latest deployment for a given product ID.
|
||||
"""
|
||||
# return await service.get_latest_deployment(product_id)
|
||||
return None
|
||||
|
||||
return await service.get_latest_deployment(product_id, target_env)
|
||||
|
||||
@router.post("/updateDeploymentStatus")
|
||||
async def update_deployment(
|
||||
|
||||
@ -13,6 +13,8 @@ from app.common.models.code_depot.code_depot import CodeDepotDoc, DepotStatus
|
||||
from app.common.models.deployment.deployment import InitDeploymentRequest, CheckApplicationLogsRequest, \
|
||||
CheckApplicationLogsResponse
|
||||
|
||||
from loguru import logger
|
||||
|
||||
|
||||
class DeploymentService:
|
||||
|
||||
@ -63,6 +65,25 @@ class DeploymentService:
|
||||
|
||||
return res
|
||||
|
||||
async def get_latest_deployment(
|
||||
self,
|
||||
product_id: str,
|
||||
target_env: str,
|
||||
) -> Deployment:
|
||||
time_threshold = datetime.now() - timedelta(hours=168) # 7 days
|
||||
deployment_records = await Deployment.find(
|
||||
Deployment.deployment_product_id == product_id,
|
||||
Deployment.deployment_target_env == target_env,
|
||||
Deployment.updated_at >= time_threshold
|
||||
).to_list()
|
||||
|
||||
if not deployment_records or len(deployment_records) == 0:
|
||||
logger.warning(f"No deployment records found for product ID: {product_id} in the last 7 days")
|
||||
return None
|
||||
|
||||
latest_deployment = max(deployment_records, key=lambda d: (d.updated_at, d.created_at))
|
||||
return latest_deployment
|
||||
|
||||
async def check_deployment_status(
|
||||
self,
|
||||
product_id: str,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user