diff --git a/.gitignore b/.gitignore index b022725..c6032c1 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,5 @@ *.pyc freedev.code-workspace .idea/ -.pytest_cache/ \ No newline at end of file +.pytest_cache/ +CLAUDE.md diff --git a/apps/devops/app/backend/infra/rabbitmq/__init__.py b/apps/devops/app/backend/infra/rabbitmq/__init__.py new file mode 100644 index 0000000..881d42a --- /dev/null +++ b/apps/devops/app/backend/infra/rabbitmq/__init__.py @@ -0,0 +1 @@ +# RabbitMQ infrastructure for DevOps service \ No newline at end of file diff --git a/apps/devops/app/backend/infra/rabbitmq/async_client.py b/apps/devops/app/backend/infra/rabbitmq/async_client.py new file mode 100644 index 0000000..a48fc58 --- /dev/null +++ b/apps/devops/app/backend/infra/rabbitmq/async_client.py @@ -0,0 +1,64 @@ +from app.common.config.app_settings import app_settings +from app.common.log.module_logger import ModuleLogger +import asyncio +from asyncio import AbstractEventLoop +import aio_pika + + +class AsyncMQClient: + exchange_name_format = "freeleaps.devops.exchange.{}" + exchange_type = "direct" + + def __init__(self, channel_name: str) -> None: + self.exchange_name_format = AsyncMQClient.exchange_name_format + self.channel_name = channel_name + self.exchange_type = AsyncMQClient.exchange_type + self.exchange_name = self.exchange_name_format.format(self.channel_name) + self.process_callable = None + self.routing_key = self.channel_name + self.module_logger = ModuleLogger(sender_id="AsyncMQClient") + + async def bind(self, max_retries=10, event_loop: AbstractEventLoop = None): + retry_count = 0 + retry_interval = 1 + + while retry_count < max_retries: + try: + self.connection = await aio_pika.connect_robust( + host=app_settings.RABBITMQ_HOST, + port=int(app_settings.RABBITMQ_PORT), + login=app_settings.RABBITMQ_USERNAME, + password=app_settings.RABBITMQ_PASSWORD, + virtualhost=app_settings.RABBITMQ_VIRTUAL_HOST, + loop=event_loop, + ) + self.channel = await self.connection.channel() + self.exchange = await self.channel.declare_exchange( + name=self.exchange_name, type="direct", auto_delete=False + ) + + self.queue = await self.channel.declare_queue( + name=None, exclusive=True, auto_delete=True, durable=False + ) + await self.queue.bind( + exchange=self.exchange, routing_key=self.routing_key + ) + break + except Exception as e: + await self.module_logger.log_exception( + exception=e, + text=f"Reconnection attempt {retry_count + 1}/{max_retries} failed: {e}", + ) + await asyncio.sleep(retry_interval) + retry_interval = min(retry_interval * 2, 60) + retry_count += 1 + + if retry_count >= max_retries: + raise ConnectionError( + "Unable to connect to RabbitMQ after multiple retries." + ) + + async def close(self): + """Unbind the queue and close the connection gracefully.""" + await self.queue.unbind(self.exchange, self.routing_key) + await self.connection.close() \ No newline at end of file diff --git a/apps/devops/app/backend/infra/rabbitmq/async_subscriber.py b/apps/devops/app/backend/infra/rabbitmq/async_subscriber.py new file mode 100644 index 0000000..05ef7e4 --- /dev/null +++ b/apps/devops/app/backend/infra/rabbitmq/async_subscriber.py @@ -0,0 +1,84 @@ +from asyncio import AbstractEventLoop +from app.common.log.module_logger import ModuleLogger +import json +import asyncio +from .async_client import AsyncMQClient + + +class AsyncMQSubscriber(AsyncMQClient): + def __init__(self, channel_name: str) -> None: + super().__init__(channel_name=channel_name) + self.process_callable = None + self.routing_key = self.channel_name + self.consumer_callbacks = {} + self.consumer_callbacks_lock = asyncio.Lock() + self.module_logger = ModuleLogger(sender_id="AsyncMQSubscriber") + + async def process_incoming_message(self, message): + """Processing incoming message from RabbitMQ""" + await message.ack() + body = message.body + if body: + async with self.consumer_callbacks_lock: + for registry_key, callback_info in self.consumer_callbacks.items(): + try: + await callback_info["method"]( + registry_key, json.loads(body), callback_info["args"] + ) + except Exception as err: + await self.module_logger.log_exception( + exception=err, + text=f"Error processing message for consumer '{registry_key}'", + ) + + async def subscribe(self, max_retries=10, event_loop: AbstractEventLoop = None): + """Attempts to bind and consume messages, with retry mechanism.""" + retries = 0 + while retries < max_retries: + try: + await self.bind(max_retries=5, event_loop=event_loop) + await self.queue.consume( + no_ack=False, exclusive=True, callback=self.process_incoming_message + ) + break + except Exception as e: + await self.module_logger.log_exception( + exception=e, + text=f"Failed to subscribe at {retries} time, will retry", + ) + retries += 1 + await asyncio.sleep(5) + else: + await self.module_logger.log_exception( + exception=ConnectionError( + f"Exceeded max retries ({max_retries}) for subscription." + ), + text=f"Subscription failed for {self.channel_name} after {max_retries} attempts.", + ) + + async def register_consumer( + self, + registry_key: str, + callback_method, + args: dict, + ): + """Register a consumer callback with a unique key.""" + async with self.consumer_callbacks_lock: + self.consumer_callbacks[registry_key] = { + "method": callback_method, + "args": args, + } + + async def unregister_consumer( + self, + registry_key: str, + ): + """Unregister a consumer callback by its key.""" + async with self.consumer_callbacks_lock: + if registry_key in self.consumer_callbacks: + del self.consumer_callbacks[registry_key] + + async def clear_all_consumers(self): + """Unregister all consumer callbacks.""" + async with self.consumer_callbacks_lock: + self.consumer_callbacks.clear() \ No newline at end of file diff --git a/apps/devops/app/backend/services/deployment_status_update_service.py b/apps/devops/app/backend/services/deployment_status_update_service.py new file mode 100644 index 0000000..2dfa53a --- /dev/null +++ b/apps/devops/app/backend/services/deployment_status_update_service.py @@ -0,0 +1,100 @@ +from datetime import datetime +from typing import Dict, Literal +from app.common.log.module_logger import ModuleLogger +from app.common.models.deployment.deployment import Deployment +from app.common.models.deployment.heartbeat import DevOpsReconcileJobHeartbeatMessage + + +class DeploymentStatusUpdateService: + def __init__(self): + self.module_logger = ModuleLogger(sender_id="DeploymentStatusUpdateService") + + # Status mapping from heartbeat to deployment model + self.status_mapping: Dict[str, Literal["started", "failed", "succeeded", "aborted"]] = { + "running": "started", + "success": "succeeded", + "failed": "failed", + "terminated": "aborted" + } + + # Phase to stage mapping for more detailed tracking + self.phase_to_stage_mapping: Dict[str, str] = { + "initializing": "initialization", + "jenkins_build": "build", + "building": "build", + "deploying": "deployment", + "finished": "completed" + } + + async def process_heartbeat_message(self, registry_key: str, message_data: dict, args: dict): + """Process incoming heartbeat message and update deployment status""" + # registry_key and args are provided by the message queue framework but not used in this implementation + _ = registry_key, args + try: + # Parse the message using our Pydantic model + heartbeat_message = DevOpsReconcileJobHeartbeatMessage(**message_data) + payload = heartbeat_message.payload + + await self.module_logger.log_info( + text=f"Processing heartbeat for deployment {payload.id}: {payload.status} - {payload.phase}", + data={"deployment_id": payload.id, "status": payload.status, "phase": payload.phase} + ) + + # Find the deployment by ID + deployment = await Deployment.find_one(Deployment.deployment_id == payload.id) + if not deployment: + await self.module_logger.log_warning( + text=f"Deployment not found: {payload.id}", + data={"deployment_id": payload.id} + ) + return + + # Map heartbeat status to deployment status + if payload.status in self.status_mapping: + deployment.deployment_status = self.status_mapping[payload.status] + else: + await self.module_logger.log_warning( + text=f"Unknown status received: {payload.status}", + data={"deployment_id": payload.id, "status": payload.status} + ) + return + + # Map phase to deployment stage + if payload.phase in self.phase_to_stage_mapping: + deployment.deployment_stage = self.phase_to_stage_mapping[payload.phase] + else: + deployment.deployment_stage = payload.phase + + # Update app URL if provided and deployment is successful + if payload.url and payload.status == "success": + deployment.deployment_app_url = payload.url + + # Update timestamp + deployment.updated_at = datetime.now() + + # Save the updated deployment + await deployment.save() + + await self.module_logger.log_info( + text=f"Updated deployment {payload.id}: status={deployment.deployment_status}, stage={deployment.deployment_stage}", + data={ + "deployment_id": payload.id, + "status": deployment.deployment_status, + "stage": deployment.deployment_stage, + "app_url": deployment.deployment_app_url, + "error": payload.error + } + ) + + # Log errors if present + if payload.error: + await self.module_logger.log_error( + text=f"Deployment {payload.id} failed: {payload.error}", + data={"deployment_id": payload.id, "error": payload.error, "phase": payload.phase} + ) + + except Exception as e: + await self.module_logger.log_exception( + exception=e, + text=f"Error processing heartbeat message: {message_data}", + ) \ No newline at end of file diff --git a/apps/devops/app/bootstrap/application.py b/apps/devops/app/bootstrap/application.py index 81db6f5..3cf635b 100644 --- a/apps/devops/app/bootstrap/application.py +++ b/apps/devops/app/bootstrap/application.py @@ -9,6 +9,7 @@ from app.providers import database from app.providers import metrics from app.providers import probes from app.providers import exception_handler +from app.providers import message_queue from app.common.config.app_settings import app_settings def create_app() -> FastAPI: @@ -21,6 +22,7 @@ def create_app() -> FastAPI: register(app, database) register(app, router) register(app, common) + register(app, message_queue) # Call the custom_openapi function to change the OpenAPI version customize_openapi_security(app) diff --git a/apps/devops/app/common/log/module_logger.py b/apps/devops/app/common/log/module_logger.py new file mode 100644 index 0000000..3e82f74 --- /dev/null +++ b/apps/devops/app/common/log/module_logger.py @@ -0,0 +1,46 @@ +from .application_logger import ApplicationLogger + + +class ModuleLogger(ApplicationLogger): + def __init__(self, sender_id: str) -> None: + super().__init__() + self.event_sender_id = sender_id + self.event_receiver_id = "ModuleLogger" + self.event_subject = "module" + + async def log_exception(self, exception: Exception, text: str = "Exception", properties: dict[str, any] = None) -> None: + return await super().log_exception( + sender_id=self.event_sender_id, + receiver_id=self.event_receiver_id, + subject=self.event_subject, + exception=exception, + text=text, + properties=properties, + ) + + async def log_info(self, text: str, data: dict[str, any] = None) -> None: + return await super().log_info( + sender_id=self.event_sender_id, + receiver_id=self.event_receiver_id, + subject=self.event_subject, + text=text, + properties=data, + ) + + async def log_warning(self, text: str, data: dict[str, any] = None) -> None: + return await super().log_warning( + sender_id=self.event_sender_id, + receiver_id=self.event_receiver_id, + subject=self.event_subject, + text=text, + properties=data, + ) + + async def log_error(self, text: str, data: dict[str, any] = None) -> None: + return await super().log_error( + sender_id=self.event_sender_id, + receiver_id=self.event_receiver_id, + subject=self.event_subject, + text=text, + properties=data, + ) \ No newline at end of file diff --git a/apps/devops/app/common/models/deployment/heartbeat.py b/apps/devops/app/common/models/deployment/heartbeat.py new file mode 100644 index 0000000..7c7a624 --- /dev/null +++ b/apps/devops/app/common/models/deployment/heartbeat.py @@ -0,0 +1,17 @@ +from typing import Literal, Optional +from pydantic import BaseModel + + +class DevOpsReconcileJobHeartbeatPayload(BaseModel): + operation: Literal["heartbeat"] = "heartbeat" + id: str + status: Literal["running", "success", "failed", "terminated"] + phase: Literal["initializing", "jenkins_build", "building", "deploying", "finished"] + phase_message: str + error: Optional[str] = None + url: Optional[str] = None + + +class DevOpsReconcileJobHeartbeatMessage(BaseModel): + event_type: Literal["DevOpsReconcileJobHeartbeat"] + payload: DevOpsReconcileJobHeartbeatPayload \ No newline at end of file diff --git a/apps/devops/app/providers/message_queue.py b/apps/devops/app/providers/message_queue.py new file mode 100644 index 0000000..03ecb00 --- /dev/null +++ b/apps/devops/app/providers/message_queue.py @@ -0,0 +1,30 @@ +import asyncio +from app.backend.infra.rabbitmq.async_subscriber import AsyncMQSubscriber +from app.backend.services.deployment_status_update_service import DeploymentStatusUpdateService + + +def register(app): + # Initialize the message subscriber and status update service + app.deployment_heartbeat_subscriber = AsyncMQSubscriber("devops_reconcile_heartbeat") + app.deployment_status_service = DeploymentStatusUpdateService() + + @app.on_event("startup") + async def start_message_consumers(): + # Register the heartbeat processor + await app.deployment_heartbeat_subscriber.register_consumer( + registry_key="deployment_heartbeat_processor", + callback_method=app.deployment_status_service.process_heartbeat_message, + args={} + ) + + # Start the subscriber + loop = asyncio.get_running_loop() + await loop.create_task( + app.deployment_heartbeat_subscriber.subscribe(max_retries=5, event_loop=loop) + ) + + @app.on_event("shutdown") + async def stop_message_consumers(): + # Clear consumers and close connection + await app.deployment_heartbeat_subscriber.clear_all_consumers() + await app.deployment_heartbeat_subscriber.close() \ No newline at end of file diff --git a/apps/devops/requirements.txt b/apps/devops/requirements.txt index c593732..a83128e 100644 --- a/apps/devops/requirements.txt +++ b/apps/devops/requirements.txt @@ -11,4 +11,6 @@ httpx==0.24.0 pydantic-settings~=2.9.1 pymongo~=4.12.1 pydantic~=2.11.4 -requests~=2.32.3 \ No newline at end of file +requests~=2.32.3 +aio-pika==9.4.3 +pytest-asyncio==0.24.0 \ No newline at end of file diff --git a/apps/devops/tests/services/test_deployment_status_update_service.py b/apps/devops/tests/services/test_deployment_status_update_service.py new file mode 100644 index 0000000..8bd4a01 --- /dev/null +++ b/apps/devops/tests/services/test_deployment_status_update_service.py @@ -0,0 +1,216 @@ +import pytest +from unittest.mock import AsyncMock +from datetime import datetime +from app.backend.services.deployment_status_update_service import DeploymentStatusUpdateService +from app.common.models.deployment.deployment import Deployment + + +@pytest.fixture +def status_update_service(): + return DeploymentStatusUpdateService() + + +@pytest.fixture +def sample_heartbeat_message(): + return { + "event_type": "DevOpsReconcileJobHeartbeat", + "payload": { + "operation": "heartbeat", + "id": "deployment-123-abc", + "status": "running", + "phase": "building", + "phase_message": "Building container image", + "error": None, + "url": None + } + } + + +@pytest.fixture +def sample_success_message(): + return { + "event_type": "DevOpsReconcileJobHeartbeat", + "payload": { + "operation": "heartbeat", + "id": "deployment-789-ghi", + "status": "success", + "phase": "finished", + "phase_message": "Deployment completed successfully", + "error": None, + "url": "https://my-app-alpha.freeleaps.com" + } + } + + +@pytest.fixture +def sample_failed_message(): + return { + "event_type": "DevOpsReconcileJobHeartbeat", + "payload": { + "operation": "heartbeat", + "id": "deployment-456-def", + "status": "failed", + "phase": "jenkins_build", + "phase_message": "Build failed due to compilation errors", + "error": "Build step 'Invoke top-level Maven targets' marked build as failure", + "url": None + } + } + + +@pytest.fixture +def mock_deployment(): + from unittest.mock import AsyncMock + + class MockDeployment: + def __init__(self): + self.deployment_id = "deployment-123-abc" + self.deployment_status = "started" + self.deployment_stage = "initialization" + self.deployment_app_url = "" + self.updated_at = datetime.now() + self.save = AsyncMock() + + return MockDeployment() + + +class TestDeploymentStatusUpdateService: + + @pytest.mark.asyncio + async def test_status_mapping(self, status_update_service): + """Test that status mapping works correctly""" + assert status_update_service.status_mapping["running"] == "started" + assert status_update_service.status_mapping["success"] == "succeeded" + assert status_update_service.status_mapping["failed"] == "failed" + assert status_update_service.status_mapping["terminated"] == "aborted" + + @pytest.mark.asyncio + async def test_phase_to_stage_mapping(self, status_update_service): + """Test that phase to stage mapping works correctly""" + assert status_update_service.phase_to_stage_mapping["initializing"] == "initialization" + assert status_update_service.phase_to_stage_mapping["jenkins_build"] == "build" + assert status_update_service.phase_to_stage_mapping["building"] == "build" + assert status_update_service.phase_to_stage_mapping["deploying"] == "deployment" + assert status_update_service.phase_to_stage_mapping["finished"] == "completed" + + @pytest.mark.asyncio + async def test_process_running_heartbeat_message(self, status_update_service, sample_heartbeat_message, mock_deployment, monkeypatch): + """Test processing a running status heartbeat""" + # Mock Deployment.find_one to return our mock deployment + async def mock_find_one(query): + _ = query # Parameter required by interface but not used in mock + return mock_deployment + + # Mock the logger methods to avoid actual logging during tests + status_update_service.module_logger.log_info = AsyncMock() + status_update_service.module_logger.log_warning = AsyncMock() + status_update_service.module_logger.log_error = AsyncMock() + status_update_service.module_logger.log_exception = AsyncMock() + + # Mock the Beanie query mechanism properly + mock_deployment_class = AsyncMock() + mock_deployment_class.find_one = mock_find_one + monkeypatch.setattr("app.backend.services.deployment_status_update_service.Deployment", mock_deployment_class) + + await status_update_service.process_heartbeat_message( + "test_key", sample_heartbeat_message, {} + ) + + # Verify the deployment was updated correctly + assert mock_deployment.deployment_status == "started" + assert mock_deployment.deployment_stage == "build" + mock_deployment.save.assert_called_once() + + @pytest.mark.asyncio + async def test_process_success_heartbeat_message(self, status_update_service, sample_success_message, mock_deployment, monkeypatch): + """Test processing a success status heartbeat with URL""" + async def mock_find_one(query): + _ = query # Parameter required by interface but not used in mock + return mock_deployment + + # Mock the logger methods + status_update_service.module_logger.log_info = AsyncMock() + status_update_service.module_logger.log_warning = AsyncMock() + status_update_service.module_logger.log_error = AsyncMock() + status_update_service.module_logger.log_exception = AsyncMock() + + # Mock the Beanie query mechanism properly + mock_deployment_class = AsyncMock() + mock_deployment_class.find_one = mock_find_one + monkeypatch.setattr("app.backend.services.deployment_status_update_service.Deployment", mock_deployment_class) + + await status_update_service.process_heartbeat_message( + "test_key", sample_success_message, {} + ) + + # Verify the deployment was updated correctly + assert mock_deployment.deployment_status == "succeeded" + assert mock_deployment.deployment_stage == "completed" + assert mock_deployment.deployment_app_url == "https://my-app-alpha.freeleaps.com" + mock_deployment.save.assert_called_once() + + @pytest.mark.asyncio + async def test_process_failed_heartbeat_message(self, status_update_service, sample_failed_message, mock_deployment, monkeypatch): + """Test processing a failed status heartbeat""" + async def mock_find_one(query): + _ = query # Parameter required by interface but not used in mock + return mock_deployment + + # Mock the logger methods + status_update_service.module_logger.log_info = AsyncMock() + status_update_service.module_logger.log_warning = AsyncMock() + status_update_service.module_logger.log_error = AsyncMock() + status_update_service.module_logger.log_exception = AsyncMock() + + # Mock the Beanie query mechanism properly + mock_deployment_class = AsyncMock() + mock_deployment_class.find_one = mock_find_one + monkeypatch.setattr("app.backend.services.deployment_status_update_service.Deployment", mock_deployment_class) + + await status_update_service.process_heartbeat_message( + "test_key", sample_failed_message, {} + ) + + # Verify the deployment was updated correctly + assert mock_deployment.deployment_status == "failed" + assert mock_deployment.deployment_stage == "build" + mock_deployment.save.assert_called_once() + + @pytest.mark.asyncio + async def test_deployment_not_found(self, status_update_service, sample_heartbeat_message, monkeypatch): + """Test handling when deployment is not found""" + async def mock_find_one(query): + _ = query # Parameter required by interface but not used in mock + return None + + # Mock the logger methods + status_update_service.module_logger.log_info = AsyncMock() + status_update_service.module_logger.log_warning = AsyncMock() + status_update_service.module_logger.log_error = AsyncMock() + status_update_service.module_logger.log_exception = AsyncMock() + + # Mock the Beanie query mechanism properly + mock_deployment_class = AsyncMock() + mock_deployment_class.find_one = mock_find_one + monkeypatch.setattr("app.backend.services.deployment_status_update_service.Deployment", mock_deployment_class) + + # Should not raise an exception + await status_update_service.process_heartbeat_message( + "test_key", sample_heartbeat_message, {} + ) + + @pytest.mark.asyncio + async def test_invalid_message_format(self, status_update_service): + """Test handling invalid message format""" + invalid_message = {"invalid": "format"} + + # Mock the logger methods + status_update_service.module_logger.log_info = AsyncMock() + status_update_service.module_logger.log_warning = AsyncMock() + status_update_service.module_logger.log_error = AsyncMock() + status_update_service.module_logger.log_exception = AsyncMock() + + # Should not raise an exception due to try/catch in the method + await status_update_service.process_heartbeat_message( + "test_key", invalid_message, {} + ) \ No newline at end of file