Add mq consumer for email, sms

This commit is contained in:
jetli 2024-10-25 05:20:39 +00:00
parent 1510010091
commit 1f551ad161
6 changed files with 141 additions and 2 deletions

View File

@ -1,2 +1,42 @@
from app.notification.common.config.app_settings import app_settings
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail
from infra.log.module_logger import ModuleLogger
class EmailHandler: class EmailHandler:
pass def __init__(self) -> None:
pass
async def send_email(self, message: dict):
receiver_id = message["receiver_id"]
sender_email = message["properties"]["sender_email"]
content_text = message["properties"]["content_text"]
content_subject = message["properties"]["content_subject"]
receiver_type = message["properties"]["receiver_type"]
module_logger = ModuleLogger(sender_id="EmailHandler")
if receiver_type == "email":
receiver_email = receiver_id
else:
receiver_email = None
module_logger.log_info(
"unsupported receiver_type: '{}'".format(receiver_type)
)
return
mail = Mail(
from_email=sender_email,
to_emails=receiver_email,
subject=content_subject,
html_content=content_text,
)
try:
sg = SendGridAPIClient(app_settings.SENDGRID_API_KEY)
response = sg.send(mail)
await module_logger.log_info(
info=f"SendGridAPIClient:response:status_code:{response.status_code} | body:{response.body}",
properties=message["properties"],
)
except Exception as e:
await module_logger.log_exception(e)

View File

@ -1,2 +1,18 @@
from app.notification.common.config.app_settings import app_settings
from twilio.http.async_http_client import AsyncTwilioHttpClient
from twilio.rest import Client
class SmsHandler: class SmsHandler:
pass def __init__(self) -> None:
self.twillo_client = Client(
app_settings.TWILIO_ACCOUNT_SID,
app_settings.TWILIO_AUTH_TOKEN,
http_client=AsyncTwilioHttpClient(),
)
async def send_sms(self, sender: str, receiver: str, message: str):
message = await self.twillo_client.messages.create_async(
to=receiver, from_=sender, body=message
)
return message.status

View File

@ -12,6 +12,11 @@ class AppSettings(BaseSettings):
SMS_FROM: str = "" SMS_FROM: str = ""
EMAIL_FROM: str = "" EMAIL_FROM: str = ""
SENDGRID_API_KEY: str = ""
TWILIO_ACCOUNT_SID: str = ""
TWILIO_AUTH_TOKEN: str = ""
class Config: class Config:
env_file = ".myapp.env" env_file = ".myapp.env"
env_file_encoding = "utf-8" env_file_encoding = "utf-8"

View File

@ -1,6 +1,15 @@
from fastapi import FastAPI from fastapi import FastAPI
from backend.infra.rabbitmq.async_subscriber import AsyncMQSubscriber
from backend.models.constants import NotificationChannel
from webapi.utils.email_consumer import EmailMQConsumer
from webapi.utils.sms_consumer import SmsMQConsumer
class FreeleapsApp(FastAPI): class FreeleapsApp(FastAPI):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.email_mq_client = AsyncMQSubscriber(NotificationChannel.EMAIL.name)
self.sms_mq_client = AsyncMQSubscriber(NotificationChannel.SMS.name)
self.in_app_mq_client = AsyncMQSubscriber(NotificationChannel.IN_APP.name)
self.email_handler = EmailMQConsumer(self.email_mq_client)
self.sms_handler = SmsMQConsumer(self.sms_mq_client)

View File

@ -0,0 +1,25 @@
from app.notification.common.config.app_settings import app_settings
from backend.infra.rabbitmq.async_subscriber import AsyncMQSubscriber
from backend.infra.email_handler import EmailHandler
class EmailMQConsumer:
@staticmethod
async def mail_handler(register_key: str, message: dict, args: dict):
email_consumer: EmailMQConsumer = args["message_handler"]
return await email_consumer.email_handler.send_email(message)
def __init__(self, mq_client: AsyncMQSubscriber) -> None:
self.sender_id = app_settings.EMAIL_FROM
self.email_handler = EmailHandler()
self.mq_client = mq_client
async def register_consumer(self):
await self.mq_client.register_consumer(
registry_key=self.sender_id,
callback_method=EmailMQConsumer.mail_handler,
args={"email_handler": self},
)
async def unregister_consumer(self):
await self.mq_client.unregister_consumer(registry_key=self.sender_id)

View File

@ -0,0 +1,44 @@
from app.notification.common.config.app_settings import app_settings
from backend.infra.rabbitmq.async_subscriber import AsyncMQSubscriber
from backend.infra.sms_handler import SmsHandler
from infra.log.module_logger import ModuleLogger
class SmsMQConsumer:
@staticmethod
async def message_handler(register_key: str, message: dict, args: dict):
register_key
sender_id = message["sender_id"]
receiver_id = message["receiver_id"]
sender_mobile = message["properties"]["sender_mobile"]
content_text = message["properties"]["content_text"]
receiver_type = message["properties"]["receiver_type"]
sms_consumer: SmsMQConsumer = args["message_handler"]
module_logger = ModuleLogger(sender_id="SmsMQConsumer")
if receiver_type == "mobile":
receiver_mobile = receiver_id
else:
receiver_mobile = None
module_logger.warning(
"unsupported receiver_type: '{}'".format(receiver_type)
)
return
await sms_consumer.sms_handler.send_sms(
sender_mobile, receiver_mobile, content_text
)
def __init__(self, mq_client: AsyncMQSubscriber) -> None:
self.sender_id = app_settings.SMS_FROM
self.sms_handler = SmsHandler()
self.mq_client = mq_client
async def register_consumer(self):
await self.mq_client.register_consumer(
registry_key=self.sender_id,
callback_method=SmsMQConsumer.message_handler,
args={"message_handler": self},
)
async def unregister_consumer(self):
await self.mq_client.unregister_consumer(registry_key=self.sender_id)