From 9dcd27bc8a558eeccd8dce663808a47434339d8c Mon Sep 17 00:00:00 2001 From: icecheng Date: Tue, 22 Jul 2025 11:30:16 +0800 Subject: [PATCH] feat(role_management): Add logic for creating temporary users to apitest. --- .../tests/api_tests/permission/conftest.py | 14 +++- .../permission/test_create_permission.py | 2 +- .../tests/base/authentication_web.py | 73 ++++++++++++++++++ apps/authentication/tests/util/__init__.py | 0 .../tests/util/temporary_email.py | 76 +++++++++++++++++++ 5 files changed, 162 insertions(+), 3 deletions(-) create mode 100644 apps/authentication/tests/util/__init__.py create mode 100644 apps/authentication/tests/util/temporary_email.py diff --git a/apps/authentication/tests/api_tests/permission/conftest.py b/apps/authentication/tests/api_tests/permission/conftest.py index f2aad5a..fbf1fa7 100644 --- a/apps/authentication/tests/api_tests/permission/conftest.py +++ b/apps/authentication/tests/api_tests/permission/conftest.py @@ -3,8 +3,18 @@ import pytest from tests.base.authentication_web import AuthenticationWeb -@pytest.fixture -def authentication_web()->AuthenticationWeb: +@pytest.fixture(scope="session") +def authentication_web() -> AuthenticationWeb: authentication_web = AuthenticationWeb() authentication_web.login() return authentication_web + + +@pytest.fixture(scope="session") +def authentication_web_of_temp_user1() -> AuthenticationWeb: + authentication_web = AuthenticationWeb() + user = authentication_web.create_temporary_user() + authentication_web.user_email = user["email"] + authentication_web.password = user["password"] + authentication_web.login() + return authentication_web diff --git a/apps/authentication/tests/api_tests/permission/test_create_permission.py b/apps/authentication/tests/api_tests/permission/test_create_permission.py index 783a341..08a7e03 100644 --- a/apps/authentication/tests/api_tests/permission/test_create_permission.py +++ b/apps/authentication/tests/api_tests/permission/test_create_permission.py @@ -101,4 +101,4 @@ class TestCreatePermission: if __name__ == '__main__': - pytest.main([__file__]) \ No newline at end of file + pytest.main([__file__]) diff --git a/apps/authentication/tests/base/authentication_web.py b/apps/authentication/tests/base/authentication_web.py index e0da081..725aa48 100644 --- a/apps/authentication/tests/base/authentication_web.py +++ b/apps/authentication/tests/base/authentication_web.py @@ -1,6 +1,9 @@ +import asyncio + import httpx from typing import Optional from tests.base.config import USER_EMAIL, USER_PASSWORD, BASE_URL +from tests.util.temporary_email import * class AuthenticationWeb: @@ -10,6 +13,63 @@ class AuthenticationWeb: self.base_url = base_url self.token: Optional[str] = None + def create_temporary_user(self) -> dict[str, str]: + """Create a temporary user.""" + # generate temporary user email + email = generate_email() + print("temporary user email:", email) + # call try-signin-with-email api + response1 = self.try_signin_with_email(params={"email": email, "host": self.base_url}) + print("try_signin_with_email", response1.json()) + # query auth code + auth_code = get_auth_code(email) + print("temporary user auth code:", auth_code) + response2 = self.signin_with_email_and_code( + params={"email": email, "code": auth_code, "host": self.base_url}) + print("signin_with_email_and_code", response2.json()) + access_token = response2.json()["access_token"] + + response3 = self.update_new_user_flid(token=access_token, params={'flid': response2.json()['flid']}) + print("update_new_user_flid", response3.json()) + + password = "Kdwy12#$" + # set password + response4 = self.update_user_password(token=access_token, params={ + 'password': password, + 'password2': password + }) + print("update_user_password", response4.json()) + return { + "email": email, + "password": password, + } + + def update_new_user_flid(self, params: dict, token: str = None): + """Update the user's FLID.""" + if token is None: + token = self.token + headers = {"Authorization": f"Bearer {token}"} + with httpx.Client(base_url=self.base_url) as client: + resp = client.request("POST", "/api/auth/signin/update-new-user-flid", headers=headers, json=params) + return resp + + def update_user_password(self, params: dict, token: str = None): + """Update the user's password.""" + if token is None: + token = self.token + headers = {"Authorization": f"Bearer {token}"} + with httpx.Client(base_url=self.base_url) as client: + resp = client.request("POST", "/api/auth/signin/update-user-password", headers=headers, json=params) + return resp + + def try_signin_with_email(self, params): + """try signin with email.""" + return self.request_sync("POST", "/api/auth/signin/try-signin-with-email", json=params) + + def signin_with_email_and_code(self, params): + """try signin with email and code.""" + return self.request_sync("POST", "/api/auth/signin/signin-with-email-and-code", json=params) + def login(self): """Login and store JWT token""" with httpx.Client(base_url=self.base_url) as client: @@ -26,6 +86,15 @@ class AuthenticationWeb: }) return resp + def request_sync(self, method: str, url: str, **kwargs): + """Send authenticated request""" + headers = kwargs.pop("headers", {}) + if self.token: + headers["Authorization"] = f"Bearer {self.token}" + with httpx.Client(base_url=self.base_url) as client: + resp = client.request(method, url, headers=headers, **kwargs) + return resp + async def request(self, method: str, url: str, **kwargs): """Send authenticated request""" headers = kwargs.pop("headers", {}) @@ -76,3 +145,7 @@ class AuthenticationWeb: return await self.request("POST", "/api/auth/role/assign-permissions", json=data) +if __name__ == '__main__': + authentication = AuthenticationWeb() + user = authentication.create_temporary_user() + print(user) diff --git a/apps/authentication/tests/util/__init__.py b/apps/authentication/tests/util/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/authentication/tests/util/temporary_email.py b/apps/authentication/tests/util/temporary_email.py new file mode 100644 index 0000000..79d8e47 --- /dev/null +++ b/apps/authentication/tests/util/temporary_email.py @@ -0,0 +1,76 @@ +import re +from time import sleep + +import requests +from faker import Faker + +TEMPORARY_EMAIL_DOMAIN = "https://api.mail.cx/api/v1" + + +def generate_email() -> str: + fake = Faker('en_US') + while True: + name = fake.name().replace(' ', '_') + if len(name) <= 10: + break + return f"{name}@nqmo.com" + + +def get_auth_email_token() -> str: + url = TEMPORARY_EMAIL_DOMAIN + "/auth/authorize_token" + headers = { + 'accept': 'application/json', + 'Authorization': 'Bearer undefined', + } + response = requests.post(url, headers=headers) + return str(response.json()) + + +def get_mail_id(address, token): + url = TEMPORARY_EMAIL_DOMAIN + f"/mailbox/{address}" + headers = { + 'accept': 'application/json', + 'Authorization': f'Bearer {token}', + } + response = requests.get(url, headers=headers) + body = response.json() + return body[0]['id'] if len(body) and len(body[0]['id']) > 0 else None + + +def get_auth_code(email): + # get token + token = get_auth_email_token() + print(f"token: {token}") + + # Waiting for verification code email + id_ = None + for _ in range(30): + id_ = get_mail_id(email, token) + if id_ is not None: + break + sleep(1) + if id_ is None: + raise Exception(f"Could not get auth code for {email}") + # get code + url = TEMPORARY_EMAIL_DOMAIN + f'/mailbox/{email}/{id_}' + headers = { + 'accept': 'application/json', + 'Authorization': f'Bearer {token}', + } + + response = requests.get(url, headers=headers) + print(response.json()) + + # Regular matching captcha, here the regular expression matching captcha is changed to its own + captcha = re.search(r'The auth code is:\s+(\d+)', response.json()['body']['html']) + if captcha: + print("code:", captcha.group(1)) + else: + print("Unable to find verification code") + return captcha.group(1) + + +if __name__ == '__main__': + email = generate_email() + code = get_auth_code(email) + print(code)