Add message queue registration in application bootstrap and update dependencies

Signed-off-by: zhenyus <zhenyus@mathmast.com>
This commit is contained in:
zhenyus 2025-07-30 10:50:22 +08:00
parent 3ce7a7ec76
commit d0aa2ba14b
11 changed files with 565 additions and 2 deletions

3
.gitignore vendored
View File

@ -7,4 +7,5 @@
*.pyc *.pyc
freedev.code-workspace freedev.code-workspace
.idea/ .idea/
.pytest_cache/ .pytest_cache/
CLAUDE.md

View File

@ -0,0 +1 @@
# RabbitMQ infrastructure for DevOps service

View File

@ -0,0 +1,64 @@
from app.common.config.app_settings import app_settings
from app.common.log.module_logger import ModuleLogger
import asyncio
from asyncio import AbstractEventLoop
import aio_pika
class AsyncMQClient:
exchange_name_format = "freeleaps.devops.exchange.{}"
exchange_type = "direct"
def __init__(self, channel_name: str) -> None:
self.exchange_name_format = AsyncMQClient.exchange_name_format
self.channel_name = channel_name
self.exchange_type = AsyncMQClient.exchange_type
self.exchange_name = self.exchange_name_format.format(self.channel_name)
self.process_callable = None
self.routing_key = self.channel_name
self.module_logger = ModuleLogger(sender_id="AsyncMQClient")
async def bind(self, max_retries=10, event_loop: AbstractEventLoop = None):
retry_count = 0
retry_interval = 1
while retry_count < max_retries:
try:
self.connection = await aio_pika.connect_robust(
host=app_settings.RABBITMQ_HOST,
port=int(app_settings.RABBITMQ_PORT),
login=app_settings.RABBITMQ_USERNAME,
password=app_settings.RABBITMQ_PASSWORD,
virtualhost=app_settings.RABBITMQ_VIRTUAL_HOST,
loop=event_loop,
)
self.channel = await self.connection.channel()
self.exchange = await self.channel.declare_exchange(
name=self.exchange_name, type="direct", auto_delete=False
)
self.queue = await self.channel.declare_queue(
name=None, exclusive=True, auto_delete=True, durable=False
)
await self.queue.bind(
exchange=self.exchange, routing_key=self.routing_key
)
break
except Exception as e:
await self.module_logger.log_exception(
exception=e,
text=f"Reconnection attempt {retry_count + 1}/{max_retries} failed: {e}",
)
await asyncio.sleep(retry_interval)
retry_interval = min(retry_interval * 2, 60)
retry_count += 1
if retry_count >= max_retries:
raise ConnectionError(
"Unable to connect to RabbitMQ after multiple retries."
)
async def close(self):
"""Unbind the queue and close the connection gracefully."""
await self.queue.unbind(self.exchange, self.routing_key)
await self.connection.close()

View File

