36 lines
1.0 KiB
Python
36 lines
1.0 KiB
Python
import httpx
|
|
from pydantic import BaseModel
|
|
from typing import Dict, List
|
|
|
|
|
|
class NotificationService:
    """Minimal async client for a notification HTTP service.

    Each call to :meth:`send_notification` opens a short-lived
    ``httpx.AsyncClient`` and POSTs a JSON payload to
    ``<base_url>/send_notification``.
    """

    def __init__(self, base_url: str = "http://localhost:8003"):
        """Remember the root URL of the notification service.

        Args:
            base_url: Root URL that endpoint paths are appended to. A
                trailing slash is tolerated.
        """
        self.base_url = base_url

    def _endpoint(self, path: str) -> str:
        """Join ``base_url`` and ``path`` with exactly one slash between them."""
        # Normalise at call time (not in __init__) so that reassigning
        # ``base_url`` after construction is honoured and the stored
        # attribute keeps the value the caller supplied.
        return f"{self.base_url.rstrip('/')}/{path.lstrip('/')}"

    async def send_notification(
        self,
        sender_id: str,
        channels: List[str],
        receiver_id: str,
        subject: str,
        event: str,
        properties: Dict,
    ) -> bool:
        """POST a notification request and report whether it was accepted.

        The arguments are forwarded unchanged as the JSON body, under keys
        of the same names.

        Args:
            sender_id: Value sent as ``"sender_id"``.
            channels: Value sent as ``"channels"``.
            receiver_id: Value sent as ``"receiver_id"``.
            subject: Value sent as ``"subject"``.
            event: Value sent as ``"event"``.
            properties: Value sent as ``"properties"``; must be
                JSON-serialisable.

        Returns:
            ``True`` if the service answered with HTTP 200, ``False`` for
            any other status code.

        Raises:
            Nothing is caught here, so any exception raised by httpx while
            sending the request propagates to the caller.
        """
        # Function-scope import keeps this block self-contained without
        # touching the file's import section.
        import logging

        payload = {
            "sender_id": sender_id,
            "channels": channels,
            "receiver_id": receiver_id,
            "subject": subject,
            "event": event,
            "properties": properties,
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(
                self._endpoint("send_notification"),
                json=payload,
            )

        if response.status_code == 200:
            return True

        # Surface the failure instead of dropping it silently. Only the
        # status code and event name are logged, not the response body.
        logging.getLogger(__name__).warning(
            "send_notification failed: status=%s event=%s",
            response.status_code,
            event,
        )
        return False
|