from datetime import datetime from typing import Dict, Literal from app.common.log.module_logger import ModuleLogger from app.common.models.deployment.deployment import Deployment from app.common.models.deployment.heartbeat import DevOpsReconcileJobHeartbeatMessage class DeploymentStatusUpdateService: def __init__(self): self.module_logger = ModuleLogger(sender_id="DeploymentStatusUpdateService") # Status mapping from heartbeat to deployment model self.status_mapping: Dict[str, Literal["started", "failed", "succeeded", "aborted"]] = { "running": "started", "success": "succeeded", "failed": "failed", "terminated": "aborted" } # Phase to stage mapping for more detailed tracking self.phase_to_stage_mapping: Dict[str, str] = { "initializing": "initialization", "jenkins_build": "build", "building": "build", "deploying": "deployment", "finished": "completed" } async def process_heartbeat_message(self, registry_key: str, message_data: dict, args: dict): """Process incoming heartbeat message and update deployment status""" # registry_key and args are provided by the message queue framework but not used in this implementation _ = registry_key, args try: # Parse the message using our Pydantic model heartbeat_message = DevOpsReconcileJobHeartbeatMessage(**message_data) payload = heartbeat_message.payload await self.module_logger.log_info( text=f"Processing heartbeat for deployment {payload.id}: {payload.status} - {payload.phase}", data={"deployment_id": payload.id, "status": payload.status, "phase": payload.phase} ) # Find the deployment by ID deployment = await Deployment.find_one(Deployment.deployment_id == payload.id) if not deployment: await self.module_logger.log_warning( text=f"Deployment not found: {payload.id}", data={"deployment_id": payload.id} ) return # Map heartbeat status to deployment status if payload.status in self.status_mapping: deployment.deployment_status = self.status_mapping[payload.status] else: await self.module_logger.log_warning( text=f"Unknown status received: {payload.status}", data={"deployment_id": payload.id, "status": payload.status} ) return # Map phase to deployment stage if payload.phase in self.phase_to_stage_mapping: deployment.deployment_stage = self.phase_to_stage_mapping[payload.phase] else: deployment.deployment_stage = payload.phase # Update app URL if provided and deployment is successful if payload.url and payload.status == "success": deployment.deployment_app_url = payload.url # Update timestamp deployment.updated_at = datetime.now() # Save the updated deployment await deployment.save() await self.module_logger.log_info( text=f"Updated deployment {payload.id}: status={deployment.deployment_status}, stage={deployment.deployment_stage}", data={ "deployment_id": payload.id, "status": deployment.deployment_status, "stage": deployment.deployment_stage, "app_url": deployment.deployment_app_url, "error": payload.error } ) # Log errors if present if payload.error: await self.module_logger.log_error( text=f"Deployment {payload.id} failed: {payload.error}", data={"deployment_id": payload.id, "error": payload.error, "phase": payload.phase} ) except Exception as e: await self.module_logger.log_exception( exception=e, text=f"Error processing heartbeat message: {message_data}", )