From 6a9b876b710f71e10728c00b4ddb08e9caeb200e Mon Sep 17 00:00:00 2001 From: Jet Li Date: Fri, 25 Oct 2024 22:37:47 +0000 Subject: [PATCH] Notification code finished with email function verified --- app/authentication/requirements.txt | 1 + .../webapi/bootstrap/application.py | 5 ++-- .../webapi/routes/upload_document.py | 7 ++--- .../webapi/routes/upload_file.py | 6 +--- .../backend/business/notification_manager.py | 23 +++++++++++---- .../backend/infra/email_handler.py | 4 +-- .../backend/infra/rabbitmq/async_client.py | 3 +- .../common/config/app_settings.py | 14 +++++---- .../webapi/bootstrap/application.py | 2 ++ .../webapi/bootstrap/freeleaps_app.py | 15 ++++++++++ .../webapi/providers/message_queue.py | 23 +++++++++++++++ .../webapi/routes/send_notification.py | 5 ++-- .../webapi/utils/email_consumer.py | 5 ++-- docker-compose.yaml | 29 ++++++++++--------- 14 files changed, 96 insertions(+), 46 deletions(-) create mode 100644 app/notification/webapi/providers/message_queue.py diff --git a/app/authentication/requirements.txt b/app/authentication/requirements.txt index f9cad3a..1105c5e 100644 --- a/app/authentication/requirements.txt +++ b/app/authentication/requirements.txt @@ -7,6 +7,7 @@ loguru==0.7.2 uvicorn==0.23.2 beanie==1.21.0 jieba==0.42.1 +sendgrid aio-pika httpx pydantic-settings diff --git a/app/authentication/webapi/bootstrap/application.py b/app/authentication/webapi/bootstrap/application.py index 6fdd4d0..d670c50 100644 --- a/app/authentication/webapi/bootstrap/application.py +++ b/app/authentication/webapi/bootstrap/application.py @@ -6,7 +6,8 @@ from app.authentication.webapi.providers import common from app.authentication.webapi.providers import logger from app.authentication.webapi.providers import router from app.authentication.webapi.providers import database -from app.authentication.webapi.providers import scheduler + +# from app.authentication.webapi.providers import scheduler from app.authentication.webapi.providers import exception_handler from .freeleaps_app import FreeleapsApp @@ -20,7 +21,7 @@ def create_app() -> FastAPI: register(app, database) register(app, logger) register(app, router) - register(app, scheduler) + # register(app, scheduler) register(app, common) # Call the custom_openapi function to change the OpenAPI version diff --git a/app/central_storage/webapi/routes/upload_document.py b/app/central_storage/webapi/routes/upload_document.py index bb8b471..637e4f4 100644 --- a/app/central_storage/webapi/routes/upload_document.py +++ b/app/central_storage/webapi/routes/upload_document.py @@ -9,19 +9,19 @@ from pydantic import BaseModel router = APIRouter() + class Item(BaseModel): associated_with: str name: str blob: bytes + @router.post( "/upload-document", summary="upload a document with a given associated_with id, document name and document data.", description="upload a document. If success, returning the document id", ) -async def upload_document( - item:Item -): +async def upload_document(item: Item): document_hub = DocumentHub() # File processing @@ -39,5 +39,4 @@ async def upload_document( ) except Exception as e: - print("this is exception", e) return JSONResponse(status_code=500, content={"error": "Internal server error"}) diff --git a/app/central_storage/webapi/routes/upload_file.py b/app/central_storage/webapi/routes/upload_file.py index 956c472..26e1515 100644 --- a/app/central_storage/webapi/routes/upload_file.py +++ b/app/central_storage/webapi/routes/upload_file.py @@ -16,10 +16,7 @@ token_manager = TokenManager() summary="upload a document with a given associated_with id.", description="upload a document. If success, returning the document id and file name", ) -async def upload_file( - associated_with: str = Form(...), - file: UploadFile = File(None) -): +async def upload_file(associated_with: str = Form(...), file: UploadFile = File(None)): document_hub = DocumentHub() # File processing @@ -38,5 +35,4 @@ async def upload_file( ) except Exception as e: - print("this is exception", e) return JSONResponse(status_code=500, content={"error": "Internal server error"}) diff --git a/app/notification/backend/business/notification_manager.py b/app/notification/backend/business/notification_manager.py index 4c6f24f..eb0789e 100644 --- a/app/notification/backend/business/notification_manager.py +++ b/app/notification/backend/business/notification_manager.py @@ -15,9 +15,6 @@ from datetime import datetime, timezone from typing import Optional, Type from types import TracebackType from infra.models.constants import UserRegion -from app.notification.backend.services.notification_publisher_service import ( - NotificationPublisherService, -) from app.notification.common.config.app_settings import app_settings from datetime import datetime, timezone @@ -31,7 +28,6 @@ class NotificationManager: self, sender_id: str, # Require sender_id in the constructor ) -> None: - self.notification_publisher_service = NotificationPublisherService() self.sms_service = SmsService() self.in_app_notif_service = InAppNotifService() self.email_service = EmailService() @@ -172,13 +168,27 @@ class NotificationManager: properties_dict = { "content_text": content_text, "content_subject": content_subject, - "sender_email": app_settings.EMAIL_FROM, + "sender_email": "qifei.lu1994@gmail.com", "receiver_type": "email", } await self.__publish_notification__( channel=NotificationChannel.EMAIL, message=NotificationMessage( - sender_id=app_settings.SYSTEM_USER_ID, + sender_id="aaabbbcccddd", # app_settings.SYSTEM_USER_ID, + receiver_id=receiver_id, + subject=subject, + event=event, + properties=( + properties_dict.update(properties) + if not properties + else properties_dict + ), + ), + ) + print( + "this is message", + NotificationMessage( + sender_id="aaabbbcccddd", # app_settings.SYSTEM_USER_ID, receiver_id=receiver_id, subject=subject, event=event, @@ -244,6 +254,7 @@ class NotificationManager: properties=properties, ) elif channel == NotificationChannel.EMAIL: + print("should get here") await self.send_email_notification( receiver_id=receiver_id, subject=subject, diff --git a/app/notification/backend/infra/email_handler.py b/app/notification/backend/infra/email_handler.py index 112532b..8dd84fb 100644 --- a/app/notification/backend/infra/email_handler.py +++ b/app/notification/backend/infra/email_handler.py @@ -10,7 +10,6 @@ class EmailHandler: async def send_email(self, message: dict): receiver_id = message["receiver_id"] - sender_email = message["properties"]["sender_email"] content_text = message["properties"]["content_text"] content_subject = message["properties"]["content_subject"] receiver_type = message["properties"]["receiver_type"] @@ -25,12 +24,11 @@ class EmailHandler: return mail = Mail( - from_email=sender_email, + from_email=app_settings.EMAIL_FROM, to_emails=receiver_email, subject=content_subject, html_content=content_text, ) - try: sg = SendGridAPIClient(app_settings.SENDGRID_API_KEY) response = sg.send(mail) diff --git a/app/notification/backend/infra/rabbitmq/async_client.py b/app/notification/backend/infra/rabbitmq/async_client.py index c480ca3..b0b6665 100644 --- a/app/notification/backend/infra/rabbitmq/async_client.py +++ b/app/notification/backend/infra/rabbitmq/async_client.py @@ -25,8 +25,7 @@ class AsyncMQClient: while retry_count < max_retries: try: self.connection = await aio_pika.connect_robust( - host=app_settings.RABBITMQ_HOST, - port=int(app_settings.RABBITMQ_PORT), + "amqp://guest:guest@rabbitmq:5672/", loop=event_loop, ) self.channel = await self.connection.channel() diff --git a/app/notification/common/config/app_settings.py b/app/notification/common/config/app_settings.py index ea68460..215bb2d 100644 --- a/app/notification/common/config/app_settings.py +++ b/app/notification/common/config/app_settings.py @@ -8,14 +8,16 @@ class AppSettings(BaseSettings): RABBITMQ_HOST: str = "rabbitmq" RABBITMQ_PORT: int = 5672 - SYSTEM_USER_ID: str = "" - SMS_FROM: str = "" - EMAIL_FROM: str = "" + SYSTEM_USER_ID: str = "117f191e810c19729de860aa" + SMS_FROM: str = "DDDDD" + EMAIL_FROM: str = "qifei.lu1994@gmail.com" - SENDGRID_API_KEY: str = "" + SENDGRID_API_KEY: str = ( + "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" + ) - TWILIO_ACCOUNT_SID: str = "" - TWILIO_AUTH_TOKEN: str = "" + TWILIO_ACCOUNT_SID: str = "ACf8c9283a6acda060258eadb29be58bc8" + TWILIO_AUTH_TOKEN: str = "ef160748cc22c8b7195b49df4b8eca7e" class Config: env_file = ".myapp.env" diff --git a/app/notification/webapi/bootstrap/application.py b/app/notification/webapi/bootstrap/application.py index 8fc2caf..e01d540 100644 --- a/app/notification/webapi/bootstrap/application.py +++ b/app/notification/webapi/bootstrap/application.py @@ -7,6 +7,7 @@ from app.notification.webapi.providers import logger from app.notification.webapi.providers import router from app.notification.webapi.providers import database from app.notification.webapi.providers import scheduler +from app.notification.webapi.providers import message_queue from app.notification.webapi.providers import exception_handler from .freeleaps_app import FreeleapsApp @@ -22,6 +23,7 @@ def create_app() -> FastAPI: register(app, router) register(app, scheduler) register(app, common) + register(app, message_queue) # Call the custom_openapi function to change the OpenAPI version customize_openapi_security(app) diff --git a/app/notification/webapi/bootstrap/freeleaps_app.py b/app/notification/webapi/bootstrap/freeleaps_app.py index 0405322..d1db11c 100644 --- a/app/notification/webapi/bootstrap/freeleaps_app.py +++ b/app/notification/webapi/bootstrap/freeleaps_app.py @@ -13,3 +13,18 @@ class FreeleapsApp(FastAPI): self.in_app_mq_client = AsyncMQSubscriber(NotificationChannel.IN_APP.name) self.email_handler = EmailMQConsumer(self.email_mq_client) self.sms_handler = SmsMQConsumer(self.sms_mq_client) + # Register the consumers on startup and shutdown + print("Registering startup/shutdown events") # Debugging line + self.register_startup_shutdown_events() + + def register_startup_shutdown_events(self): + @self.on_event("startup") + async def start_consumers(): + print("starting up!") + await self.sms_handler.register_consumer() + await self.email_handler.register_consumer() + + @self.on_event("shutdown") + async def stop_consumers(): + await self.sms_handler.unregister_consumer() + await self.email_handler.unregister_consumer() diff --git a/app/notification/webapi/providers/message_queue.py b/app/notification/webapi/providers/message_queue.py new file mode 100644 index 0000000..d7e2fd3 --- /dev/null +++ b/app/notification/webapi/providers/message_queue.py @@ -0,0 +1,23 @@ +import asyncio + + +def register(app): + @app.on_event("startup") + async def start_consumers(): + loop = asyncio.get_running_loop() + await loop.create_task( + app.in_app_mq_client.subscribe(max_retries=5, event_loop=loop) + ) + await loop.create_task( + app.email_mq_client.subscribe(max_retries=5, event_loop=loop) + ) + await loop.create_task( + app.sms_mq_client.subscribe(max_retries=5, event_loop=loop) + ) + + @app.on_event("shutdown") + async def stop_consumers(): + loop = asyncio.get_running_loop() + await loop.create_task(app.in_app_mq_client.close()) + await loop.create_task(app.email_mq_client.close()) + await loop.create_task(app.sms_mq_client.close()) diff --git a/app/notification/webapi/routes/send_notification.py b/app/notification/webapi/routes/send_notification.py index afb2d70..7390b39 100644 --- a/app/notification/webapi/routes/send_notification.py +++ b/app/notification/webapi/routes/send_notification.py @@ -1,4 +1,5 @@ from fastapi import APIRouter, Depends, HTTPException +import traceback from fastapi.responses import JSONResponse from starlette.status import HTTP_500_INTERNAL_SERVER_ERROR from app.notification.backend.application.notification_hub import NotificationHub @@ -30,11 +31,11 @@ class NotificationRequest(BaseModel): ) # API route to enqueue notifications @router.post("/send_notification") -async def enqueue_notification(request: NotificationRequest): +async def send_notification(request: NotificationRequest): try: notification_hub = NotificationHub() await notification_hub.enqueue_notification( - sender_id=request.sender_id, # Pass sender_id + sender_id="freeleaps@freeleaps.com", channels=request.channels, receiver_id=request.receiver_id, subject=request.subject, diff --git a/app/notification/webapi/utils/email_consumer.py b/app/notification/webapi/utils/email_consumer.py index 760383f..60e7cc5 100644 --- a/app/notification/webapi/utils/email_consumer.py +++ b/app/notification/webapi/utils/email_consumer.py @@ -6,8 +6,7 @@ from app.notification.backend.infra.email_handler import EmailHandler class EmailMQConsumer: @staticmethod async def mail_handler(register_key: str, message: dict, args: dict): - email_consumer: EmailMQConsumer = args["message_handler"] - return await email_consumer.email_handler.send_email(message) + return await EmailHandler().send_email(message) def __init__(self, mq_client: AsyncMQSubscriber) -> None: self.sender_id = app_settings.EMAIL_FROM @@ -16,7 +15,7 @@ class EmailMQConsumer: async def register_consumer(self): await self.mq_client.register_consumer( - registry_key=self.sender_id, + registry_key=app_settings.EMAIL_FROM, callback_method=EmailMQConsumer.mail_handler, args={"email_handler": self}, ) diff --git a/docker-compose.yaml b/docker-compose.yaml index 53d5d38..8f53d39 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,17 +1,17 @@ services: - # rabbitmq: - # image: rabbitmq:3-management - # ports: - # - "5672:5672" # RabbitMQ communication port - # - "15672:15672" # RabbitMQ management port - # networks: - # - freeleaps_service_hub_network - # healthcheck: - # test: [ "CMD", "rabbitmq-diagnostics", "ping" ] - # interval: 30s - # retries: 5 - # start_period: 10s - # timeout: 10s + rabbitmq: + image: rabbitmq:3-management + ports: + - "5672:5672" # RabbitMQ communication port + - "15672:15672" # RabbitMQ management port + networks: + - freeleaps_service_hub_network + healthcheck: + test: [ "CMD", "rabbitmq-diagnostics", "ping" ] + interval: 30s + retries: 5 + start_period: 10s + timeout: 10s central_storage: build: @@ -54,6 +54,9 @@ services: - freeleaps_service_hub_network env_file: - sites/notification/.env + environment: + RABBITMQ_HOST: rabbitmq + SENDGRID_API_KEY: "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" volumes: - .:/app # Mount the current directory to /app in the container