112 lines
3.9 KiB
Python
112 lines
3.9 KiB
Python
import jwt
|
|
from jwt.exceptions import PyJWTError, InvalidTokenError
|
|
from fastapi import (
|
|
APIRouter,
|
|
HTTPException,
|
|
WebSocket,
|
|
WebSocketException,
|
|
WebSocketDisconnect,
|
|
)
|
|
from starlette.websockets import WebSocketState
|
|
from common.log.module_logger import ModuleLogger
|
|
from common.config.app_settings import app_settings
|
|
from backend.business.notification_manager import NotificationManager
|
|
|
|
|
|
async def consume_message(requester_key, message, args):
|
|
# user_id = requester_key
|
|
websocket = args["websocket"]
|
|
await websocket.send_json(message)
|
|
|
|
|
|
router = APIRouter()
|
|
|
|
module_logger = ModuleLogger(sender_id="online_platform_notification")
|
|
|
|
# Web API
|
|
# web_socket
|
|
#
|
|
|
|
|
|
@router.websocket_route("/online_platform_notification")
|
|
async def online_platform_notification(websocket: WebSocket):
|
|
await websocket.accept()
|
|
try:
|
|
websocket_closing_code = 1000
|
|
websocket_closing_reason = "Normal Reason"
|
|
token = (
|
|
websocket.query_params["token"]
|
|
if "token" in websocket.query_params
|
|
else None
|
|
)
|
|
if not token:
|
|
raise InvalidTokenError("required token is missing")
|
|
decoded_token = jwt.decode(
|
|
jwt=token,
|
|
key=app_settings.SECRET_KEY,
|
|
verify=True,
|
|
algorithms=["HS256"],
|
|
options={"verify_signature": True, "verify_exp": True, "require_exp": True},
|
|
)
|
|
if not decoded_token:
|
|
raise InvalidTokenError("invalid token is provided")
|
|
user_id = decoded_token["subject"]["id"]
|
|
await websocket.send_json(
|
|
{
|
|
"sender_id": app_settings.SYSTEM_USER_ID,
|
|
"receiver_id": user_id,
|
|
"subject": "websocket",
|
|
"event": "connected",
|
|
"properties": {
|
|
"content_text": "Welcome to freeleaps online platform notification channel. "
|
|
"This channel is for downstream notification. All upstream messages will be ignored."
|
|
},
|
|
}
|
|
)
|
|
mq_client = websocket.app.in_app_mq_client
|
|
await mq_client.register_consumer(
|
|
user_id, consume_message, {"websocket": websocket}
|
|
)
|
|
notification_manager = NotificationManager()
|
|
async with notification_manager:
|
|
await notification_manager.send_in_app_notification(
|
|
receiver_id=user_id,
|
|
subject="in_app_notification",
|
|
event="test",
|
|
properties={
|
|
"content_text": "This is a self-test on verifying in-app downstream channel is working."
|
|
},
|
|
)
|
|
while True:
|
|
await websocket.receive()
|
|
except PyJWTError as err:
|
|
websocket_closing_code = 1008
|
|
websocket_closing_reason = "Invalid authentication token"
|
|
await module_logger.log_exception(err)
|
|
except WebSocketException as err:
|
|
websocket_closing_code = err.code
|
|
websocket_closing_reason = err.reason
|
|
await module_logger.log_exception(err)
|
|
except HTTPException as err:
|
|
websocket_closing_code = 1006
|
|
websocket_closing_reason = err.detail
|
|
await module_logger.log_exception(err)
|
|
except WebSocketDisconnect:
|
|
await module_logger.log_warning(warning="web socket disconnected.")
|
|
except RuntimeError as err:
|
|
websocket_closing_code = 1011
|
|
websocket_closing_reason = "Server error. Bye"
|
|
await module_logger.log_exception(err)
|
|
except Exception as err:
|
|
websocket_closing_code = 1011
|
|
websocket_closing_reason = "Server error. Bye"
|
|
await module_logger.log_exception(err)
|
|
finally:
|
|
try:
|
|
if websocket.application_state == WebSocketState.CONNECTED:
|
|
await websocket.close(
|
|
code=websocket_closing_code, reason=websocket_closing_reason
|
|
)
|
|
except Exception as err:
|
|
await module_logger.log_exception(err)
|