Adding online platform notification piece
This commit is contained in:
parent
6a9b876b71
commit
6b286a4433
@ -9,7 +9,6 @@ class NotificationHub:
|
||||
|
||||
async def enqueue_notification(
|
||||
self,
|
||||
sender_id: str, # Added sender_id here
|
||||
channels: list[str], # Accept multiple channels as a list of strings
|
||||
receiver_id: str,
|
||||
subject: str,
|
||||
@ -25,7 +24,7 @@ class NotificationHub:
|
||||
raise ValueError(f"Unsupported notification channel: {channel_str}")
|
||||
|
||||
# Initialize NotificationManager with sender_id
|
||||
notification_manager = NotificationManager(sender_id=sender_id)
|
||||
notification_manager = NotificationManager()
|
||||
|
||||
# Call the enqueue_notification method in NotificationManager
|
||||
return await notification_manager.enqueue_notification(
|
||||
|
||||
@ -26,12 +26,10 @@ class NotificationManager:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
sender_id: str, # Require sender_id in the constructor
|
||||
) -> None:
|
||||
self.sms_service = SmsService()
|
||||
self.in_app_notif_service = InAppNotifService()
|
||||
self.email_service = EmailService()
|
||||
self.sender_id = sender_id
|
||||
self.notification_queues = NotificationManager.__create_notification_queues__()
|
||||
|
||||
@staticmethod
|
||||
@ -174,7 +172,7 @@ class NotificationManager:
|
||||
await self.__publish_notification__(
|
||||
channel=NotificationChannel.EMAIL,
|
||||
message=NotificationMessage(
|
||||
sender_id="aaabbbcccddd", # app_settings.SYSTEM_USER_ID,
|
||||
sender_id=app_settings.SYSTEM_USER_ID,
|
||||
receiver_id=receiver_id,
|
||||
subject=subject,
|
||||
event=event,
|
||||
@ -188,7 +186,7 @@ class NotificationManager:
|
||||
print(
|
||||
"this is message",
|
||||
NotificationMessage(
|
||||
sender_id="aaabbbcccddd", # app_settings.SYSTEM_USER_ID,
|
||||
sender_id=app_settings.SYSTEM_USER_ID,
|
||||
receiver_id=receiver_id,
|
||||
subject=subject,
|
||||
event=event,
|
||||
|
||||
@ -15,4 +15,5 @@ class SmsHandler:
|
||||
message = await self.twillo_client.messages.create_async(
|
||||
to=receiver, from_=sender, body=message
|
||||
)
|
||||
print("this is message", message)
|
||||
return message.status
|
||||
|
||||
@ -9,8 +9,10 @@ class AppSettings(BaseSettings):
|
||||
RABBITMQ_PORT: int = 5672
|
||||
|
||||
SYSTEM_USER_ID: str = "117f191e810c19729de860aa"
|
||||
SMS_FROM: str = "DDDDD"
|
||||
EMAIL_FROM: str = "qifei.lu1994@gmail.com"
|
||||
SMS_FROM: str = "+16898887156"
|
||||
EMAIL_FROM: str = "freeleaps@freeleaps.com"
|
||||
|
||||
SECRET_KEY: str = ""
|
||||
|
||||
SENDGRID_API_KEY: str = (
|
||||
"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"
|
||||
|
||||
@ -1,6 +1,9 @@
|
||||
from fastapi import APIRouter
|
||||
from .send_notification import router as sn_router
|
||||
from .online_platform_notification import router as ws_router
|
||||
|
||||
api_router = APIRouter()
|
||||
api_router.include_router(sn_router, tags=["notification"])
|
||||
|
||||
websocket_router = APIRouter()
|
||||
websocket_router.include_router(ws_router, prefix="/downstream", tags=["downstream"])
|
||||
|
||||
111
app/notification/webapi/routes/online_platform_notification.py
Normal file
111
app/notification/webapi/routes/online_platform_notification.py
Normal file
@ -0,0 +1,111 @@
|
||||
import jwt
|
||||
from jwt.exceptions import PyJWTError, InvalidTokenError
|
||||
from fastapi import (
|
||||
APIRouter,
|
||||
HTTPException,
|
||||
WebSocket,
|
||||
WebSocketException,
|
||||
WebSocketDisconnect,
|
||||
)
|
||||
from starlette.websockets import WebSocketState
|
||||
from infra.log.module_logger import ModuleLogger
|
||||
from app.notification.common.config.app_settings import app_settings
|
||||
from app.notification.backend.business.notification_manager import NotificationManager
|
||||
|
||||
|
||||
async def consume_message(requester_key, message, args):
|
||||
# user_id = requester_key
|
||||
websocket = args["websocket"]
|
||||
await websocket.send_json(message)
|
||||
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
module_logger = ModuleLogger(sender_id="online_platform_notification")
|
||||
|
||||
# Web API
|
||||
# web_socket
|
||||
#
|
||||
|
||||
|
||||
@router.websocket_route("/online_platform_notification")
|
||||
async def online_platform_notification(websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
try:
|
||||
websocket_closing_code = 1000
|
||||
websocket_closing_reason = "Normal Reason"
|
||||
token = (
|
||||
websocket.query_params["token"]
|
||||
if "token" in websocket.query_params
|
||||
else None
|
||||
)
|
||||
if not token:
|
||||
raise InvalidTokenError("required token is missing")
|
||||
decoded_token = jwt.decode(
|
||||
jwt=token,
|
||||
key=app_settings.SECRET_KEY,
|
||||
verify=True,
|
||||
algorithms=["HS256"],
|
||||
options={"verify_signature": True, "verify_exp": True, "require_exp": True},
|
||||
)
|
||||
if not decoded_token:
|
||||
raise InvalidTokenError("invalid token is provided")
|
||||
user_id = decoded_token["subject"]["id"]
|
||||
await websocket.send_json(
|
||||
{
|
||||
"sender_id": app_settings.SYSTEM_USER_ID,
|
||||
"receiver_id": user_id,
|
||||
"subject": "websocket",
|
||||
"event": "connected",
|
||||
"properties": {
|
||||
"content_text": "Welcome to freeleaps online platform notification channel. "
|
||||
"This channel is for downstream notification. All upstream messages will be ignored."
|
||||
},
|
||||
}
|
||||
)
|
||||
mq_client = websocket.app.in_app_mq_client
|
||||
await mq_client.register_consumer(
|
||||
user_id, consume_message, {"websocket": websocket}
|
||||
)
|
||||
notification_manager = NotificationManager()
|
||||
async with notification_manager:
|
||||
await notification_manager.send_in_app_notification(
|
||||
receiver_id=user_id,
|
||||
subject="in_app_notification",
|
||||
event="test",
|
||||
properties={
|
||||
"content_text": "This is a self-test on verifying in-app downstream channel is working."
|
||||
},
|
||||
)
|
||||
while True:
|
||||
await websocket.receive()
|
||||
except PyJWTError as err:
|
||||
websocket_closing_code = 1008
|
||||
websocket_closing_reason = "Invalid authentication token"
|
||||
await module_logger.log_exception(err)
|
||||
except WebSocketException as err:
|
||||
websocket_closing_code = err.code
|
||||
websocket_closing_reason = err.reason
|
||||
await module_logger.log_exception(err)
|
||||
except HTTPException as err:
|
||||
websocket_closing_code = 1006
|
||||
websocket_closing_reason = err.detail
|
||||
await module_logger.log_exception(err)
|
||||
except WebSocketDisconnect:
|
||||
await module_logger.log_warning(warning="web socket disconnected.")
|
||||
except RuntimeError as err:
|
||||
websocket_closing_code = 1011
|
||||
websocket_closing_reason = "Server error. Bye"
|
||||
await module_logger.log_exception(err)
|
||||
except Exception as err:
|
||||
websocket_closing_code = 1011
|
||||
websocket_closing_reason = "Server error. Bye"
|
||||
await module_logger.log_exception(err)
|
||||
finally:
|
||||
try:
|
||||
if websocket.application_state == WebSocketState.CONNECTED:
|
||||
await websocket.close(
|
||||
code=websocket_closing_code, reason=websocket_closing_reason
|
||||
)
|
||||
except Exception as err:
|
||||
await module_logger.log_exception(err)
|
||||
@ -35,7 +35,6 @@ async def send_notification(request: NotificationRequest):
|
||||
try:
|
||||
notification_hub = NotificationHub()
|
||||
await notification_hub.enqueue_notification(
|
||||
sender_id="freeleaps@freeleaps.com",
|
||||
channels=request.channels,
|
||||
receiver_id=request.receiver_id,
|
||||
subject=request.subject,
|
||||
|
||||
@ -7,7 +7,7 @@ from infra.log.module_logger import ModuleLogger
|
||||
class SmsMQConsumer:
|
||||
@staticmethod
|
||||
async def message_handler(register_key: str, message: dict, args: dict):
|
||||
register_key
|
||||
print("should getting here")
|
||||
sender_id = message["sender_id"]
|
||||
receiver_id = message["receiver_id"]
|
||||
sender_mobile = message["properties"]["sender_mobile"]
|
||||
|
||||
Loading…
Reference in New Issue
Block a user