@ -0,0 +1,84 @@
from asyncio import AbstractEventLoop
from app.common.log.module_logger import ModuleLogger
import json
import asyncio
from .async_client import AsyncMQClient
class AsyncMQSubscriber(AsyncMQClient):
def __init__(self, channel_name: str) -> None:
super().__init__(channel_name=channel_name)
self.process_callable = None
self.routing_key = self.channel_name
self.consumer_callbacks = {}
self.consumer_callbacks_lock = asyncio.Lock()
self.module_logger = ModuleLogger(sender_id="AsyncMQSubscriber")
async def process_incoming_message(self, message):
"""Processing incoming message from RabbitMQ"""
await message.ack()
body = message.body
if body:
async with self.consumer_callbacks_lock:
for registry_key, callback_info in self.consumer_callbacks.items():
try:
await callback_info["method"](
registry_key, json.loads(body), callback_info["args"]
)
except Exception as err:
await self.module_logger.log_exception(
exception=err,
text=f"Error processing message for consumer '{registry_key}'",
)
async def subscribe(self, max_retries=10, event_loop: AbstractEventLoop = None):
"""Attempts to bind and consume messages, with retry mechanism."""
retries = 0
while retries < max_retries:
try:
await self.bind(max_retries=5, event_loop=event_loop)
await self.queue.consume(
no_ack=False, exclusive=True, callback=self.process_incoming_message
)
break
except Exception as e:
await self.module_logger.log_exception(
exception=e,
text=f"Failed to subscribe at {retries} time, will retry",
)
retries += 1
await asyncio.sleep(5)
else:
await self.module_logger.log_exception(
exception=ConnectionError(
f"Exceeded max retries ({max_retries}) for subscription."
),
text=f"Subscription failed for {self.channel_name} after {max_retries} attempts.",
)
async def register_consumer(
self,
registry_key: str,
callback_method,
args: dict,
):
"""Register a consumer callback with a unique key."""
async with self.consumer_callbacks_lock:
self.consumer_callbacks[registry_key] = {
"method": callback_method,
"args": args,
}
async def unregister_consumer(
self,
registry_key: str,
):
"""Unregister a consumer callback by its key."""
async with self.consumer_callbacks_lock:
if registry_key in self.consumer_callbacks:
del self.consumer_callbacks[registry_key]
async def clear_all_consumers(self):
"""Unregister all consumer callbacks."""
async with self.consumer_callbacks_lock:
self.consumer_callbacks.clear()

View File

@ -0,0 +1,100 @@
from datetime import datetime
from typing import Dict, Literal
from app.common.log.module_logger import ModuleLogger
from app.common.models.deployment.deployment import Deployment
from app.common.models.deployment.heartbeat import DevOpsReconcileJobHeartbeatMessage
class DeploymentStatusUpdateService:
def __init__(self):
self.module_logger = ModuleLogger(sender_id="DeploymentStatusUpdateService")
# Status mapping from heartbeat to deployment model
self.status_mapping: Dict[str, Literal["started", "failed", "succeeded", "aborted"]] = {
"running": "started",
"success": "succeeded",
"failed": "failed",
"terminated": "aborted"
}
# Phase to stage mapping for more detailed tracking
self.phase_to_stage_mapping: Dict[str, str] = {
"initializing": "initialization",
"jenkins_build": "build",
"building": "build",
"deploying": "deployment",
"finished": "completed"
}
async def process_heartbeat_message(self, registry_key: str, message_data: dict, args: dict):
"""Process incoming heartbeat message and update deployment status"""
# registry_key and args are provided by the message queue framework but not used in this implementation
_ = registry_key, args
try:
# Parse the message using our Pydantic model
heartbeat_message = DevOpsReconcileJobHeartbeatMessage(**message_data)
payload = heartbeat_message.payload
await self.module_logger.log_info(
text=f"Processing heartbeat for deployment {payload.id}: {payload.status} - {payload.phase}",
data={"deployment_id": payload.id, "status": payload.status, "phase": payload.phase}
)
# Find the deployment by ID
deployment = await Deployment.find_one(Deployment.deployment_id == payload.id)
if not deployment:
await self.module_logger.log_warning(
text=f"Deployment not found: {payload.id}",
data={"deployment_id": payload.id}
)
return
# Map heartbeat status to deployment status
if payload.status in self.status_mapping:
deployment.deployment_status = self.status_mapping[payload.status]
else:
await self.module_logger.log_warning(
text=f"Unknown status received: {payload.status}",
data={"deployment_id": payload.id, "status": payload.status}
)
return
# Map phase to deployment stage
if payload.phase in self.phase_to_stage_mapping:
deployment.deployment_stage = self.phase_to_stage_mapping[payload.phase]
else:
deployment.deployment_stage = payload.phase
# Update app URL if provided and deployment is successful
if payload.url and payload.status == "success":
deployment.deployment_app_url = payload.url
# Update timestamp
deployment.updated_at = datetime.now()
# Save the updated deployment
await deployment.save()
await self.module_logger.log_info(
text=f"Updated deployment {payload.id}: status={deployment.deployment_status}, stage={deployment.deployment_stage}",
data={
"deployment_id": payload.id,
"status": deployment.deployment_status,
"stage": deployment.deployment_stage,
"app_url": deployment.deployment_app_url,
"error": payload.error
}
)
# Log errors if present
if payload.error:
await self.module_logger.log_error(
text=f"Deployment {payload.id} failed: {payload.error}",
data={"deployment_id": payload.id, "error": payload.error, "phase": payload.phase}
)
except Exception as e:
await self.module_logger.log_exception(
exception=e,
text=f"Error processing heartbeat message: {message_data}",
)

