From 1510010091a774c08b60d4a6309b76fddab1c606 Mon Sep 17 00:00:00 2001 From: Jet Li Date: Fri, 25 Oct 2024 02:51:50 +0000 Subject: [PATCH] Temp commit for notification --- app/central_storage/Dockerfile | 2 +- .../backend/application/notification_hub.py | 38 ++- .../backend/business/notification_manager.py | 253 +++++++++++++++++- .../backend/infra/rabbitmq/__init__.py | 0 .../backend/infra/rabbitmq/async_client.py | 64 +++++ .../backend/infra/rabbitmq/async_publisher.py | 50 ++++ .../infra/rabbitmq/async_subscriber.py | 85 ++++++ app/notification/backend/models/constants.py | 127 +++++++++ .../notification_publisher_service.py | 16 ++ .../common/config/app_settings.py | 10 +- .../webapi/routes/send_notification.py | 50 ++++ docker-compose.yaml | 26 +- 12 files changed, 696 insertions(+), 25 deletions(-) create mode 100644 app/notification/backend/infra/rabbitmq/__init__.py create mode 100644 app/notification/backend/infra/rabbitmq/async_client.py create mode 100644 app/notification/backend/infra/rabbitmq/async_publisher.py create mode 100644 app/notification/backend/infra/rabbitmq/async_subscriber.py create mode 100644 app/notification/backend/models/constants.py create mode 100644 app/notification/backend/services/notification_publisher_service.py create mode 100644 app/notification/webapi/routes/send_notification.py diff --git a/app/central_storage/Dockerfile b/app/central_storage/Dockerfile index 4aa5175..9aeaf6c 100644 --- a/app/central_storage/Dockerfile +++ b/app/central_storage/Dockerfile @@ -16,4 +16,4 @@ EXPOSE 8005 # Run the application using the start script -CMD ["uvicorn", "app.central_storage.webapi.main:app", "--reload", "--port=8005", "--host=0.0.0.0", "--log-level", "warning"] +CMD ["uvicorn", "app.central_storage.webapi.main:app", "--reload", "--port=8005", "--host=0.0.0.0"] diff --git a/app/notification/backend/application/notification_hub.py b/app/notification/backend/application/notification_hub.py index 1323e4e..98a4855 100644 --- a/app/notification/backend/application/notification_hub.py +++ b/app/notification/backend/application/notification_hub.py @@ -1,11 +1,37 @@ -from app.notification.backend.business.notification_manager import ( - NotificationManager, -) +from typing import Dict +from app.notification.backend.business.notification_manager import NotificationManager +from app.notification.backend.models.constants import NotificationChannel class NotificationHub: - def __init__( + def __init__(self): + pass + + async def enqueue_notification( self, + sender_id: str, # Added sender_id here + channels: list[str], # Accept multiple channels as a list of strings + receiver_id: str, + subject: str, + event: str, + properties: Dict, ): - self.notification_manager = NotificationManager() - return + # Convert string channels to NotificationChannel enums + notification_channels = [] + for channel_str in channels: + try: + notification_channels.append(NotificationChannel[channel_str.upper()]) + except KeyError: + raise ValueError(f"Unsupported notification channel: {channel_str}") + + # Initialize NotificationManager with sender_id + notification_manager = NotificationManager(sender_id=sender_id) + + # Call the enqueue_notification method in NotificationManager + return await notification_manager.enqueue_notification( + channels=notification_channels, + receiver_id=receiver_id, + subject=subject, + event=event, + properties=properties, + ) diff --git a/app/notification/backend/business/notification_manager.py b/app/notification/backend/business/notification_manager.py index 57e663b..4c6f24f 100644 --- a/app/notification/backend/business/notification_manager.py +++ b/app/notification/backend/business/notification_manager.py @@ -1,10 +1,261 @@ +from typing import Dict from app.notification.backend.services.sms_service import SmsService from app.notification.backend.services.in_app_notif_service import InAppNotifService from app.notification.backend.services.email_service import EmailService +from app.notification.backend.services.notification_publisher_service import ( + NotificationPublisherService, +) +from app.notification.backend.models.constants import ( + NotificationChannel, + NotificationMessage, + SystemNotifications, +) +import threading +from datetime import datetime, timezone +from typing import Optional, Type +from types import TracebackType +from infra.models.constants import UserRegion +from app.notification.backend.services.notification_publisher_service import ( + NotificationPublisherService, +) +from app.notification.common.config.app_settings import app_settings +from datetime import datetime, timezone class NotificationManager: - def __init__(self) -> None: + notification_queues: dict[NotificationChannel, NotificationPublisherService] = None + instance_counter = 0 + instance_counter_lock = threading.Lock() + + def __init__( + self, + sender_id: str, # Require sender_id in the constructor + ) -> None: + self.notification_publisher_service = NotificationPublisherService() self.sms_service = SmsService() self.in_app_notif_service = InAppNotifService() self.email_service = EmailService() + self.sender_id = sender_id + self.notification_queues = NotificationManager.__create_notification_queues__() + + @staticmethod + def __increment_instance_counter__() -> int: + with NotificationManager.instance_counter_lock: + NotificationManager.instance_counter += 1 + return NotificationManager.instance_counter + + @staticmethod + def __decrement_instance_counter__() -> int: + with NotificationManager.instance_counter_lock: + NotificationManager.instance_counter -= 1 + return NotificationManager.instance_counter + + @staticmethod + def __create_notification_queues__() -> None: + if not NotificationManager.notification_queues: + NotificationManager.notification_queues = {} + for channel in NotificationChannel: + NotificationManager.notification_queues[channel] = ( + NotificationPublisherService(channel=channel) + ) + return NotificationManager.notification_queues + + async def __aenter__(self): + if NotificationManager.__increment_instance_counter__() == 1: + for channel in NotificationManager.notification_queues: + await NotificationManager.notification_queues[channel].bind() + + async def __aexit__( + self, + exctype: Optional[Type[BaseException]], + excinst: Optional[BaseException], + exctb: Optional[TracebackType], + ): + if NotificationManager.__decrement_instance_counter__() == 0: + for channel in NotificationManager.notification_queues: + await NotificationManager.notification_queues[channel].close() + + async def __publish_notification__( + self, + channel: NotificationChannel, + message: NotificationMessage, + ) -> None: + message.properties["publish_time"] = datetime.now(timezone.utc).isoformat() + await self.notification_queues[channel].publish( + message=message.model_dump_json() + ) + + async def __generate_message_from_subject_and_event__( + self, + subject: str, + event: str, + properties: dict, + region: Optional[UserRegion] = None, + ) -> str: + # leverage the information in properties to enrich the message. + message_subject = None + message = None + if subject.lower() == "payment": + pass + + # Default region to be international if not set + if region is None: + region = UserRegion.OTHER + + message_subject = SystemNotifications[region][subject.lower()][event.lower()][ + "message_subject" + ] + message = SystemNotifications[region][subject.lower()][event.lower()]["message"] + + if event.lower() == "authentication": + message = message.format(properties["auth_code"]) + if not message: + raise RuntimeError("unsupported event:{}".format(event)) + return message, message_subject + + async def send_in_app_notification( + self, receiver_id: str, subject: str, event: str, properties: dict = None + ) -> None: + await self.__publish_notification__( + channel=NotificationChannel.IN_APP, + message=NotificationMessage( + sender_id=app_settings.SYSTEM_USER_ID, + receiver_id=receiver_id, + subject=subject, + event=event, + properties=properties, + ), + ) + + async def send_chat_message_notification( + self, receiver_id: str, subject: str, event: str, properties: dict = None + ) -> None: + ( + content_text, + content_subject, + ) = await self.__generate_message_from_subject_and_event__( + subject=subject, event=event, properties=properties + ) + properties_dict = { + "content_text": content_text, + "content_subject": content_subject, + "receiver_type": "user", # or 'conversation' + } + + await self.__publish_notification__( + channel=NotificationChannel.CHAT_MESSAGE, + message=NotificationMessage( + sender_id=app_settings.SYSTEM_USER_ID, + receiver_id=receiver_id, + subject=subject, + event=event, + properties=( + {**properties_dict, **properties} if properties else properties_dict + ), + ), + ) + + async def send_email_notification( + self, + receiver_id: str, + subject: str, + event: str, + properties: dict = None, + region: Optional[UserRegion] = None, + ) -> None: + ( + content_text, + content_subject, + ) = await self.__generate_message_from_subject_and_event__( + subject=subject, event=event, properties=properties, region=region + ) + properties_dict = { + "content_text": content_text, + "content_subject": content_subject, + "sender_email": app_settings.EMAIL_FROM, + "receiver_type": "email", + } + await self.__publish_notification__( + channel=NotificationChannel.EMAIL, + message=NotificationMessage( + sender_id=app_settings.SYSTEM_USER_ID, + receiver_id=receiver_id, + subject=subject, + event=event, + properties=( + properties_dict.update(properties) + if not properties + else properties_dict + ), + ), + ) + + async def send_sms_notification( + self, receiver_id: str, subject: str, event: str, properties: dict = None + ) -> None: + ( + content_text, + content_subject, + ) = await self.__generate_message_from_subject_and_event__( + subject=subject, event=event, properties=properties + ) + properties_dict = { + "content_text": content_text, + "content_subject": content_subject, + "sender_mobile": app_settings.SMS_FROM, + "receiver_type": "mobile", + } + await self.__publish_notification__( + channel=NotificationChannel.SMS, + message=NotificationMessage( + sender_id=app_settings.SYSTEM_USER_ID, + receiver_id=receiver_id, + subject=subject, + event=event, + properties=( + properties_dict.update(properties) + if not properties + else properties_dict + ), + ), + ) + + async def enqueue_notification( + self, + channels: list[NotificationChannel], + receiver_id: str, + subject: str, + event: str, + properties: dict = None, + ) -> None: + for channel in channels: + if channel == NotificationChannel.CHAT_MESSAGE: + await self.send_chat_message_notification( + receiver_id=receiver_id, + subject=subject, + event=event, + properties=properties, + ) + elif channel == NotificationChannel.IN_APP: + await self.send_in_app_notification( + receiver_id=receiver_id, + subject=subject, + event=event, + properties=properties, + ) + elif channel == NotificationChannel.EMAIL: + await self.send_email_notification( + receiver_id=receiver_id, + subject=subject, + event=event, + properties=properties, + ) + elif channel == NotificationChannel.SMS: + await self.send_sms_notification( + receiver_id=receiver_id, + subject=subject, + event=event, + properties=properties, + ) + else: + raise RuntimeError(f"Unsupported notification channel: {channel}") diff --git a/app/notification/backend/infra/rabbitmq/__init__.py b/app/notification/backend/infra/rabbitmq/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/notification/backend/infra/rabbitmq/async_client.py b/app/notification/backend/infra/rabbitmq/async_client.py new file mode 100644 index 0000000..c480ca3 --- /dev/null +++ b/app/notification/backend/infra/rabbitmq/async_client.py @@ -0,0 +1,64 @@ +from app.notification.common.config.app_settings import app_settings +from infra.log.module_logger import ModuleLogger +import asyncio +from asyncio import AbstractEventLoop +import aio_pika + + +class AsyncMQClient: + exchange_name_format = "freeleaps.notification.exchange.{}" + exchange_type = "direct" + + def __init__(self, channel_name: str) -> None: + self.exchange_name_format = AsyncMQClient.exchange_name_format + self.channel_name = channel_name + self.exchange_type = AsyncMQClient.exchange_type + self.exchange_name = self.exchange_name_format.format(self.channel_name) + self.process_callable = None + self.routing_key = self.channel_name + self.module_logger = ModuleLogger(sender_id="AsyncMQClient") + + async def bind(self, max_retries=10, event_loop: AbstractEventLoop = None): + retry_count = 0 + retry_interval = 1 # Start with a 1-second interval + + while retry_count < max_retries: + try: + self.connection = await aio_pika.connect_robust( + host=app_settings.RABBITMQ_HOST, + port=int(app_settings.RABBITMQ_PORT), + loop=event_loop, + ) + self.channel = await self.connection.channel() + self.exchange = await self.channel.declare_exchange( + name=self.exchange_name, type="direct", auto_delete=False + ) + # Declare and bind queue if it's not set by a specific client + + self.queue = await self.channel.declare_queue( + name=None, exclusive=True, auto_delete=True, durable=False + ) + await self.queue.bind( + exchange=self.exchange, routing_key=self.routing_key + ) + break # Exit loop once connected + except Exception as e: + await self.module_logger.log_exception( + exception=e, + text=f"Reconnection attempt {retry_count + 1}/{max_retries} failed: {e}", + ) + await asyncio.sleep(retry_interval) + retry_interval = min( + retry_interval * 2, 60 + ) # Exponential backoff, up to 60s max + retry_count += 1 + + if retry_count >= max_retries: + raise ConnectionError( + "Unable to connect to RabbitMQ after multiple retries." + ) + + async def close(self): + """Unbind the queue and close the connection gracefully.""" + await self.queue.unbind(self.exchange, self.routing_key) + await self.connection.close() diff --git a/app/notification/backend/infra/rabbitmq/async_publisher.py b/app/notification/backend/infra/rabbitmq/async_publisher.py new file mode 100644 index 0000000..594753d --- /dev/null +++ b/app/notification/backend/infra/rabbitmq/async_publisher.py @@ -0,0 +1,50 @@ +from infra.log.module_logger import ModuleLogger +from .async_client import AsyncMQClient +import aio_pika +import json +import asyncio + + +class AsyncMQPublisher(AsyncMQClient): + def __init__(self, channel_name: str) -> None: + super().__init__(channel_name=channel_name) + self.module_logger = ModuleLogger(sender_id="AsyncMQPublisher") + + async def publish(self, message: str | object, max_retries: int = 3): + retries = 0 + while retries < max_retries: + try: + if not hasattr(self, "exchange") or self.exchange is None: + # Ensure the exchange is bound before publishing + await self.bind() + + await self.exchange.publish( + aio_pika.Message( + bytes( + ( + message + if isinstance(message, str) + else json.dumps(message) + ), + "utf-8", + ), + content_type="text/plain", + ), + self.routing_key, + ) + return # Exit after successful publish + except aio_pika.exceptions.ChannelInvalidStateError as e: + retries += 1 + await self.module_logger.log_exception( + exception=e, + text=f"Attempting reconnect and retry {retries}/{max_retries} " + "for publish ran into ChannelInvalidStateError", + ) + await asyncio.sleep(2) # Short delay before retrying + await self.bind() # Rebind to re-establish connection/channel + + # Log final failure if retries are exhausted + await self.module_logger.log_exception( + exception=ConnectionError("Unable to publish after max retries."), + text=f"Publish failed after {max_retries} retries.", + ) diff --git a/app/notification/backend/infra/rabbitmq/async_subscriber.py b/app/notification/backend/infra/rabbitmq/async_subscriber.py new file mode 100644 index 0000000..07a3ef6 --- /dev/null +++ b/app/notification/backend/infra/rabbitmq/async_subscriber.py @@ -0,0 +1,85 @@ +from asyncio import AbstractEventLoop +from infra.log.module_logger import ModuleLogger +import json +import asyncio +from .async_client import AsyncMQClient + + +class AsyncMQSubscriber(AsyncMQClient): + def __init__(self, channel_name: str) -> None: + super().__init__(channel_name=channel_name) + self.process_callable = None + self.routing_key = self.channel_name + self.consumer_callbacks = {} + self.consumer_callbacks_lock = asyncio.Lock() # Async lock for async context + self.module_logger = ModuleLogger(sender_id="AsyncMQSubscriber") + + async def process_incoming_message(self, message): + """Processing incoming message from RabbitMQ""" + await message.ack() + body = message.body + if body: + async with self.consumer_callbacks_lock: # Use async lock for safe concurrent access + for registry_key, callback_info in self.consumer_callbacks.items(): + try: + await callback_info["method"]( + registry_key, json.loads(body), callback_info["args"] + ) + except Exception as err: + # Log each exception that occurs within callback processing + await self.module_logger.log_exception( + exception=err, + text=f"Error processing message for consumer '{registry_key}'", + ) + + async def subscribe(self, max_retries=10, event_loop: AbstractEventLoop = None): + """Attempts to bind and consume messages, with retry mechanism.""" + retries = 0 + while retries < max_retries: + try: + await self.bind(max_retries=5, event_loop=event_loop) + await self.queue.consume( + no_ack=False, exclusive=True, callback=self.process_incoming_message + ) + break # Exit loop if subscription is successful + except Exception as e: + await self.module_logger.log_exception( + exception=e, + text=f"Failed to subscribe at {retries} time, will retry", + ) + retries += 1 + await asyncio.sleep(5) # Delay before retrying + else: + await self.module_logger.log_exception( + exception=ConnectionError( + f"Exceeded max retries ({max_retries}) for subscription." + ), + text=f"Subscription failed for {self.channel_name} after {max_retries} attempts.", + ) + + async def register_consumer( + self, + registry_key: str, # a unique string to identify the callback + callback_method, + args: dict, + ): + """Register a consumer callback with a unique key.""" + async with self.consumer_callbacks_lock: + self.consumer_callbacks[registry_key] = { + "method": callback_method, + "args": args, + } + + async def unregister_consumer( + self, + registry_key: str, # a unique string to identify the callback + ): + """Unregister a consumer callback by its key.""" + async with self.consumer_callbacks_lock: + if registry_key in self.consumer_callbacks: + del self.consumer_callbacks[registry_key] + + async def clear_all_consumers(self): + """Unregister all consumer callbacks.""" + async with self.consumer_callbacks_lock: + self.consumer_callbacks.clear() diff --git a/app/notification/backend/models/constants.py b/app/notification/backend/models/constants.py new file mode 100644 index 0000000..205a139 --- /dev/null +++ b/app/notification/backend/models/constants.py @@ -0,0 +1,127 @@ +from enum import Enum +from pydantic import BaseModel +from infra.models.constants import UserRegion + + +class NotificationChannel(Enum): + IN_APP = 0 + CHAT_MESSAGE = 1 + EMAIL = 2 + SMS = 3 + + +class NotificationMessage(BaseModel): + sender_id: str + receiver_id: str + subject: str + event: str + properties: dict + + +SystemNotifications = { + UserRegion.OTHER: { + "request": { # Subject + "quoted": { # Event + "message_subject": "update for your request", + "message": "There is a quote for your request", + }, + "unquoted": { + "message_subject": "update for your request", + "message": "A quote for your request has been withdrawed", + }, + "invite": { + "message_subject": "You are invited", + "message": "You are invited to provide proposal for a request.", + }, + "chat_created": { + "message_subject": "new chat created", + "message": "This chat group is created for the project.", + }, + }, + "quote": { # Subject + "accepted": { # Event + "message_subject": "update for your quote", + "message": "Your quote has been accepted", + }, + "rejected": { + "message_subject": "update for your quote", + "message": "Your quote has been rejected", + }, + }, + "milestone": { # Subject + "state-change": { # Event + "message_subject": "update for your milestone", + "message": "Your project milestone has a state change.", + } + }, + "email": { # Subject + "authentication": { # Event + "message_subject": "Freeleaps Support", + "message": "The auth code is: {} . \r\nDo not share this to anyone.", + } + }, + "mobile": { # Subject + "authentication": { # Event + "message_subject": "Freeleaps Support", + "message": "The auth code is: {} . \r\nDo not share this to anyone.", + } + }, + "message": { # Subject + "new": { # Event + "message_subject": "new messages", + "message": "There are new messages.", + } + }, + }, + UserRegion.ZH_CN: { + "request": { # Subject + "quoted": { # Event + "message_subject": "你的请求有更新", + "message": "这里有一条关于你的请求的报价", + }, + "unquoted": { + "message_subject": "你的请求有更新", + "message": "一条你的请求的报价被拒绝", + }, + "invite": { + "message_subject": "有人邀请你", + "message": "你被邀请为一个请求提供报价", + }, + "chat_created": { + "message_subject": "创建了聊天群组", + "message": "本项目的聊天群组以及建立完成.", + }, + }, + "quote": { # Subject + "accepted": { + "message_subject": "你的报价有更新", + "message": "你的报价已被接受", + }, # Event + "rejected": { + "message_subject": "你的报价有更新", + "message": "你的报价被拒绝", + }, + }, + "milestone": { # Subject + "state-change": { # Event + "message_subject": "你的里程碑有更新", + "message": "你的项目里程碑发生了状态改变", + } + }, + "email": { # Subject + "authentication": { # Event + "message_subject": "自由跳跃技术支持", + "message": "安全认证码为: {} . \r\n切勿分享给他人", + } + }, + "mobile": { # Subject + "authentication": { # Event + "message_subject": "自由跳跃技术支持", + "message": "安全认证码为: {} . \r\n切勿分享给他人", + } + }, + "message": { # Subject + "new": {"message_subject": "新消息", "message": "你有新的会话消息"} # Event + }, + }, +} diff --git a/app/notification/backend/services/notification_publisher_service.py b/app/notification/backend/services/notification_publisher_service.py new file mode 100644 index 0000000..c0a311f --- /dev/null +++ b/app/notification/backend/services/notification_publisher_service.py @@ -0,0 +1,16 @@ +from app.notification.backend.models.constants import NotificationChannel +from backend.infra.rabbitmq.async_publisher import AsyncMQPublisher + + +class NotificationPublisherService: + def __init__(self, channel: NotificationChannel) -> None: + self.mq_client = AsyncMQPublisher(channel.name) + + async def bind(self): + await self.mq_client.bind(max_retries=5) + + async def publish(self, message: str | bytes): + await self.mq_client.publish(message=message) + + async def close(self): + await self.mq_client.close() diff --git a/app/notification/common/config/app_settings.py b/app/notification/common/config/app_settings.py index a0ec4dd..83042aa 100644 --- a/app/notification/common/config/app_settings.py +++ b/app/notification/common/config/app_settings.py @@ -5,10 +5,12 @@ from pydantic_settings import BaseSettings class AppSettings(BaseSettings): NAME: str = "notification" - AZURE_STORAGE_DOCUMENT_API_ENDPOINT: str = ( - "https://freeleaps1document.blob.core.windows.net/" - ) - AZURE_STORAGE_DOCUMENT_API_KEY: str = "" + RABBITMQ_HOST: str = "rabbitmq" + RABBITMQ_PORT: int = 5672 + + SYSTEM_USER_ID: str = "" + SMS_FROM: str = "" + EMAIL_FROM: str = "" class Config: env_file = ".myapp.env" diff --git a/app/notification/webapi/routes/send_notification.py b/app/notification/webapi/routes/send_notification.py new file mode 100644 index 0000000..251c81f --- /dev/null +++ b/app/notification/webapi/routes/send_notification.py @@ -0,0 +1,50 @@ +from fastapi import APIRouter, Depends, HTTPException +from fastapi.responses import JSONResponse +from starlette.status import HTTP_500_INTERNAL_SERVER_ERROR +from app.notification.backend.application.notification_hub import NotificationHub +from pydantic import BaseModel +from app.notification.backend.models.constants import NotificationChannel +from typing import Dict + +router = APIRouter() + + +# Define the request body schema +class NotificationRequest(BaseModel): + sender_id: str + channels: list[str] + receiver_id: str + subject: str + event: str + properties: Dict + + +# Web API +# Send notification to user for a certain channel +@router.post( + "/enqueue_notification", + operation_id="enqueue_notification", + summary="Send notification to user for a certain channel", + description="Send notification to user for a channel (e.g., sms, email, in-app, etc.)", + response_description="Success/failure response in processing the notification send request", +) +# API route to enqueue notifications +@router.post("/enqueue_notification") +async def enqueue_notification(request: NotificationRequest): + try: + notification_hub = NotificationHub() + await notification_hub.enqueue_notification( + sender_id=request.sender_id, # Pass sender_id + channels=request.channels, + receiver_id=request.receiver_id, + subject=request.subject, + event=request.event, + properties=request.properties, + ) + return JSONResponse( + content={"message": "Notifications queued successfully."}, status_code=200 + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + raise HTTPException(status_code=500, detail="Failed to enqueue notification") diff --git a/docker-compose.yaml b/docker-compose.yaml index b63a240..c13e6fe 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,17 +1,17 @@ services: - rabbitmq: - image: rabbitmq:3-management - ports: - - "5672:5672" # RabbitMQ communication port - - "15672:15672" # RabbitMQ management port - networks: - - freeleaps_service_hub_network - healthcheck: - test: [ "CMD", "rabbitmq-diagnostics", "ping" ] - interval: 30s - retries: 5 - start_period: 10s - timeout: 10s + # rabbitmq: + # image: rabbitmq:3-management + # ports: + # - "5672:5672" # RabbitMQ communication port + # - "15672:15672" # RabbitMQ management port + # networks: + # - freeleaps_service_hub_network + # healthcheck: + # test: [ "CMD", "rabbitmq-diagnostics", "ping" ] + # interval: 30s + # retries: 5 + # start_period: 10s + # timeout: 10s central_storage: build: