From 6b286a443388c9b25166b8ecf5effcb15cc8bd79 Mon Sep 17 00:00:00 2001 From: Jet Li Date: Sat, 26 Oct 2024 05:10:21 +0000 Subject: [PATCH] Adding online platform notification piece --- .../backend/application/notification_hub.py | 3 +- .../backend/business/notification_manager.py | 6 +- app/notification/backend/infra/sms_handler.py | 1 + .../common/config/app_settings.py | 6 +- app/notification/webapi/routes/__init__.py | 3 + .../routes/online_platform_notification.py | 111 ++++++++++++++++++ .../webapi/routes/send_notification.py | 1 - app/notification/webapi/utils/sms_consumer.py | 2 +- 8 files changed, 123 insertions(+), 10 deletions(-) create mode 100644 app/notification/webapi/routes/online_platform_notification.py diff --git a/app/notification/backend/application/notification_hub.py b/app/notification/backend/application/notification_hub.py index 98a4855..1283240 100644 --- a/app/notification/backend/application/notification_hub.py +++ b/app/notification/backend/application/notification_hub.py @@ -9,7 +9,6 @@ class NotificationHub: async def enqueue_notification( self, - sender_id: str, # Added sender_id here channels: list[str], # Accept multiple channels as a list of strings receiver_id: str, subject: str, @@ -25,7 +24,7 @@ class NotificationHub: raise ValueError(f"Unsupported notification channel: {channel_str}") # Initialize NotificationManager with sender_id - notification_manager = NotificationManager(sender_id=sender_id) + notification_manager = NotificationManager() # Call the enqueue_notification method in NotificationManager return await notification_manager.enqueue_notification( diff --git a/app/notification/backend/business/notification_manager.py b/app/notification/backend/business/notification_manager.py index eb0789e..b86e11a 100644 --- a/app/notification/backend/business/notification_manager.py +++ b/app/notification/backend/business/notification_manager.py @@ -26,12 +26,10 @@ class NotificationManager: def __init__( self, - sender_id: str, # Require sender_id in the constructor ) -> None: self.sms_service = SmsService() self.in_app_notif_service = InAppNotifService() self.email_service = EmailService() - self.sender_id = sender_id self.notification_queues = NotificationManager.__create_notification_queues__() @staticmethod @@ -174,7 +172,7 @@ class NotificationManager: await self.__publish_notification__( channel=NotificationChannel.EMAIL, message=NotificationMessage( - sender_id="aaabbbcccddd", # app_settings.SYSTEM_USER_ID, + sender_id=app_settings.SYSTEM_USER_ID, receiver_id=receiver_id, subject=subject, event=event, @@ -188,7 +186,7 @@ class NotificationManager: print( "this is message", NotificationMessage( - sender_id="aaabbbcccddd", # app_settings.SYSTEM_USER_ID, + sender_id=app_settings.SYSTEM_USER_ID, receiver_id=receiver_id, subject=subject, event=event, diff --git a/app/notification/backend/infra/sms_handler.py b/app/notification/backend/infra/sms_handler.py index 9ab37a1..429e858 100644 --- a/app/notification/backend/infra/sms_handler.py +++ b/app/notification/backend/infra/sms_handler.py @@ -15,4 +15,5 @@ class SmsHandler: message = await self.twillo_client.messages.create_async( to=receiver, from_=sender, body=message ) + print("this is message", message) return message.status diff --git a/app/notification/common/config/app_settings.py b/app/notification/common/config/app_settings.py index 215bb2d..be3ebdd 100644 --- a/app/notification/common/config/app_settings.py +++ b/app/notification/common/config/app_settings.py @@ -9,8 +9,10 @@ class AppSettings(BaseSettings): RABBITMQ_PORT: int = 5672 SYSTEM_USER_ID: str = "117f191e810c19729de860aa" - SMS_FROM: str = "DDDDD" - EMAIL_FROM: str = "qifei.lu1994@gmail.com" + SMS_FROM: str = "+16898887156" + EMAIL_FROM: str = "freeleaps@freeleaps.com" + + SECRET_KEY: str = "" SENDGRID_API_KEY: str = ( "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" diff --git a/app/notification/webapi/routes/__init__.py b/app/notification/webapi/routes/__init__.py index 75ec0ac..24b738a 100644 --- a/app/notification/webapi/routes/__init__.py +++ b/app/notification/webapi/routes/__init__.py @@ -1,6 +1,9 @@ from fastapi import APIRouter from .send_notification import router as sn_router +from .online_platform_notification import router as ws_router api_router = APIRouter() api_router.include_router(sn_router, tags=["notification"]) + websocket_router = APIRouter() +websocket_router.include_router(ws_router, prefix="/downstream", tags=["downstream"]) diff --git a/app/notification/webapi/routes/online_platform_notification.py b/app/notification/webapi/routes/online_platform_notification.py new file mode 100644 index 0000000..54b0c89 --- /dev/null +++ b/app/notification/webapi/routes/online_platform_notification.py @@ -0,0 +1,111 @@ +import jwt +from jwt.exceptions import PyJWTError, InvalidTokenError +from fastapi import ( + APIRouter, + HTTPException, + WebSocket, + WebSocketException, + WebSocketDisconnect, +) +from starlette.websockets import WebSocketState +from infra.log.module_logger import ModuleLogger +from app.notification.common.config.app_settings import app_settings +from app.notification.backend.business.notification_manager import NotificationManager + + +async def consume_message(requester_key, message, args): + # user_id = requester_key + websocket = args["websocket"] + await websocket.send_json(message) + + +router = APIRouter() + +module_logger = ModuleLogger(sender_id="online_platform_notification") + +# Web API +# web_socket +# + + +@router.websocket_route("/online_platform_notification") +async def online_platform_notification(websocket: WebSocket): + await websocket.accept() + try: + websocket_closing_code = 1000 + websocket_closing_reason = "Normal Reason" + token = ( + websocket.query_params["token"] + if "token" in websocket.query_params + else None + ) + if not token: + raise InvalidTokenError("required token is missing") + decoded_token = jwt.decode( + jwt=token, + key=app_settings.SECRET_KEY, + verify=True, + algorithms=["HS256"], + options={"verify_signature": True, "verify_exp": True, "require_exp": True}, + ) + if not decoded_token: + raise InvalidTokenError("invalid token is provided") + user_id = decoded_token["subject"]["id"] + await websocket.send_json( + { + "sender_id": app_settings.SYSTEM_USER_ID, + "receiver_id": user_id, + "subject": "websocket", + "event": "connected", + "properties": { + "content_text": "Welcome to freeleaps online platform notification channel. " + "This channel is for downstream notification. All upstream messages will be ignored." + }, + } + ) + mq_client = websocket.app.in_app_mq_client + await mq_client.register_consumer( + user_id, consume_message, {"websocket": websocket} + ) + notification_manager = NotificationManager() + async with notification_manager: + await notification_manager.send_in_app_notification( + receiver_id=user_id, + subject="in_app_notification", + event="test", + properties={ + "content_text": "This is a self-test on verifying in-app downstream channel is working." + }, + ) + while True: + await websocket.receive() + except PyJWTError as err: + websocket_closing_code = 1008 + websocket_closing_reason = "Invalid authentication token" + await module_logger.log_exception(err) + except WebSocketException as err: + websocket_closing_code = err.code + websocket_closing_reason = err.reason + await module_logger.log_exception(err) + except HTTPException as err: + websocket_closing_code = 1006 + websocket_closing_reason = err.detail + await module_logger.log_exception(err) + except WebSocketDisconnect: + await module_logger.log_warning(warning="web socket disconnected.") + except RuntimeError as err: + websocket_closing_code = 1011 + websocket_closing_reason = "Server error. Bye" + await module_logger.log_exception(err) + except Exception as err: + websocket_closing_code = 1011 + websocket_closing_reason = "Server error. Bye" + await module_logger.log_exception(err) + finally: + try: + if websocket.application_state == WebSocketState.CONNECTED: + await websocket.close( + code=websocket_closing_code, reason=websocket_closing_reason + ) + except Exception as err: + await module_logger.log_exception(err) diff --git a/app/notification/webapi/routes/send_notification.py b/app/notification/webapi/routes/send_notification.py index 7390b39..6067728 100644 --- a/app/notification/webapi/routes/send_notification.py +++ b/app/notification/webapi/routes/send_notification.py @@ -35,7 +35,6 @@ async def send_notification(request: NotificationRequest): try: notification_hub = NotificationHub() await notification_hub.enqueue_notification( - sender_id="freeleaps@freeleaps.com", channels=request.channels, receiver_id=request.receiver_id, subject=request.subject, diff --git a/app/notification/webapi/utils/sms_consumer.py b/app/notification/webapi/utils/sms_consumer.py index 53444d0..077e4a0 100644 --- a/app/notification/webapi/utils/sms_consumer.py +++ b/app/notification/webapi/utils/sms_consumer.py @@ -7,7 +7,7 @@ from infra.log.module_logger import ModuleLogger class SmsMQConsumer: @staticmethod async def message_handler(register_key: str, message: dict, args: dict): - register_key + print("should getting here") sender_id = message["sender_id"] receiver_id = message["receiver_id"] sender_mobile = message["properties"]["sender_mobile"]