View File

@ -9,6 +9,7 @@ from app.providers import database
from app.providers import metrics from app.providers import metrics
from app.providers import probes from app.providers import probes
from app.providers import exception_handler from app.providers import exception_handler
from app.providers import message_queue
from app.common.config.app_settings import app_settings from app.common.config.app_settings import app_settings
def create_app() -> FastAPI: def create_app() -> FastAPI:
@ -21,6 +22,7 @@ def create_app() -> FastAPI:
register(app, database) register(app, database)
register(app, router) register(app, router)
register(app, common) register(app, common)
register(app, message_queue)
# Call the custom_openapi function to change the OpenAPI version # Call the custom_openapi function to change the OpenAPI version
customize_openapi_security(app) customize_openapi_security(app)

View File

@ -0,0 +1,46 @@
from .application_logger import ApplicationLogger
class ModuleLogger(ApplicationLogger):
def __init__(self, sender_id: str) -> None:
super().__init__()
self.event_sender_id = sender_id
self.event_receiver_id = "ModuleLogger"
self.event_subject = "module"
async def log_exception(self, exception: Exception, text: str = "Exception", properties: dict[str, any] = None) -> None:
return await super().log_exception(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
exception=exception,
text=text,
properties=properties,
)
async def log_info(self, text: str, data: dict[str, any] = None) -> None:
return await super().log_info(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
text=text,
properties=data,
)
async def log_warning(self, text: str, data: dict[str, any] = None) -> None:
return await super().log_warning(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
text=text,
properties=data,
)
async def log_error(self, text: str, data: dict[str, any] = None) -> None:
return await super().log_error(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
text=text,
properties=data,
)

View File

@ -0,0 +1,17 @@
from typing import Literal, Optional
from pydantic import BaseModel
class DevOpsReconcileJobHeartbeatPayload(BaseModel):
operation: Literal["heartbeat"] = "heartbeat"
id: str
status: Literal["running", "success", "failed", "terminated"]
phase: Literal["initializing", "jenkins_build", "building", "deploying", "finished"]
phase_message: str
error: Optional[str] = None
url: Optional[str] = None
class DevOpsReconcileJobHeartbeatMessage(BaseModel):
event_type: Literal["DevOpsReconcileJobHeartbeat"]
payload: DevOpsReconcileJobHeartbeatPayload

View File

@ -0,0 +1,30 @@
import asyncio
from app.backend.infra.rabbitmq.async_subscriber import AsyncMQSubscriber
from app.backend.services.deployment_status_update_service import DeploymentStatusUpdateService
def register(app):
# Initialize the message subscriber and status update service
app.deployment_heartbeat_subscriber = AsyncMQSubscriber("devops_reconcile_heartbeat")
app.deployment_status_service = DeploymentStatusUpdateService()
@app.on_event("startup")
async def start_message_consumers():
# Register the heartbeat processor
await app.deployment_heartbeat_subscriber.register_consumer(
registry_key="deployment_heartbeat_processor",
callback_method=app.deployment_status_service.process_heartbeat_message,
args={}
)
# Start the subscriber
loop = asyncio.get_running_loop()
await loop.create_task(
app.deployment_heartbeat_subscriber.subscribe(max_retries=5, event_loop=loop)
)
@app.on_event("shutdown")
async def stop_message_consumers():
# Clear consumers and close connection
await app.deployment_heartbeat_subscriber.clear_all_consumers()
await app.deployment_heartbeat_subscriber.close()

