diff --git a/apps/notification/test_send_welcome_email.py b/apps/notification/test_send_welcome_email.py
deleted file mode 100644
index 0ce3681..0000000
--- a/apps/notification/test_send_welcome_email.py
+++ /dev/null
@@ -1,349 +0,0 @@
-#!/usr/bin/env python3
-"""
-use magicleaps english version welcome template
-I use email sender as freeleaps@freeleaps.com
-I use tenant id as magicleaps
-I use template id as welcome_email_tenant_magicleaps
-I use region as 0
-I use recipient emails as *@mathmast.com, *@163.com-> my own actual email address
-I use subject properties as company_name, new_employee_name
-I use body properties as new_employee_name, employee_id, position, department, email_address, start_date, company_name, company_logo, company_phone, company_address, manager_name, manager_email, hr_contact_name, hr_contact_email, it_support_email, initial_password, system_login_url, employee_handbook_url, company_policies_url, training_materials_url, onboarding_schedule, first_week_schedule
-and I recieved the welcome email successfully
-"""
-
-import asyncio
-import aiohttp
-import json
-import os
-import sys
-from datetime import datetime, timezone, timedelta
-from jose import jwt
-
-# import project config
-sys.path.append('.')
-
-# load environment variables
-def load_env_file(file_path):
- """load environment variables from .env file"""
- if os.path.exists(file_path):
- with open(file_path, 'r') as f:
- for line in f:
- line = line.strip()
- if line and not line.startswith('#') and '=' in line:
- if line.startswith('export '):
- line = line[7:]
- key, value = line.split('=', 1)
-
- value = value.strip('"\'')
- os.environ[key] = value
-
-# load local.env file
-load_env_file('local.env')
-
-from common.config.app_settings import app_settings
-from webapi.config.site_settings import site_settings
-
-class WelcomeEmailSender:
- def __init__(self):
- # é
ē½®
- host = 'localhost' if site_settings.SERVER_HOST == '0.0.0.0' else site_settings.SERVER_HOST
- port = site_settings.SERVER_PORT
- self.base_url = f"http://{host}:{port}/api/notification"
- self.tenant_id = "magicleaps"
- self.recipient_emails = ["*@mathmast.com", "*@163.com", *] # TODO: change to actual email addresses
- self.template_id = "welcome_email_tenant_magicleaps"
- self.region = 0
-
- self.tenant_token = None
-
- print(f"š§ config: {self.base_url}")
- print(f"šÆ tenant: {self.tenant_id}")
- print(f"š§ recipient: {', '.join(self.recipient_emails)}")
- print(f"š template: {self.template_id} (English)")
-
- async def initialize(self):
- """get token"""
- self.tenant_token = await self.get_tenant_token()
-
- async def get_token_from_auth_service(self):
- """get token from auth service"""
- print("\nš get token from auth service...")
-
- # in my local environment, the auth service is running on localhost:8103
- # TODO: change to actual auth service url
- auth_url = "http://localhost:8103/api/auth/token/generate-tokens"
- auth_data = {
- "id": self.tenant_id,
- "role": 2 # BUSINESS = 2
- }
-
- try:
- async with aiohttp.ClientSession() as session:
- async with session.post(auth_url, json=auth_data) as response:
- print(f"auth service response status code: {response.status}")
-
- if response.status == 200:
- result = await response.json()
- access_token = result.get("access_token")
-
- print("ā
get token from auth service successfully")
- print("=" * 60)
- print(f"Access Token: {access_token}")
- print("=" * 60)
-
- return access_token
- else:
- response_text = await response.text()
- print(f"ā get token from auth service failed: {response_text}")
- return None
- except Exception as e:
- print(f"ā connect to auth service failed: {e}")
- return None
-
-
-
- async def get_tenant_token(self):
- """get tenant token from auth service"""
- auth_token = await self.get_token_from_auth_service()
-
- if not auth_token:
- raise Exception("cannot get token from auth service, please ensure auth service is running")
-
- return auth_token
-
- async def verify_template_exists(self):
- """verify template exists"""
- print(f"\nš verify template {self.template_id} exists...")
-
- headers = {"Authorization": f"Bearer {self.tenant_token}"}
-
- # print debug info
- print(f"request URL: {self.base_url}/tenant/templates/list?region={self.region}")
- print(f"request headers: {headers}")
-
- async with aiohttp.ClientSession() as session:
- async with session.get(
- f"{self.base_url}/tenant/templates/list?region={self.region}",
- headers=headers
- ) as response:
- print(f"response status code: {response.status}")
- response_text = await response.text()
- print(f"response: {response_text}")
-
- if response.status == 200:
- data = json.loads(response_text)
- templates = data.get('templates', [])
-
- print(f"found {len(templates)} templates:")
- for template in templates:
- print(f" - {template.get('template_id')}: {template.get('subject')}")
-
- # find welcome email template
- welcome_template = None
- for template in templates:
- if template.get('template_id') == self.template_id:
- welcome_template = template
- break
-
- if welcome_template:
- print(f"ā
found template: {welcome_template.get('template_id')}")
- print(f" subject: {welcome_template.get('subject')}")
- return True
- else:
- print(f"ā not found template: {self.template_id}")
- return False
- else:
- print(f"ā get template list failed")
- return False
-
- async def verify_email_sender(self):
- """verify email sender is configured"""
- print(f"\nš verify email sender is configured...")
-
- headers = {"Authorization": f"Bearer {self.tenant_token}"}
-
- async with aiohttp.ClientSession() as session:
- async with session.get(
- f"{self.base_url}/email_senders/get",
- headers=headers
- ) as response:
- print(f"response status code: {response.status}")
- response_text = await response.text()
- print(f"response: {response_text}")
-
- if response.status == 200:
- data = json.loads(response_text)
- email_senders = data.get('email_senders', [])
-
- if email_senders:
- print(f"ā
email senders: {email_senders}")
- return email_senders[0]
- print(f"ā no email senders configured")
- return None
- else:
- print(f"ā get email senders failed")
- return None
-
- async def send_welcome_email(self):
- """send welcome email"""
- print(f"\nš§ send welcome email...")
-
- # prepare email data
- email_data = {
- "tenant_id": self.tenant_id,
- "template_id": self.template_id,
- "recipient_emails": self.recipient_emails,
- "subject_properties": {
- "company_name": "MagicLeaps",
- "new_employee_name": "Taniacao"
- },
- "body_properties": {
- # employee info
- "new_employee_name": "*", # TODO: change to test employee name
- "employee_id": "EMP001",
- "position": "Software Engineer",
- "department": "Engineering",
- "email_address": self.recipient_emails[0],
- "start_date": "2024-01-15",
-
- # company info
- "company_name": "MagicLeaps",
- "company_logo": "https://magicleaps.com/logo.png",
- "company_phone": "+1-555-0123",
- "company_address": "123 Innovation Street, Tech City, TC 12345",
-
- # manager info
- "manager_name": "John Smith",
- "manager_email": "john.smith@magicleaps.com",
-
- # HR info
- "hr_contact_name": "Sarah Johnson",
- "hr_contact_email": "hr@magicleaps.com",
-
- # IT support
- "it_support_email": "it-support@magicleaps.com",
- "initial_password": "Welcome2024!",
-
- # system access
- "system_login_url": "https://portal.magicleaps.com",
-
- # document links
- "employee_handbook_url": "https://docs.magicleaps.com/handbook",
- "company_policies_url": "https://docs.magicleaps.com/policies",
- "training_materials_url": "https://training.magicleaps.com",
-
- # time schedule
- "onboarding_schedule": f"Week 1: Orientation
Week 2: Training
Week 3: Project Assignment",
- "first_week_schedule": f"Monday: Welcome Meeting
Tuesday: System Setup
Wednesday: Team Introduction"
- },
- "region": self.region,
- "sender_emails": ["freeleaps@freeleaps.com"],
- "priority": "normal",
- "tracking_enabled": True
- }
-
- headers = {
- "Authorization": f"Bearer {self.tenant_token}",
- "Content-Type": "application/json"
- }
-
- print(f"š email data:")
- print(json.dumps(email_data, indent=2))
-
- async with aiohttp.ClientSession() as session:
- async with session.post(
- f"{self.base_url}/send_tenant_email",
- json=email_data,
- headers=headers
- ) as response:
- print(f"\nresponse status code: {response.status}")
- response_text = await response.text()
- print(f"response: {response_text}")
-
- if response.status == 200:
- result = json.loads(response_text)
- print(f"ā
email sending success!")
- print(f" message id: {result.get('message_id')}")
- print(f" email ids: {result.get('email_ids', [])}")
- return result
- else:
- print(f"ā email sending failed")
- return None
-
- async def check_email_status(self, email_id):
- """check email status"""
- print(f"\nš check email status: {email_id}")
-
- headers = {"Authorization": f"Bearer {self.tenant_token}"}
-
- async with aiohttp.ClientSession() as session:
- async with session.get(
- f"{self.base_url}/tenant_email_status/{self.tenant_id}?email_id={email_id}",
- headers=headers
- ) as response:
- print(f"response status code: {response.status}")
- response_text = await response.text()
- print(f"response: {response_text}")
-
- if response.status == 200:
- status_data = json.loads(response_text)
- print(f"ā
email status: {status_data}")
- return status_data
- else:
- print(f"ā get email status failed")
- return None
-
- async def run(self):
- """run full sending process"""
- print("š start sending welcome email...")
- print("=" * 60)
-
- try:
- # 0. initialize (get token)
- await self.initialize()
-
- # 1. verify template exists
- template_exists = await self.verify_template_exists()
- if not template_exists:
- print("ā template not found, cannot send email")
- return
-
- # 2. verify email sender is configured
- email_sender = await self.verify_email_sender()
- if not email_sender:
- print("ā email sender not configured, cannot send email")
- return
-
- # 3. send welcome email
- result = await self.send_welcome_email()
-
- if result:
- # 4. wait for email to be processed
- print("ā³ wait for email to be processed...")
- await asyncio.sleep(3)
-
- # 5. check email status
- email_ids = result.get('email_ids', [])
- if email_ids:
- for email_id in email_ids:
- await self.check_email_status(email_id)
-
- print(f"\nā
welcome email sending completed!")
- print(f"š§ recipient: {', '.join(self.recipient_emails)}")
- print(f"š template: {self.template_id}")
- print(f"šÆ tenant: {self.tenant_id}")
- else:
- print(f"\nā email sending failed")
-
- except Exception as e:
- print(f"ā error during sending: {e}")
- import traceback
- traceback.print_exc()
-
-async def main():
- """main function"""
- sender = WelcomeEmailSender()
- await sender.run()
-
-if __name__ == "__main__":
- asyncio.run(main())