Temp commit for notification

This commit is contained in:
jetli 2024-10-25 02:51:50 +00:00
parent c0986b10bf
commit 1510010091
12 changed files with 696 additions and 25 deletions

View File

@ -16,4 +16,4 @@ EXPOSE 8005
# Run the application using the start script
CMD ["uvicorn", "app.central_storage.webapi.main:app", "--reload", "--port=8005", "--host=0.0.0.0", "--log-level", "warning"]
CMD ["uvicorn", "app.central_storage.webapi.main:app", "--reload", "--port=8005", "--host=0.0.0.0"]

View File

@ -1,11 +1,37 @@
from app.notification.backend.business.notification_manager import (
NotificationManager,
)
from typing import Dict
from app.notification.backend.business.notification_manager import NotificationManager
from app.notification.backend.models.constants import NotificationChannel
class NotificationHub:
def __init__(
def __init__(self):
pass
async def enqueue_notification(
self,
sender_id: str, # Added sender_id here
channels: list[str], # Accept multiple channels as a list of strings
receiver_id: str,
subject: str,
event: str,
properties: Dict,
):
self.notification_manager = NotificationManager()
return
# Convert string channels to NotificationChannel enums
notification_channels = []
for channel_str in channels:
try:
notification_channels.append(NotificationChannel[channel_str.upper()])
except KeyError:
raise ValueError(f"Unsupported notification channel: {channel_str}")
# Initialize NotificationManager with sender_id
notification_manager = NotificationManager(sender_id=sender_id)
# Call the enqueue_notification method in NotificationManager
return await notification_manager.enqueue_notification(
channels=notification_channels,
receiver_id=receiver_id,
subject=subject,
event=event,
properties=properties,
)

View File

@ -1,10 +1,261 @@
from typing import Dict
from app.notification.backend.services.sms_service import SmsService
from app.notification.backend.services.in_app_notif_service import InAppNotifService
from app.notification.backend.services.email_service import EmailService
from app.notification.backend.services.notification_publisher_service import (
NotificationPublisherService,
)
from app.notification.backend.models.constants import (
NotificationChannel,
NotificationMessage,
SystemNotifications,
)
import threading
from datetime import datetime, timezone
from typing import Optional, Type
from types import TracebackType
from infra.models.constants import UserRegion
from app.notification.backend.services.notification_publisher_service import (
NotificationPublisherService,
)
from app.notification.common.config.app_settings import app_settings
from datetime import datetime, timezone
class NotificationManager:
def __init__(self) -> None:
notification_queues: dict[NotificationChannel, NotificationPublisherService] = None
instance_counter = 0
instance_counter_lock = threading.Lock()
def __init__(
self,
sender_id: str, # Require sender_id in the constructor
) -> None:
self.notification_publisher_service = NotificationPublisherService()
self.sms_service = SmsService()
self.in_app_notif_service = InAppNotifService()
self.email_service = EmailService()
self.sender_id = sender_id
self.notification_queues = NotificationManager.__create_notification_queues__()
@staticmethod
def __increment_instance_counter__() -> int:
with NotificationManager.instance_counter_lock:
NotificationManager.instance_counter += 1
return NotificationManager.instance_counter
@staticmethod
def __decrement_instance_counter__() -> int:
with NotificationManager.instance_counter_lock:
NotificationManager.instance_counter -= 1
return NotificationManager.instance_counter
@staticmethod
def __create_notification_queues__() -> None:
if not NotificationManager.notification_queues:
NotificationManager.notification_queues = {}
for channel in NotificationChannel:
NotificationManager.notification_queues[channel] = (
NotificationPublisherService(channel=channel)
)
return NotificationManager.notification_queues
async def __aenter__(self):
if NotificationManager.__increment_instance_counter__() == 1:
for channel in NotificationManager.notification_queues:
await NotificationManager.notification_queues[channel].bind()
async def __aexit__(
self,
exctype: Optional[Type[BaseException]],
excinst: Optional[BaseException],
exctb: Optional[TracebackType],
):
if NotificationManager.__decrement_instance_counter__() == 0:
for channel in NotificationManager.notification_queues:
await NotificationManager.notification_queues[channel].close()
async def __publish_notification__(
self,
channel: NotificationChannel,
message: NotificationMessage,
) -> None:
message.properties["publish_time"] = datetime.now(timezone.utc).isoformat()
await self.notification_queues[channel].publish(
message=message.model_dump_json()
)
async def __generate_message_from_subject_and_event__(
self,
subject: str,
event: str,
properties: dict,
region: Optional[UserRegion] = None,
) -> str:
# leverage the information in properties to enrich the message.
message_subject = None
message = None
if subject.lower() == "payment":
pass
# Default region to be international if not set
if region is None:
region = UserRegion.OTHER
message_subject = SystemNotifications[region][subject.lower()][event.lower()][
"message_subject"
]
message = SystemNotifications[region][subject.lower()][event.lower()]["message"]
if event.lower() == "authentication":
message = message.format(properties["auth_code"])
if not message:
raise RuntimeError("unsupported event:{}".format(event))
return message, message_subject
async def send_in_app_notification(
self, receiver_id: str, subject: str, event: str, properties: dict = None
) -> None:
await self.__publish_notification__(
channel=NotificationChannel.IN_APP,
message=NotificationMessage(
sender_id=app_settings.SYSTEM_USER_ID,
receiver_id=receiver_id,
subject=subject,
event=event,
properties=properties,
),
)
async def send_chat_message_notification(
self, receiver_id: str, subject: str, event: str, properties: dict = None
) -> None:
(
content_text,
content_subject,
) = await self.__generate_message_from_subject_and_event__(
subject=subject, event=event, properties=properties
)
properties_dict = {
"content_text": content_text,
"content_subject": content_subject,
"receiver_type": "user", # or 'conversation'
}
await self.__publish_notification__(
channel=NotificationChannel.CHAT_MESSAGE,
message=NotificationMessage(
sender_id=app_settings.SYSTEM_USER_ID,
receiver_id=receiver_id,
subject=subject,
event=event,
properties=(
{**properties_dict, **properties} if properties else properties_dict
),
),
)
async def send_email_notification(
self,
receiver_id: str,
subject: str,
event: str,
properties: dict = None,
region: Optional[UserRegion] = None,
) -> None:
(
content_text,
content_subject,
) = await self.__generate_message_from_subject_and_event__(
subject=subject, event=event, properties=properties, region=region
)
properties_dict = {
"content_text": content_text,
"content_subject": content_subject,
"sender_email": app_settings.EMAIL_FROM,
"receiver_type": "email",
}
await self.__publish_notification__(
channel=NotificationChannel.EMAIL,
message=NotificationMessage(
sender_id=app_settings.SYSTEM_USER_ID,
receiver_id=receiver_id,
subject=subject,
event=event,
properties=(
properties_dict.update(properties)
if not properties
else properties_dict
),
),
)
async def send_sms_notification(
self, receiver_id: str, subject: str, event: str, properties: dict = None
) -> None:
(
content_text,
content_subject,
) = await self.__generate_message_from_subject_and_event__(
subject=subject, event=event, properties=properties
)
properties_dict = {
"content_text": content_text,
"content_subject": content_subject,
"sender_mobile": app_settings.SMS_FROM,
"receiver_type": "mobile",
}
await self.__publish_notification__(
channel=NotificationChannel.SMS,
message=NotificationMessage(
sender_id=app_settings.SYSTEM_USER_ID,
receiver_id=receiver_id,
subject=subject,
event=event,
properties=(
properties_dict.update(properties)
if not properties
else properties_dict
),
),
)
async def enqueue_notification(
self,
channels: list[NotificationChannel],
receiver_id: str,
subject: str,
event: str,
properties: dict = None,
) -> None:
for channel in channels:
if channel == NotificationChannel.CHAT_MESSAGE:
await self.send_chat_message_notification(
receiver_id=receiver_id,
subject=subject,
event=event,
properties=properties,
)
elif channel == NotificationChannel.IN_APP:
await self.send_in_app_notification(
receiver_id=receiver_id,
subject=subject,
event=event,
properties=properties,
)
elif channel == NotificationChannel.EMAIL:
await self.send_email_notification(
receiver_id=receiver_id,
subject=subject,
event=event,
properties=properties,
)
elif channel == NotificationChannel.SMS:
await self.send_sms_notification(
receiver_id=receiver_id,
subject=subject,
event=event,
properties=properties,
)
else:
raise RuntimeError(f"Unsupported notification channel: {channel}")

View File

@ -0,0 +1,64 @@
from app.notification.common.config.app_settings import app_settings
from infra.log.module_logger import ModuleLogger
import asyncio
from asyncio import AbstractEventLoop
import aio_pika
class AsyncMQClient:
exchange_name_format = "freeleaps.notification.exchange.{}"
exchange_type = "direct"
def __init__(self, channel_name: str) -> None:
self.exchange_name_format = AsyncMQClient.exchange_name_format
self.channel_name = channel_name
self.exchange_type = AsyncMQClient.exchange_type
self.exchange_name = self.exchange_name_format.format(self.channel_name)
self.process_callable = None
self.routing_key = self.channel_name
self.module_logger = ModuleLogger(sender_id="AsyncMQClient")
async def bind(self, max_retries=10, event_loop: AbstractEventLoop = None):
retry_count = 0
retry_interval = 1 # Start with a 1-second interval
while retry_count < max_retries:
try:
self.connection = await aio_pika.connect_robust(
host=app_settings.RABBITMQ_HOST,
port=int(app_settings.RABBITMQ_PORT),
loop=event_loop,
)
self.channel = await self.connection.channel()
self.exchange = await self.channel.declare_exchange(
name=self.exchange_name, type="direct", auto_delete=False
)
# Declare and bind queue if it's not set by a specific client
self.queue = await self.channel.declare_queue(
name=None, exclusive=True, auto_delete=True, durable=False
)
await self.queue.bind(
exchange=self.exchange, routing_key=self.routing_key
)
break # Exit loop once connected
except Exception as e:
await self.module_logger.log_exception(
exception=e,
text=f"Reconnection attempt {retry_count + 1}/{max_retries} failed: {e}",
)
await asyncio.sleep(retry_interval)
retry_interval = min(
retry_interval * 2, 60
) # Exponential backoff, up to 60s max
retry_count += 1
if retry_count >= max_retries:
raise ConnectionError(
"Unable to connect to RabbitMQ after multiple retries."
)
async def close(self):
"""Unbind the queue and close the connection gracefully."""
await self.queue.unbind(self.exchange, self.routing_key)
await self.connection.close()

View File

@ -0,0 +1,50 @@
from infra.log.module_logger import ModuleLogger
from .async_client import AsyncMQClient
import aio_pika
import json
import asyncio
class AsyncMQPublisher(AsyncMQClient):
def __init__(self, channel_name: str) -> None:
super().__init__(channel_name=channel_name)
self.module_logger = ModuleLogger(sender_id="AsyncMQPublisher")
async def publish(self, message: str | object, max_retries: int = 3):
retries = 0
while retries < max_retries:
try:
if not hasattr(self, "exchange") or self.exchange is None:
# Ensure the exchange is bound before publishing
await self.bind()
await self.exchange.publish(
aio_pika.Message(
bytes(
(
message
if isinstance(message, str)
else json.dumps(message)
),
"utf-8",
),
content_type="text/plain",
),
self.routing_key,
)
return # Exit after successful publish
except aio_pika.exceptions.ChannelInvalidStateError as e:
retries += 1
await self.module_logger.log_exception(
exception=e,
text=f"Attempting reconnect and retry {retries}/{max_retries} "
"for publish ran into ChannelInvalidStateError",
)
await asyncio.sleep(2) # Short delay before retrying
await self.bind() # Rebind to re-establish connection/channel
# Log final failure if retries are exhausted
await self.module_logger.log_exception(
exception=ConnectionError("Unable to publish after max retries."),
text=f"Publish failed after {max_retries} retries.",
)

View File

@ -0,0 +1,85 @@
from asyncio import AbstractEventLoop
from infra.log.module_logger import ModuleLogger
import json
import asyncio
from .async_client import AsyncMQClient
class AsyncMQSubscriber(AsyncMQClient):
def __init__(self, channel_name: str) -> None:
super().__init__(channel_name=channel_name)
self.process_callable = None
self.routing_key = self.channel_name
self.consumer_callbacks = {}
self.consumer_callbacks_lock = asyncio.Lock() # Async lock for async context
self.module_logger = ModuleLogger(sender_id="AsyncMQSubscriber")
async def process_incoming_message(self, message):
"""Processing incoming message from RabbitMQ"""
await message.ack()
body = message.body
if body:
async with self.consumer_callbacks_lock: # Use async lock for safe concurrent access
for registry_key, callback_info in self.consumer_callbacks.items():
try:
await callback_info["method"](
registry_key, json.loads(body), callback_info["args"]
)
except Exception as err:
# Log each exception that occurs within callback processing
await self.module_logger.log_exception(
exception=err,
text=f"Error processing message for consumer '{registry_key}'",
)
async def subscribe(self, max_retries=10, event_loop: AbstractEventLoop = None):
"""Attempts to bind and consume messages, with retry mechanism."""
retries = 0
while retries < max_retries:
try:
await self.bind(max_retries=5, event_loop=event_loop)
await self.queue.consume(
no_ack=False, exclusive=True, callback=self.process_incoming_message
)
break # Exit loop if subscription is successful
except Exception as e:
await self.module_logger.log_exception(
exception=e,
text=f"Failed to subscribe at {retries} time, will retry",
)
retries += 1
await asyncio.sleep(5) # Delay before retrying
else:
await self.module_logger.log_exception(
exception=ConnectionError(
f"Exceeded max retries ({max_retries}) for subscription."
),
text=f"Subscription failed for {self.channel_name} after {max_retries} attempts.",
)
async def register_consumer(
self,
registry_key: str, # a unique string to identify the callback
callback_method,
args: dict,
):
"""Register a consumer callback with a unique key."""
async with self.consumer_callbacks_lock:
self.consumer_callbacks[registry_key] = {
"method": callback_method,
"args": args,
}
async def unregister_consumer(
self,
registry_key: str, # a unique string to identify the callback
):
"""Unregister a consumer callback by its key."""
async with self.consumer_callbacks_lock:
if registry_key in self.consumer_callbacks:
del self.consumer_callbacks[registry_key]
async def clear_all_consumers(self):
"""Unregister all consumer callbacks."""
async with self.consumer_callbacks_lock:
self.consumer_callbacks.clear()

View File

@ -0,0 +1,127 @@
from enum import Enum
from pydantic import BaseModel
from infra.models.constants import UserRegion
class NotificationChannel(Enum):
IN_APP = 0
CHAT_MESSAGE = 1
EMAIL = 2
SMS = 3
class NotificationMessage(BaseModel):
sender_id: str
receiver_id: str
subject: str
event: str
properties: dict
SystemNotifications = {
UserRegion.OTHER: {
"request": { # Subject
"quoted": { # Event
"message_subject": "update for your request",
"message": "There is a quote for your request",
},
"unquoted": {
"message_subject": "update for your request",
"message": "A quote for your request has been withdrawed",
},
"invite": {
"message_subject": "You are invited",
"message": "You are invited to provide proposal for a request.",
},
"chat_created": {
"message_subject": "new chat created",
"message": "This chat group is created for the project.",
},
},
"quote": { # Subject
"accepted": { # Event
"message_subject": "update for your quote",
"message": "Your quote has been accepted",
},
"rejected": {
"message_subject": "update for your quote",
"message": "Your quote has been rejected",
},
},
"milestone": { # Subject
"state-change": { # Event
"message_subject": "update for your milestone",
"message": "Your project milestone has a state change.",
}
},
"email": { # Subject
"authentication": { # Event
"message_subject": "Freeleaps Support",
"message": "The auth code is: {} . \r\nDo not share this to anyone.",
}
},
"mobile": { # Subject
"authentication": { # Event
"message_subject": "Freeleaps Support",
"message": "The auth code is: {} . \r\nDo not share this to anyone.",
}
},
"message": { # Subject
"new": { # Event
"message_subject": "new messages",
"message": "There are new messages.",
}
},
},
UserRegion.ZH_CN: {
"request": { # Subject
"quoted": { # Event
"message_subject": "你的请求有更新",
"message": "这里有一条关于你的请求的报价",
},
"unquoted": {
"message_subject": "你的请求有更新",
"message": "一条你的请求的报价被拒绝",
},
"invite": {
"message_subject": "有人邀请你",
"message": "你被邀请为一个请求提供报价",
},
"chat_created": {
"message_subject": "创建了聊天群组",
"message": "本项目的聊天群组以及建立完成.",
},
},
"quote": { # Subject
"accepted": {
"message_subject": "你的报价有更新",
"message": "你的报价已被接受",
}, # Event
"rejected": {
"message_subject": "你的报价有更新",
"message": "你的报价被拒绝",
},
},
"milestone": { # Subject
"state-change": { # Event
"message_subject": "你的里程碑有更新",
"message": "你的项目里程碑发生了状态改变",
}
},
"email": { # Subject
"authentication": { # Event
"message_subject": "自由跳跃技术支持",
"message": "安全认证码为: {} . \r\n切勿分享给他人",
}
},
"mobile": { # Subject
"authentication": { # Event
"message_subject": "自由跳跃技术支持",
"message": "安全认证码为: {} . \r\n切勿分享给他人",
}
},
"message": { # Subject
"new": {"message_subject": "新消息", "message": "你有新的会话消息"} # Event
},
},
}

View File

@ -0,0 +1,16 @@
from app.notification.backend.models.constants import NotificationChannel
from backend.infra.rabbitmq.async_publisher import AsyncMQPublisher
class NotificationPublisherService:
def __init__(self, channel: NotificationChannel) -> None:
self.mq_client = AsyncMQPublisher(channel.name)
async def bind(self):
await self.mq_client.bind(max_retries=5)
async def publish(self, message: str | bytes):
await self.mq_client.publish(message=message)
async def close(self):
await self.mq_client.close()

View File

@ -5,10 +5,12 @@ from pydantic_settings import BaseSettings
class AppSettings(BaseSettings):
NAME: str = "notification"
AZURE_STORAGE_DOCUMENT_API_ENDPOINT: str = (
"https://freeleaps1document.blob.core.windows.net/"
)
AZURE_STORAGE_DOCUMENT_API_KEY: str = ""
RABBITMQ_HOST: str = "rabbitmq"
RABBITMQ_PORT: int = 5672
SYSTEM_USER_ID: str = ""
SMS_FROM: str = ""
EMAIL_FROM: str = ""
class Config:
env_file = ".myapp.env"

View File

@ -0,0 +1,50 @@
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import JSONResponse
from starlette.status import HTTP_500_INTERNAL_SERVER_ERROR
from app.notification.backend.application.notification_hub import NotificationHub
from pydantic import BaseModel
from app.notification.backend.models.constants import NotificationChannel
from typing import Dict
router = APIRouter()
# Define the request body schema
class NotificationRequest(BaseModel):
sender_id: str
channels: list[str]
receiver_id: str
subject: str
event: str
properties: Dict
# Web API
# Send notification to user for a certain channel
@router.post(
"/enqueue_notification",
operation_id="enqueue_notification",
summary="Send notification to user for a certain channel",
description="Send notification to user for a channel (e.g., sms, email, in-app, etc.)",
response_description="Success/failure response in processing the notification send request",
)
# API route to enqueue notifications
@router.post("/enqueue_notification")
async def enqueue_notification(request: NotificationRequest):
try:
notification_hub = NotificationHub()
await notification_hub.enqueue_notification(
sender_id=request.sender_id, # Pass sender_id
channels=request.channels,
receiver_id=request.receiver_id,
subject=request.subject,
event=request.event,
properties=request.properties,
)
return JSONResponse(
content={"message": "Notifications queued successfully."}, status_code=200
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail="Failed to enqueue notification")

View File

@ -1,17 +1,17 @@
services:
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672" # RabbitMQ communication port
- "15672:15672" # RabbitMQ management port
networks:
- freeleaps_service_hub_network
healthcheck:
test: [ "CMD", "rabbitmq-diagnostics", "ping" ]
interval: 30s
retries: 5
start_period: 10s
timeout: 10s
# rabbitmq:
# image: rabbitmq:3-management
# ports:
# - "5672:5672" # RabbitMQ communication port
# - "15672:15672" # RabbitMQ management port
# networks:
# - freeleaps_service_hub_network
# healthcheck:
# test: [ "CMD", "rabbitmq-diagnostics", "ping" ]
# interval: 30s
# retries: 5
# start_period: 10s
# timeout: 10s
central_storage:
build: