From 1f551ad161628eda42711ddcddcb57ebdf6cf66f Mon Sep 17 00:00:00 2001 From: Jet Li Date: Fri, 25 Oct 2024 05:20:39 +0000 Subject: [PATCH] Add mq consumer for email, sms --- .../backend/infra/email_handler.py | 42 +++++++++++++++++- app/notification/backend/infra/sms_handler.py | 18 +++++++- .../common/config/app_settings.py | 5 +++ .../webapi/bootstrap/freeleaps_app.py | 9 ++++ .../webapi/utils/email_consumer.py | 25 +++++++++++ app/notification/webapi/utils/sms_consumer.py | 44 +++++++++++++++++++ 6 files changed, 141 insertions(+), 2 deletions(-) create mode 100644 app/notification/webapi/utils/email_consumer.py create mode 100644 app/notification/webapi/utils/sms_consumer.py diff --git a/app/notification/backend/infra/email_handler.py b/app/notification/backend/infra/email_handler.py index a91276c..112532b 100644 --- a/app/notification/backend/infra/email_handler.py +++ b/app/notification/backend/infra/email_handler.py @@ -1,2 +1,42 @@ +from app.notification.common.config.app_settings import app_settings +from sendgrid import SendGridAPIClient +from sendgrid.helpers.mail import Mail +from infra.log.module_logger import ModuleLogger + + class EmailHandler: - pass + def __init__(self) -> None: + pass + + async def send_email(self, message: dict): + receiver_id = message["receiver_id"] + sender_email = message["properties"]["sender_email"] + content_text = message["properties"]["content_text"] + content_subject = message["properties"]["content_subject"] + receiver_type = message["properties"]["receiver_type"] + module_logger = ModuleLogger(sender_id="EmailHandler") + if receiver_type == "email": + receiver_email = receiver_id + else: + receiver_email = None + module_logger.log_info( + "unsupported receiver_type: '{}'".format(receiver_type) + ) + return + + mail = Mail( + from_email=sender_email, + to_emails=receiver_email, + subject=content_subject, + html_content=content_text, + ) + + try: + sg = SendGridAPIClient(app_settings.SENDGRID_API_KEY) + response = sg.send(mail) + await module_logger.log_info( + info=f"SendGridAPIClient:response:status_code:{response.status_code} | body:{response.body}", + properties=message["properties"], + ) + except Exception as e: + await module_logger.log_exception(e) diff --git a/app/notification/backend/infra/sms_handler.py b/app/notification/backend/infra/sms_handler.py index 95c84dc..9ab37a1 100644 --- a/app/notification/backend/infra/sms_handler.py +++ b/app/notification/backend/infra/sms_handler.py @@ -1,2 +1,18 @@ +from app.notification.common.config.app_settings import app_settings +from twilio.http.async_http_client import AsyncTwilioHttpClient +from twilio.rest import Client + + class SmsHandler: - pass + def __init__(self) -> None: + self.twillo_client = Client( + app_settings.TWILIO_ACCOUNT_SID, + app_settings.TWILIO_AUTH_TOKEN, + http_client=AsyncTwilioHttpClient(), + ) + + async def send_sms(self, sender: str, receiver: str, message: str): + message = await self.twillo_client.messages.create_async( + to=receiver, from_=sender, body=message + ) + return message.status diff --git a/app/notification/common/config/app_settings.py b/app/notification/common/config/app_settings.py index 83042aa..ea68460 100644 --- a/app/notification/common/config/app_settings.py +++ b/app/notification/common/config/app_settings.py @@ -12,6 +12,11 @@ class AppSettings(BaseSettings): SMS_FROM: str = "" EMAIL_FROM: str = "" + SENDGRID_API_KEY: str = "" + + TWILIO_ACCOUNT_SID: str = "" + TWILIO_AUTH_TOKEN: str = "" + class Config: env_file = ".myapp.env" env_file_encoding = "utf-8" diff --git a/app/notification/webapi/bootstrap/freeleaps_app.py b/app/notification/webapi/bootstrap/freeleaps_app.py index 496633a..c055dd0 100644 --- a/app/notification/webapi/bootstrap/freeleaps_app.py +++ b/app/notification/webapi/bootstrap/freeleaps_app.py @@ -1,6 +1,15 @@ from fastapi import FastAPI +from backend.infra.rabbitmq.async_subscriber import AsyncMQSubscriber +from backend.models.constants import NotificationChannel +from webapi.utils.email_consumer import EmailMQConsumer +from webapi.utils.sms_consumer import SmsMQConsumer class FreeleapsApp(FastAPI): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + self.email_mq_client = AsyncMQSubscriber(NotificationChannel.EMAIL.name) + self.sms_mq_client = AsyncMQSubscriber(NotificationChannel.SMS.name) + self.in_app_mq_client = AsyncMQSubscriber(NotificationChannel.IN_APP.name) + self.email_handler = EmailMQConsumer(self.email_mq_client) + self.sms_handler = SmsMQConsumer(self.sms_mq_client) diff --git a/app/notification/webapi/utils/email_consumer.py b/app/notification/webapi/utils/email_consumer.py new file mode 100644 index 0000000..140b76d --- /dev/null +++ b/app/notification/webapi/utils/email_consumer.py @@ -0,0 +1,25 @@ +from app.notification.common.config.app_settings import app_settings +from backend.infra.rabbitmq.async_subscriber import AsyncMQSubscriber +from backend.infra.email_handler import EmailHandler + + +class EmailMQConsumer: + @staticmethod + async def mail_handler(register_key: str, message: dict, args: dict): + email_consumer: EmailMQConsumer = args["message_handler"] + return await email_consumer.email_handler.send_email(message) + + def __init__(self, mq_client: AsyncMQSubscriber) -> None: + self.sender_id = app_settings.EMAIL_FROM + self.email_handler = EmailHandler() + self.mq_client = mq_client + + async def register_consumer(self): + await self.mq_client.register_consumer( + registry_key=self.sender_id, + callback_method=EmailMQConsumer.mail_handler, + args={"email_handler": self}, + ) + + async def unregister_consumer(self): + await self.mq_client.unregister_consumer(registry_key=self.sender_id) diff --git a/app/notification/webapi/utils/sms_consumer.py b/app/notification/webapi/utils/sms_consumer.py new file mode 100644 index 0000000..b27664a --- /dev/null +++ b/app/notification/webapi/utils/sms_consumer.py @@ -0,0 +1,44 @@ +from app.notification.common.config.app_settings import app_settings +from backend.infra.rabbitmq.async_subscriber import AsyncMQSubscriber +from backend.infra.sms_handler import SmsHandler +from infra.log.module_logger import ModuleLogger + + +class SmsMQConsumer: + @staticmethod + async def message_handler(register_key: str, message: dict, args: dict): + register_key + sender_id = message["sender_id"] + receiver_id = message["receiver_id"] + sender_mobile = message["properties"]["sender_mobile"] + content_text = message["properties"]["content_text"] + receiver_type = message["properties"]["receiver_type"] + sms_consumer: SmsMQConsumer = args["message_handler"] + module_logger = ModuleLogger(sender_id="SmsMQConsumer") + if receiver_type == "mobile": + receiver_mobile = receiver_id + else: + receiver_mobile = None + module_logger.warning( + "unsupported receiver_type: '{}'".format(receiver_type) + ) + return + + await sms_consumer.sms_handler.send_sms( + sender_mobile, receiver_mobile, content_text + ) + + def __init__(self, mq_client: AsyncMQSubscriber) -> None: + self.sender_id = app_settings.SMS_FROM + self.sms_handler = SmsHandler() + self.mq_client = mq_client + + async def register_consumer(self): + await self.mq_client.register_consumer( + registry_key=self.sender_id, + callback_method=SmsMQConsumer.message_handler, + args={"message_handler": self}, + ) + + async def unregister_consumer(self): + await self.mq_client.unregister_consumer(registry_key=self.sender_id)