import jwt from jwt.exceptions import PyJWTError, InvalidTokenError from fastapi import ( APIRouter, HTTPException, WebSocket, WebSocketException, WebSocketDisconnect, ) from starlette.websockets import WebSocketState from common.log.module_logger import ModuleLogger from common.config.app_settings import app_settings from backend.business.notification_manager import NotificationManager async def consume_message(requester_key, message, args): # user_id = requester_key websocket = args["websocket"] await websocket.send_json(message) router = APIRouter() module_logger = ModuleLogger(sender_id="online_platform_notification") # Web API # web_socket # @router.websocket_route("/online_platform_notification") async def online_platform_notification(websocket: WebSocket): await websocket.accept() try: websocket_closing_code = 1000 websocket_closing_reason = "Normal Reason" token = ( websocket.query_params["token"] if "token" in websocket.query_params else None ) if not token: raise InvalidTokenError("required token is missing") decoded_token = jwt.decode( jwt=token, key=app_settings.SECRET_KEY, verify=True, algorithms=["HS256"], options={"verify_signature": True, "verify_exp": True, "require_exp": True}, ) if not decoded_token: raise InvalidTokenError("invalid token is provided") user_id = decoded_token["subject"]["id"] await websocket.send_json( { "sender_id": app_settings.SYSTEM_USER_ID, "receiver_id": user_id, "subject": "websocket", "event": "connected", "properties": { "content_text": "Welcome to freeleaps online platform notification channel. " "This channel is for downstream notification. All upstream messages will be ignored." }, } ) mq_client = websocket.app.in_app_mq_client await mq_client.register_consumer( user_id, consume_message, {"websocket": websocket} ) notification_manager = NotificationManager() async with notification_manager: await notification_manager.send_in_app_notification( receiver_id=user_id, subject="in_app_notification", event="test", properties={ "content_text": "This is a self-test on verifying in-app downstream channel is working." }, ) while True: await websocket.receive() except PyJWTError as err: websocket_closing_code = 1008 websocket_closing_reason = "Invalid authentication token" await module_logger.log_exception(err) except WebSocketException as err: websocket_closing_code = err.code websocket_closing_reason = err.reason await module_logger.log_exception(err) except HTTPException as err: websocket_closing_code = 1006 websocket_closing_reason = err.detail await module_logger.log_exception(err) except WebSocketDisconnect: await module_logger.log_warning(warning="web socket disconnected.") except RuntimeError as err: websocket_closing_code = 1011 websocket_closing_reason = "Server error. Bye" await module_logger.log_exception(err) except Exception as err: websocket_closing_code = 1011 websocket_closing_reason = "Server error. Bye" await module_logger.log_exception(err) finally: try: if websocket.application_state == WebSocketState.CONNECTED: await websocket.close( code=websocket_closing_code, reason=websocket_closing_reason ) except Exception as err: await module_logger.log_exception(err)