feat(role_management): Add logic for creating temporary users to apitest.

This commit is contained in:
icecheng 2025-07-22 11:30:16 +08:00
parent beba0f5fe5
commit 9dcd27bc8a
5 changed files with 162 additions and 3 deletions

View File

@ -3,8 +3,18 @@ import pytest
from tests.base.authentication_web import AuthenticationWeb from tests.base.authentication_web import AuthenticationWeb
@pytest.fixture @pytest.fixture(scope="session")
def authentication_web()->AuthenticationWeb: def authentication_web() -> AuthenticationWeb:
authentication_web = AuthenticationWeb() authentication_web = AuthenticationWeb()
authentication_web.login() authentication_web.login()
return authentication_web return authentication_web
@pytest.fixture(scope="session")
def authentication_web_of_temp_user1() -> AuthenticationWeb:
authentication_web = AuthenticationWeb()
user = authentication_web.create_temporary_user()
authentication_web.user_email = user["email"]
authentication_web.password = user["password"]
authentication_web.login()
return authentication_web

View File

@ -101,4 +101,4 @@ class TestCreatePermission:
if __name__ == '__main__': if __name__ == '__main__':
pytest.main([__file__]) pytest.main([__file__])

View File

@ -1,6 +1,9 @@
import asyncio
import httpx import httpx
from typing import Optional from typing import Optional
from tests.base.config import USER_EMAIL, USER_PASSWORD, BASE_URL from tests.base.config import USER_EMAIL, USER_PASSWORD, BASE_URL
from tests.util.temporary_email import *
class AuthenticationWeb: class AuthenticationWeb:
@ -10,6 +13,63 @@ class AuthenticationWeb:
self.base_url = base_url self.base_url = base_url
self.token: Optional[str] = None self.token: Optional[str] = None
def create_temporary_user(self) -> dict[str, str]:
"""Create a temporary user."""
# generate temporary user email
email = generate_email()
print("temporary user email:", email)
# call try-signin-with-email api
response1 = self.try_signin_with_email(params={"email": email, "host": self.base_url})
print("try_signin_with_email", response1.json())
# query auth code
auth_code = get_auth_code(email)
print("temporary user auth code:", auth_code)
response2 = self.signin_with_email_and_code(
params={"email": email, "code": auth_code, "host": self.base_url})
print("signin_with_email_and_code", response2.json())
access_token = response2.json()["access_token"]
response3 = self.update_new_user_flid(token=access_token, params={'flid': response2.json()['flid']})
print("update_new_user_flid", response3.json())
password = "Kdwy12#$"
# set password
response4 = self.update_user_password(token=access_token, params={
'password': password,
'password2': password
})
print("update_user_password", response4.json())
return {
"email": email,
"password": password,
}
def update_new_user_flid(self, params: dict, token: str = None):
"""Update the user's FLID."""
if token is None:
token = self.token
headers = {"Authorization": f"Bearer {token}"}
with httpx.Client(base_url=self.base_url) as client:
resp = client.request("POST", "/api/auth/signin/update-new-user-flid", headers=headers, json=params)
return resp
def update_user_password(self, params: dict, token: str = None):
"""Update the user's password."""
if token is None:
token = self.token
headers = {"Authorization": f"Bearer {token}"}
with httpx.Client(base_url=self.base_url) as client:
resp = client.request("POST", "/api/auth/signin/update-user-password", headers=headers, json=params)
return resp
def try_signin_with_email(self, params):
"""try signin with email."""
return self.request_sync("POST", "/api/auth/signin/try-signin-with-email", json=params)
def signin_with_email_and_code(self, params):
"""try signin with email and code."""
return self.request_sync("POST", "/api/auth/signin/signin-with-email-and-code", json=params)
def login(self): def login(self):
"""Login and store JWT token""" """Login and store JWT token"""
with httpx.Client(base_url=self.base_url) as client: with httpx.Client(base_url=self.base_url) as client:
@ -26,6 +86,15 @@ class AuthenticationWeb:
}) })
return resp return resp
def request_sync(self, method: str, url: str, **kwargs):
"""Send authenticated request"""
headers = kwargs.pop("headers", {})
if self.token:
headers["Authorization"] = f"Bearer {self.token}"
with httpx.Client(base_url=self.base_url) as client:
resp = client.request(method, url, headers=headers, **kwargs)
return resp
async def request(self, method: str, url: str, **kwargs): async def request(self, method: str, url: str, **kwargs):
"""Send authenticated request""" """Send authenticated request"""
headers = kwargs.pop("headers", {}) headers = kwargs.pop("headers", {})
@ -76,3 +145,7 @@ class AuthenticationWeb:
return await self.request("POST", "/api/auth/role/assign-permissions", json=data) return await self.request("POST", "/api/auth/role/assign-permissions", json=data)
if __name__ == '__main__':
authentication = AuthenticationWeb()
user = authentication.create_temporary_user()
print(user)

View File

@ -0,0 +1,76 @@
import re
from time import sleep
import requests
from faker import Faker
TEMPORARY_EMAIL_DOMAIN = "https://api.mail.cx/api/v1"
def generate_email() -> str:
fake = Faker('en_US')
while True:
name = fake.name().replace(' ', '_')
if len(name) <= 10:
break
return f"{name}@nqmo.com"
def get_auth_email_token() -> str:
url = TEMPORARY_EMAIL_DOMAIN + "/auth/authorize_token"
headers = {
'accept': 'application/json',
'Authorization': 'Bearer undefined',
}
response = requests.post(url, headers=headers)
return str(response.json())
def get_mail_id(address, token):
url = TEMPORARY_EMAIL_DOMAIN + f"/mailbox/{address}"
headers = {
'accept': 'application/json',
'Authorization': f'Bearer {token}',
}
response = requests.get(url, headers=headers)
body = response.json()
return body[0]['id'] if len(body) and len(body[0]['id']) > 0 else None
def get_auth_code(email):
# get token
token = get_auth_email_token()
print(f"token: {token}")
# Waiting for verification code email
id_ = None
for _ in range(30):
id_ = get_mail_id(email, token)
if id_ is not None:
break
sleep(1)
if id_ is None:
raise Exception(f"Could not get auth code for {email}")
# get code
url = TEMPORARY_EMAIL_DOMAIN + f'/mailbox/{email}/{id_}'
headers = {
'accept': 'application/json',
'Authorization': f'Bearer {token}',
}
response = requests.get(url, headers=headers)
print(response.json())
# Regular matching captcha, here the regular expression matching captcha is changed to its own
captcha = re.search(r'The auth code is:\s+(\d+)', response.json()['body']['html'])
if captcha:
print("code:", captcha.group(1))
else:
print("Unable to find verification code")
return captcha.group(1)
if __name__ == '__main__':
email = generate_email()
code = get_auth_code(email)
print(code)