View File

@ -11,4 +11,6 @@ httpx==0.24.0
pydantic-settings~=2.9.1 pydantic-settings~=2.9.1
pymongo~=4.12.1 pymongo~=4.12.1
pydantic~=2.11.4 pydantic~=2.11.4
requests~=2.32.3 requests~=2.32.3
aio-pika==9.4.3
pytest-asyncio==0.24.0

View File

@ -0,0 +1,216 @@
import pytest
from unittest.mock import AsyncMock
from datetime import datetime
from app.backend.services.deployment_status_update_service import DeploymentStatusUpdateService
from app.common.models.deployment.deployment import Deployment
@pytest.fixture
def status_update_service():
return DeploymentStatusUpdateService()
@pytest.fixture
def sample_heartbeat_message():
return {
"event_type": "DevOpsReconcileJobHeartbeat",
"payload": {
"operation": "heartbeat",
"id": "deployment-123-abc",
"status": "running",
"phase": "building",
"phase_message": "Building container image",
"error": None,
"url": None
}
}
@pytest.fixture
def sample_success_message():
return {
"event_type": "DevOpsReconcileJobHeartbeat",
"payload": {
"operation": "heartbeat",
"id": "deployment-789-ghi",
"status": "success",
"phase": "finished",
"phase_message": "Deployment completed successfully",
"error": None,
"url": "https://my-app-alpha.freeleaps.com"
}
}
@pytest.fixture
def sample_failed_message():
return {
"event_type": "DevOpsReconcileJobHeartbeat",
"payload": {
"operation": "heartbeat",
"id": "deployment-456-def",
"status": "failed",
"phase": "jenkins_build",
"phase_message": "Build failed due to compilation errors",
"error": "Build step 'Invoke top-level Maven targets' marked build as failure",
"url": None
}
}
@pytest.fixture
def mock_deployment():
from unittest.mock import AsyncMock
class MockDeployment:
def __init__(self):
self.deployment_id = "deployment-123-abc"
self.deployment_status = "started"
self.deployment_stage = "initialization"
self.deployment_app_url = ""
self.updated_at = datetime.now()
self.save = AsyncMock()
return MockDeployment()
class TestDeploymentStatusUpdateService:
@pytest.mark.asyncio
async def test_status_mapping(self, status_update_service):
"""Test that status mapping works correctly"""
assert status_update_service.status_mapping["running"] == "started"
assert status_update_service.status_mapping["success"] == "succeeded"
assert status_update_service.status_mapping["failed"] == "failed"
assert status_update_service.status_mapping["terminated"] == "aborted"
@pytest.mark.asyncio
async def test_phase_to_stage_mapping(self, status_update_service):
"""Test that phase to stage mapping works correctly"""
assert status_update_service.phase_to_stage_mapping["initializing"] == "initialization"
assert status_update_service.phase_to_stage_mapping["jenkins_build"] == "build"
assert status_update_service.phase_to_stage_mapping["building"] == "build"
assert status_update_service.phase_to_stage_mapping["deploying"] == "deployment"
assert status_update_service.phase_to_stage_mapping["finished"] == "completed"
@pytest.mark.asyncio
async def test_process_running_heartbeat_message(self, status_update_service, sample_heartbeat_message, mock_deployment, monkeypatch):
"""Test processing a running status heartbeat"""
# Mock Deployment.find_one to return our mock deployment
async def mock_find_one(query):
_ = query # Parameter required by interface but not used in mock
return mock_deployment
# Mock the logger methods to avoid actual logging during tests
status_update_service.module_logger.log_info = AsyncMock()
status_update_service.module_logger.log_warning = AsyncMock()
status_update_service.module_logger.log_error = AsyncMock()
status_update_service.module_logger.log_exception = AsyncMock()
# Mock the Beanie query mechanism properly
mock_deployment_class = AsyncMock()
mock_deployment_class.find_one = mock_find_one
monkeypatch.setattr("app.backend.services.deployment_status_update_service.Deployment", mock_deployment_class)
await status_update_service.process_heartbeat_message(
"test_key", sample_heartbeat_message, {}
)
# Verify the deployment was updated correctly
assert mock_deployment.deployment_status == "started"
assert mock_deployment.deployment_stage == "build"
mock_deployment.save.assert_called_once()
@pytest.mark.asyncio
async def test_process_success_heartbeat_message(self, status_update_service, sample_success_message, mock_deployment, monkeypatch):
"""Test processing a success status heartbeat with URL"""
async def mock_find_one(query):
_ = query # Parameter required by interface but not used in mock
return mock_deployment
# Mock the logger methods
status_update_service.module_logger.log_info = AsyncMock()
status_update_service.module_logger.log_warning = AsyncMock()
status_update_service.module_logger.log_error = AsyncMock()
status_update_service.module_logger.log_exception = AsyncMock()
# Mock the Beanie query mechanism properly
mock_deployment_class = AsyncMock()
mock_deployment_class.find_one = mock_find_one
monkeypatch.setattr("app.backend.services.deployment_status_update_service.Deployment", mock_deployment_class)
await status_update_service.process_heartbeat_message(
"test_key", sample_success_message, {}
)
# Verify the deployment was updated correctly
assert mock_deployment.deployment_status == "succeeded"
assert mock_deployment.deployment_stage == "completed"
assert mock_deployment.deployment_app_url == "https://my-app-alpha.freeleaps.com"
mock_deployment.save.assert_called_once()
@pytest.mark.asyncio
async def test_process_failed_heartbeat_message(self, status_update_service, sample_failed_message, mock_deployment, monkeypatch):
"""Test processing a failed status heartbeat"""
async def mock_find_one(query):
_ = query # Parameter required by interface but not used in mock
return mock_deployment
# Mock the logger methods
status_update_service.module_logger.log_info = AsyncMock()
status_update_service.module_logger.log_warning = AsyncMock()
status_update_service.module_logger.log_error = AsyncMock()
status_update_service.module_logger.log_exception = AsyncMock()
# Mock the Beanie query mechanism properly
mock_deployment_class = AsyncMock()
mock_deployment_class.find_one = mock_find_one
monkeypatch.setattr("app.backend.services.deployment_status_update_service.Deployment", mock_deployment_class)
await status_update_service.process_heartbeat_message(
"test_key", sample_failed_message, {}
)
# Verify the deployment was updated correctly
assert mock_deployment.deployment_status == "failed"
assert mock_deployment.deployment_stage == "build"
mock_deployment.save.assert_called_once()
@pytest.mark.asyncio
async def test_deployment_not_found(self, status_update_service, sample_heartbeat_message, monkeypatch):
"""Test handling when deployment is not found"""
async def mock_find_one(query):
_ = query # Parameter required by interface but not used in mock
return None
# Mock the logger methods
status_update_service.module_logger.log_info = AsyncMock()
status_update_service.module_logger.log_warning = AsyncMock()
status_update_service.module_logger.log_error = AsyncMock()
status_update_service.module_logger.log_exception = AsyncMock()
# Mock the Beanie query mechanism properly
mock_deployment_class = AsyncMock()
mock_deployment_class.find_one = mock_find_one
monkeypatch.setattr("app.backend.services.deployment_status_update_service.Deployment", mock_deployment_class)
# Should not raise an exception
await status_update_service.process_heartbeat_message(
"test_key", sample_heartbeat_message, {}
)
@pytest.mark.asyncio
async def test_invalid_message_format(self, status_update_service):
"""Test handling invalid message format"""
invalid_message = {"invalid": "format"}
# Mock the logger methods
status_update_service.module_logger.log_info = AsyncMock()
status_update_service.module_logger.log_warning = AsyncMock()
status_update_service.module_logger.log_error = AsyncMock()
status_update_service.module_logger.log_exception = AsyncMock()
# Should not raise an exception due to try/catch in the method
await status_update_service.process_heartbeat_message(
"test_key", invalid_message, {}
)