491 lines
20 KiB
Python
491 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Real Integration Test for Email Sender API

Tests email sender configuration management with real API calls.
Data is actually written to MongoDB for comprehensive testing.
|
|
"""
|
|
|
|
import requests
|
|
import json
|
|
import sys
|
|
import os
|
|
import uuid
|
|
from datetime import datetime
|
|
|
|
# Import existing configuration
|
|
from common.config.app_settings import app_settings
|
|
from webapi.config.site_settings import site_settings
|
|
from common.token.token_manager import TokenManager
|
|
|
|
# Base URL of the notification API, assembled from site_settings.
# When the server is configured with the wildcard address '0.0.0.0',
# requests are sent to 'localhost' instead.
port = site_settings.SERVER_PORT
host = site_settings.SERVER_HOST
if host == '0.0.0.0':
    host = 'localhost'
BASE_URL = "http://{}:{}/api/notification".format(host, port)
|
|
|
|
class EmailSenderRealIntegrationTest:
    """Integration test driver for the email sender endpoints.

    Each ``test_*`` method sends a real HTTP request to the notification API
    under ``BASE_URL`` and prints the outcome; nothing is mocked. Tokens for
    an admin user and a tenant user are created with ``TokenManager`` when
    the instance is constructed.
    """

    # Seconds to wait for any single HTTP call. Without an explicit timeout
    # ``requests`` waits indefinitely, so one stalled server would hang the run.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        """Set up test identifiers and generate the admin/tenant tokens."""
        self.base_url = BASE_URL
        # Use tenant_id from token instead of generating random one
        self.test_tenant_id = "test_tenant_user"  # Keep consistent with token tenant_id
        self.test_email_sender = "freeleaps@freeleaps.com"

        # Generate tokens using existing TokenManager
        print("🔑 Generating tokens using existing TokenManager...")
        self.admin_token, self.tenant_token = self._generate_tokens()

        print(f"🧪 Starting Email Sender Real Token Integration Test - Tenant ID: {self.test_tenant_id}")
        print("📊 Test data will be written to MongoDB")
        print(f"📧 Test email sender: {self.test_email_sender}")
        print(f"🌐 API Base URL: {self.base_url}")

    @staticmethod
    def _parse_env_line(line):
        """Parse one line of an env file into a ``(key, value)`` pair.

        Blank lines, ``#`` comments, lines without ``=`` and lines with an
        empty key yield ``None``. A leading ``export `` is dropped, whitespace
        around key and value is removed, and surrounding quotes are stripped
        from the value.
        """
        line = line.strip()
        if not line or line.startswith('#') or '=' not in line:
            return None
        if line.startswith('export '):
            line = line[len('export '):]
        key, value = line.split('=', 1)
        key = key.strip()
        if not key:
            return None
        # Strip whitespace first so quotes in `KEY = "value"` are still removed.
        return key, value.strip().strip('"\'')

    @classmethod
    def _load_env_file(cls, path):
        """Copy the variables defined in the env file at ``path`` into ``os.environ``.

        Returns:
            True when the file was read, False when it does not exist.
        """
        try:
            with open(path, 'r', encoding='utf-8') as f:
                for line in f:
                    parsed = cls._parse_env_line(line)
                    if parsed is not None:
                        key, value = parsed
                        os.environ[key] = value
        except FileNotFoundError:
            return False
        return True

    def _generate_tokens(self):
        """Generate admin and tenant tokens using existing TokenManager.

        Returns:
            A ``(admin_token, tenant_token)`` tuple.
        """
        # Load environment variables from local.env
        if not self._load_env_file('local.env'):
            print("⚠️ local.env file not found, using hardcoded SECRET_KEY")
            # NOTE(review): a secret is committed in source here; acceptable only
            # if this key is test-only. It also overwrites any SECRET_KEY that
            # is already present in the environment.
            os.environ['SECRET_KEY'] = "ea84edf152976b2fcec12b78aa8e45bc26a5cf0ef61bf16f5c317ae33b3fd8b0"

        # NOTE(review): app_settings and TokenManager are imported at module
        # load, i.e. before the environment is populated above. If they capture
        # the environment at import time these values arrive too late - confirm.
        token_manager = TokenManager()

        # Generate admin token with subject format (like authentication service)
        admin_subject = {"subject": {"id": "admin_user", "role": 8}}  # ADMINISTRATOR = 8
        admin_token = token_manager.create_access_token(admin_subject)

        # Generate tenant token with subject format (like authentication service)
        tenant_subject = {"subject": {"id": self.test_tenant_id, "role": 2}}  # BUSINESS = 2
        tenant_token = token_manager.create_access_token(tenant_subject)

        print(f"✅ Generated admin token: {admin_token[:20]}...")
        print(f"✅ Generated tenant token: {tenant_token[:20]}...")

        return admin_token, tenant_token

    def _get_headers(self, token):
        """Get request headers with authorization token"""
        return {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        }

    def _request(self, method, path, token, payload=None):
        """Send one authenticated request to the API and return the response.

        Args:
            method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
            path: Path appended to ``self.base_url`` (starts with ``/``).
            token: Bearer token placed in the Authorization header.
            payload: Optional JSON-serialisable body; omitted when ``None``.

        Raises:
            requests.RequestException: on connection failure or timeout.
        """
        kwargs = {
            "headers": self._get_headers(token),
            "timeout": self.REQUEST_TIMEOUT,
        }
        if payload is not None:
            kwargs["json"] = payload
        return requests.request(method, f"{self.base_url}{path}", **kwargs)

    def test_set_email_sender(self):
        """Test setting email sender for tenant.

        Returns:
            True on HTTP 200, False on any other status or on an exception.
        """
        print("\n🚀 Testing email sender setting...")

        # Test data for setting email sender
        email_sender_data = {"email_sender": self.test_email_sender}

        try:
            response = self._request(
                "POST", "/email_sender/set", self.tenant_token, email_sender_data
            )

            print(f"📤 Set Email Sender Response Status: {response.status_code}")
            print(f"📤 Set Email Sender Response: {response.text}")

            if response.status_code != 200:
                print(f"❌ Failed to set email sender: {response.status_code}")
                return False

            result = response.json()
            print("✅ Email sender set successfully!")
            print(f"📧 Email sender: {result.get('email_sender')}")
            print(f"📧 Success: {result.get('success')}")
            return True

        except Exception as e:
            # Test boundary: any failure is reported as a failed test.
            print(f"❌ Exception during email sender setting: {str(e)}")
            return False

    def test_get_email_sender(self):
        """Test retrieving email sender for tenant.

        Returns:
            True on HTTP 200, False on any other status or on an exception.
        """
        print("\n📖 Testing email sender retrieval...")

        try:
            response = self._request("GET", "/email_sender/get", self.tenant_token)

            print(f"📤 Get Email Sender Response Status: {response.status_code}")
            print(f"📤 Get Email Sender Response: {response.text}")

            if response.status_code != 200:
                print(f"❌ Failed to get email sender: {response.status_code}")
                return False

            result = response.json()
            print("✅ Email sender retrieved successfully!")
            print(f"📧 Email sender: {result.get('email_sender')}")
            print(f"📧 Success: {result.get('success')}")
            return True

        except Exception as e:
            print(f"❌ Exception during email sender retrieval: {str(e)}")
            return False

    def test_update_email_sender(self):
        """Test updating email sender for tenant.

        Returns:
            True on HTTP 200, False on any other status or on an exception.
        """
        print("\n✏️ Testing email sender update...")

        # Updated email sender data
        updated_email_sender = "freeleaps@freeleaps.com"
        update_data = {"email_sender": updated_email_sender}

        try:
            response = self._request(
                "PUT", "/email_sender/update", self.tenant_token, update_data
            )

            print(f"📤 Update Email Sender Response Status: {response.status_code}")
            print(f"📤 Update Email Sender Response: {response.text}")

            if response.status_code != 200:
                print(f"❌ Failed to update email sender: {response.status_code}")
                return False

            result = response.json()
            print("✅ Email sender updated successfully!")
            print(f"📧 Updated email sender: {result.get('email_sender')}")
            print(f"📧 Success: {result.get('success')}")
            return True

        except Exception as e:
            print(f"❌ Exception during email sender update: {str(e)}")
            return False

    def test_set_email_sender_again(self):
        """Test setting email sender again (should replace existing).

        Returns:
            True on HTTP 200, False on any other status or on an exception.
        """
        print("\n🔄 Testing email sender setting again...")

        # Test data for setting email sender again
        email_sender_data = {"email_sender": self.test_email_sender}

        try:
            response = self._request(
                "POST", "/email_sender/set", self.tenant_token, email_sender_data
            )

            print(f"📤 Set Email Sender Again Response Status: {response.status_code}")
            print(f"📤 Set Email Sender Again Response: {response.text}")

            if response.status_code != 200:
                print(f"❌ Failed to set email sender again: {response.status_code}")
                return False

            result = response.json()
            print("✅ Email sender set again successfully!")
            print(f"📧 Email sender: {result.get('email_sender')}")
            print(f"📧 Success: {result.get('success')}")
            return True

        except Exception as e:
            print(f"❌ Exception during email sender setting again: {str(e)}")
            return False

    def test_admin_delete_email_sender(self):
        """Test deleting email sender configuration (Admin only).

        Returns:
            True on HTTP 200, False on any other status or on an exception.
        """
        print("\n🗑️ Testing admin email sender deletion...")

        try:
            response = self._request(
                "DELETE",
                f"/email_sender/delete/{self.test_tenant_id}",
                self.admin_token,
            )

            print(f"📤 Admin Delete Email Sender Response Status: {response.status_code}")
            print(f"📤 Admin Delete Email Sender Response: {response.text}")

            if response.status_code != 200:
                print(f"❌ Failed to delete email sender configuration: {response.status_code}")
                return False

            result = response.json()
            print("✅ Email sender configuration deleted successfully!")
            print(f"📧 Success: {result.get('success')}")
            return True

        except Exception as e:
            print(f"❌ Exception during email sender deletion: {str(e)}")
            return False

    def test_get_email_sender_after_deletion(self):
        """Test getting email sender after deletion (should return None).

        Returns:
            True when the API answers HTTP 200 and the ``email_sender`` field
            of the response is ``None``; False otherwise or on an exception.
        """
        print("\n📖 Testing email sender retrieval after deletion...")

        try:
            response = self._request("GET", "/email_sender/get", self.tenant_token)

            print(f"📤 Get Email Sender After Deletion Response Status: {response.status_code}")
            print(f"📤 Get Email Sender After Deletion Response: {response.text}")

            if response.status_code != 200:
                print(f"❌ Failed to get email sender after deletion: {response.status_code}")
                return False

            result = response.json()
            email_sender = result.get('email_sender')
            print("✅ Email sender retrieved after deletion!")
            print(f"📧 Email sender: {email_sender}")
            print(f"📧 Success: {result.get('success')}")

            # After deletion, email_sender should be None
            if email_sender is None:
                print("✅ Correctly returned None after deletion")
                return True

            print("⚠️ Expected None but got email sender")
            return False

        except Exception as e:
            print(f"❌ Exception during email sender retrieval after deletion: {str(e)}")
            return False

    def test_error_scenarios(self):
        """Test error scenarios and edge cases.

        Runs five scenarios and prints whether each response status matched
        the expected one. Outcomes are only reported; this method returns
        nothing and does not feed into the pass/fail summary.
        """
        print("\n⚠️ Testing error scenarios...")

        # Test 1: Set email sender with invalid email format
        print("\n📝 Test 1: Set email sender with invalid email format")
        invalid_email_data = {"email_sender": "invalid-email-format"}

        try:
            response = self._request(
                "POST", "/email_sender/set", self.tenant_token, invalid_email_data
            )

            print(f"📤 Invalid Email Format Response Status: {response.status_code}")
            if response.status_code == 400:
                print("✅ Correctly rejected invalid email format")
            else:
                print(f"⚠️ Unexpected response for invalid email format: {response.status_code}")

        except Exception as e:
            print(f"❌ Exception during invalid email format test: {str(e)}")

        # Test 2: Set email sender with empty email
        print("\n📝 Test 2: Set email sender with empty email")
        empty_email_data = {"email_sender": ""}

        try:
            response = self._request(
                "POST", "/email_sender/set", self.tenant_token, empty_email_data
            )

            print(f"📤 Empty Email Response Status: {response.status_code}")
            if response.status_code == 400:
                print("✅ Correctly rejected empty email")
            else:
                print(f"⚠️ Unexpected response for empty email: {response.status_code}")

        except Exception as e:
            print(f"❌ Exception during empty email test: {str(e)}")

        # Test 3: Update email sender when none exists
        print("\n📝 Test 3: Update email sender when none exists")

        # First, delete the existing configuration to ensure it doesn't exist
        try:
            delete_response = self._request(
                "DELETE",
                f"/email_sender/delete/{self.test_tenant_id}",
                self.admin_token,
            )
            print(f" 📤 Delete existing config: {delete_response.status_code}")
        except Exception as e:
            print(f" ⚠️ Could not delete existing config: {e}")

        update_data = {"email_sender": "update_test_nonexistent@freeleaps.com"}

        try:
            response = self._request(
                "PUT", "/email_sender/update", self.tenant_token, update_data
            )

            print(f"📤 Update When None Exists Response Status: {response.status_code}")
            print(f"📤 Update When None Exists Response: {response.text}")
            if response.status_code == 200:
                print("✅ Update succeeded (may have created new configuration)")
            else:
                print(f"⚠️ Unexpected response for update when none exists: {response.status_code}")

        except Exception as e:
            print(f"❌ Exception during update when none exists test: {str(e)}")

        # Recreate the email sender configuration after Test 3
        print("\n🔄 Recreating email sender configuration after Test 3...")
        recreate_data = {"email_sender": "freeleaps@freeleaps.com"}
        try:
            recreate_response = self._request(
                "POST", "/email_sender/set", self.tenant_token, recreate_data
            )
            print(f" 📤 Recreate Response: {recreate_response.status_code}")
            if recreate_response.status_code == 200:
                print(" ✅ Email sender configuration recreated successfully")
            else:
                print(f" ❌ Failed to recreate configuration: {recreate_response.text}")
        except Exception as e:
            print(f" ❌ Exception during recreation: {str(e)}")

        # Test 4: Tenant trying to access admin delete API
        print("\n📝 Test 4: Tenant trying to access admin delete API")
        try:
            response = self._request(
                "DELETE",
                f"/email_sender/delete/{self.test_tenant_id}",
                self.tenant_token,  # Using tenant token instead of admin
            )

            print(f"📤 Unauthorized Admin Access Response Status: {response.status_code}")
            if response.status_code == 403:
                print("✅ Correctly rejected tenant access to admin API")
            else:
                print(f"⚠️ Unexpected response for unauthorized admin access: {response.status_code}")

        except Exception as e:
            print(f"❌ Exception during unauthorized admin access: {str(e)}")

        # Test 5: Admin trying to access tenant API
        print("\n📝 Test 5: Admin trying to access tenant API")
        admin_email_data = {"email_sender": "admin_test_access@freeleaps.com"}

        try:
            response = self._request(
                "POST",
                "/email_sender/set",
                self.admin_token,  # Using admin token
                admin_email_data,
            )

            print(f"📤 Admin Access Tenant API Response Status: {response.status_code}")
            if response.status_code == 403:
                print("✅ Correctly rejected admin access to tenant API")
            else:
                print(f"⚠️ Unexpected response for admin access to tenant API: {response.status_code}")

        except Exception as e:
            print(f"❌ Exception during admin access to tenant API: {str(e)}")

    def run_all_tests(self):
        """Run all email sender tests and print a summary.

        Returns:
            True when every recorded test passed, False otherwise. The error
            scenarios are executed but not counted in the summary.
        """
        print("🧪 Starting Email Sender Integration Tests...")
        print("=" * 60)

        test_results = []

        # Run positive test cases
        test_results.append(("Set Email Sender", self.test_set_email_sender()))
        test_results.append(("Get Email Sender", self.test_get_email_sender()))
        test_results.append(("Update Email Sender", self.test_update_email_sender()))
        # The following cases are disabled in this suite:
        # test_results.append(("Set Email Sender Again", self.test_set_email_sender_again()))
        # test_results.append(("Admin Delete Email Sender", self.test_admin_delete_email_sender()))
        # test_results.append(("Get After Deletion", self.test_get_email_sender_after_deletion()))

        # Run error scenario tests
        self.test_error_scenarios()

        # Print test summary
        print("\n" + "=" * 60)
        print("📊 TEST SUMMARY")
        print("=" * 60)

        for test_name, result in test_results:
            status = "✅ PASS" if result else "❌ FAIL"
            print(f"{test_name:<20} {status}")

        total = len(test_results)
        passed = sum(1 for _, result in test_results if result)
        failed = total - passed
        # Guard the division so an empty result list cannot crash the summary.
        success_rate = passed / total * 100 if total else 0.0

        print("-" * 60)
        print(f"Total Tests: {total}")
        print(f"Passed: {passed}")
        print(f"Failed: {failed}")
        print(f"Success Rate: {success_rate:.1f}%")

        if failed == 0:
            print("\n🎉 All tests passed! Email Sender API is working correctly.")
        else:
            print(f"\n⚠️ {failed} test(s) failed. Please check the implementation.")

        return failed == 0
|
|
|
|
def main():
    """Entry point: build the test driver, run the suite, and exit.

    Exits with status 0 when ``run_all_tests`` reports success and with
    status 1 when it reports failure, when the run is interrupted from the
    keyboard, or when any other exception escapes.
    """
    print("🚀 Email Sender Real Integration Test")
    print("=" * 60)

    try:
        # Constructing the driver also generates the tokens it needs.
        runner = EmailSenderRealIntegrationTest()
        all_passed = runner.run_all_tests()

        if not all_passed:
            print("\n❌ Integration test failed!")
            sys.exit(1)

        print("\n✅ Integration test completed successfully!")
        sys.exit(0)

    except KeyboardInterrupt:
        print("\n⚠️ Test interrupted by user")
        sys.exit(1)
    except Exception as err:
        print(f"\n❌ Unexpected error: {str(err)}")
        sys.exit(1)
|
|
|
|
# Run the integration suite only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|