freeleaps-service-hub/apps/notification/webapi/routes/online_platform_notification.py
2024-10-30 20:49:50 -07:00

112 lines
3.9 KiB
Python

import jwt
from jwt.exceptions import PyJWTError, InvalidTokenError
from fastapi import (
APIRouter,
HTTPException,
WebSocket,
WebSocketException,
WebSocketDisconnect,
)
from starlette.websockets import WebSocketState
from common.log.module_logger import ModuleLogger
from common.config.app_settings import app_settings
from backend.business.notification_manager import NotificationManager
async def consume_message(requester_key, message, args):
    """Push one queued message to the WebSocket supplied in ``args``.

    Args:
        requester_key: Key the consumer was registered under; not used here.
        message: Payload handed to ``send_json`` unchanged.
        args: Mapping that must hold the target socket under ``"websocket"``.

    Raises:
        KeyError: If ``args`` has no ``"websocket"`` entry.
    """
    await args["websocket"].send_json(message)
# Router that collects the routes declared in this module.
router = APIRouter()
# Logger shared by every handler in this module.
# NOTE(review): sender_id presumably labels the emitted records - confirm against ModuleLogger.
module_logger = ModuleLogger(sender_id="online_platform_notification")
# Web API
# WebSocket endpoint: downstream in-app notification channel.
#
def _resolve_user_id(websocket: WebSocket):
    """Return the user id carried by the JWT in the ``token`` query parameter.

    Raises:
        PyJWTError: If the token is missing, fails signature or expiry
            verification, has no ``exp`` claim, or lacks ``subject.id``.
    """
    token = websocket.query_params.get("token")
    if not token:
        raise InvalidTokenError("required token is missing")

    claims = jwt.decode(
        jwt=token,
        key=app_settings.SECRET_KEY,
        algorithms=["HS256"],
        options={
            "verify_signature": True,
            "verify_exp": True,
            # PyJWT 1.x spelling; ignored by PyJWT 2.x.
            "require_exp": True,
            # PyJWT 2.x spelling; without it a token lacking "exp" is accepted.
            "require": ["exp"],
        },
    )
    if not claims:
        raise InvalidTokenError("invalid token is provided")

    try:
        return claims["subject"]["id"]
    except (KeyError, TypeError) as err:
        # A malformed payload is an authentication failure, not a server error.
        raise InvalidTokenError("token does not carry a subject id") from err


@router.websocket_route("/online_platform_notification")
async def online_platform_notification(websocket: WebSocket):
    """Serve the downstream in-app notification channel over a WebSocket.

    The connection is accepted first, then authenticated with the JWT passed
    as the ``token`` query parameter. On success a welcome message is sent,
    ``consume_message`` is registered with the in-app MQ client under the
    user's id, and a self-test notification is emitted. The handler then
    reads and discards upstream frames until the client disconnects.

    Closing codes sent to the client:
        1000: default when nothing went wrong.
        1008: the token was missing or invalid.
        1011: an unexpected error or an ``HTTPException``.
        A ``WebSocketException`` supplies its own code and reason.

    Args:
        websocket: The incoming WebSocket connection.
    """
    # Bound before the try block so the finally clause can always read them.
    websocket_closing_code = 1000
    websocket_closing_reason = "Normal Reason"

    await websocket.accept()
    try:
        user_id = _resolve_user_id(websocket)

        await websocket.send_json(
            {
                "sender_id": app_settings.SYSTEM_USER_ID,
                "receiver_id": user_id,
                "subject": "websocket",
                "event": "connected",
                "properties": {
                    "content_text": "Welcome to freeleaps online platform notification channel. "
                    "This channel is for downstream notification. All upstream messages will be ignored."
                },
            }
        )

        mq_client = websocket.app.in_app_mq_client
        # NOTE(review): nothing here unregisters this consumer when the socket
        # closes; confirm whether the MQ client needs an explicit unregister.
        await mq_client.register_consumer(
            user_id, consume_message, {"websocket": websocket}
        )

        notification_manager = NotificationManager()
        async with notification_manager:
            await notification_manager.send_in_app_notification(
                receiver_id=user_id,
                subject="in_app_notification",
                event="test",
                properties={
                    "content_text": "This is a self-test on verifying in-app downstream channel is working."
                },
            )

        # Upstream frames are read only to detect the disconnect. The low-level
        # receive() returns the disconnect message instead of raising, and a
        # further receive() would raise RuntimeError, so raise explicitly here.
        while True:
            incoming = await websocket.receive()
            if incoming.get("type") == "websocket.disconnect":
                raise WebSocketDisconnect(incoming.get("code", 1000))
    except PyJWTError as err:
        websocket_closing_code = 1008
        websocket_closing_reason = "Invalid authentication token"
        await module_logger.log_exception(err)
    except WebSocketException as err:
        websocket_closing_code = err.code
        websocket_closing_reason = err.reason
        await module_logger.log_exception(err)
    except HTTPException as err:
        # 1006 is reserved and must not be sent in a Close frame; use 1011.
        websocket_closing_code = 1011
        # detail may be a non-string object; the close reason must be text.
        websocket_closing_reason = str(err.detail)
        await module_logger.log_exception(err)
    except WebSocketDisconnect:
        await module_logger.log_warning(warning="web socket disconnected.")
    except Exception as err:
        # Top-level boundary: report and close instead of leaking the error.
        websocket_closing_code = 1011
        websocket_closing_reason = "Server error. Bye"
        await module_logger.log_exception(err)
    finally:
        try:
            # Send a Close frame only while both sides are still connected.
            if (
                websocket.client_state == WebSocketState.CONNECTED
                and websocket.application_state == WebSocketState.CONNECTED
            ):
                await websocket.close(
                    code=websocket_closing_code, reason=websocket_closing_reason
                )
        except Exception as err:
            await module_logger.log_exception(err)