From 641281066cd16c83470379f00e8146a7d42afe86 Mon Sep 17 00:00:00 2001 From: icecheng Date: Mon, 21 Jul 2025 17:51:08 +0800 Subject: [PATCH] feat(role_management): Add pytest for auth services, and add api test case for role management --- .../tests/api_tests/__init__.py | 0 .../tests/api_tests/role/__init__.py | 0 .../tests/api_tests/role/conftest.py | 10 ++ .../tests/api_tests/role/test_create_role.py | 117 ++++++++++++++ .../tests/api_tests/role/test_delete_role.py | 31 ++++ .../tests/api_tests/role/test_query_role.py | 58 +++++++ .../tests/api_tests/role/test_update_role.py | 153 ++++++++++++++++++ .../tests/api_tests/siginin/__init__.py | 0 .../tests/api_tests/siginin/conftest.py | 10 ++ .../test_signin_with_email_and_password.py | 22 +++ apps/authentication/tests/base/__init__.py | 0 .../tests/base/authentication_web.py | 64 ++++++++ apps/authentication/tests/base/config.py | 3 + apps/authentication/tests/conftest.py | 10 ++ 14 files changed, 478 insertions(+) create mode 100644 apps/authentication/tests/api_tests/__init__.py create mode 100644 apps/authentication/tests/api_tests/role/__init__.py create mode 100644 apps/authentication/tests/api_tests/role/conftest.py create mode 100644 apps/authentication/tests/api_tests/role/test_create_role.py create mode 100644 apps/authentication/tests/api_tests/role/test_delete_role.py create mode 100644 apps/authentication/tests/api_tests/role/test_query_role.py create mode 100644 apps/authentication/tests/api_tests/role/test_update_role.py create mode 100644 apps/authentication/tests/api_tests/siginin/__init__.py create mode 100644 apps/authentication/tests/api_tests/siginin/conftest.py create mode 100644 apps/authentication/tests/api_tests/siginin/test_signin_with_email_and_password.py create mode 100644 apps/authentication/tests/base/__init__.py create mode 100644 apps/authentication/tests/base/authentication_web.py create mode 100644 apps/authentication/tests/base/config.py create mode 100644 apps/authentication/tests/conftest.py diff --git a/apps/authentication/tests/api_tests/__init__.py b/apps/authentication/tests/api_tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/authentication/tests/api_tests/role/__init__.py b/apps/authentication/tests/api_tests/role/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/authentication/tests/api_tests/role/conftest.py b/apps/authentication/tests/api_tests/role/conftest.py new file mode 100644 index 0000000..f2aad5a --- /dev/null +++ b/apps/authentication/tests/api_tests/role/conftest.py @@ -0,0 +1,10 @@ +import pytest + +from tests.base.authentication_web import AuthenticationWeb + + +@pytest.fixture +def authentication_web()->AuthenticationWeb: + authentication_web = AuthenticationWeb() + authentication_web.login() + return authentication_web diff --git a/apps/authentication/tests/api_tests/role/test_create_role.py b/apps/authentication/tests/api_tests/role/test_create_role.py new file mode 100644 index 0000000..a52db06 --- /dev/null +++ b/apps/authentication/tests/api_tests/role/test_create_role.py @@ -0,0 +1,117 @@ +import pytest +import random +from tests.base.authentication_web import AuthenticationWeb + + +class TestCreateRole: + + @pytest.mark.asyncio + async def test_create_role_success(self, authentication_web: AuthenticationWeb): + """Test creating a role successfully with valid and unique role_key and role_name.""" + suffix = str(random.randint(10000, 99999)) + role_data = { + "role_key": f"test_role_key_success_{suffix}", + "role_name": f"Test Role Success {suffix}", + "role_description": "Role for testing success", + "role_level": 1 + } + response = await authentication_web.create_role(role_data) + assert response.status_code == 200 + json = response.json() + assert json["role_key"] == role_data["role_key"] + assert json["role_name"] == role_data["role_name"] + assert json["role_description"] == role_data["role_description"] + assert json["role_level"] == role_data["role_level"] + assert json["id"] is not None + assert json["created_at"] is not None + assert json["updated_at"] is not None + + @pytest.mark.asyncio + async def test_create_role_fail_duplicate_role_key(self, authentication_web: AuthenticationWeb): + """Test creating a role fails when role_key is duplicated.""" + suffix = str(random.randint(10000, 99999)) + role_data = { + "role_key": f"test_role_key_dup_{suffix}", + "role_name": f"Test Role DupKey {suffix}", + "role_description": "desc", + "role_level": 1 + } + await authentication_web.create_role(role_data) + role_data2 = { + "role_key": f"test_role_key_dup_{suffix}", + "role_name": f"Test Role DupKey2 {suffix}", + "role_description": "desc2", + "role_level": 2 + } + response = await authentication_web.create_role(role_data2) + assert response.status_code == 422 or response.status_code == 400 + + @pytest.mark.asyncio + async def test_create_role_fail_duplicate_role_name(self, authentication_web: AuthenticationWeb): + """Test creating a role fails when role_name is duplicated.""" + suffix = str(random.randint(10000, 99999)) + role_data = { + "role_key": f"test_role_key_dupname1_{suffix}", + "role_name": f"Test Role DupName {suffix}", + "role_description": "desc", + "role_level": 1 + } + await authentication_web.create_role(role_data) + role_data2 = { + "role_key": f"test_role_key_dupname2_{suffix}", + "role_name": f"Test Role DupName {suffix}", + "role_description": "desc2", + "role_level": 2 + } + response = await authentication_web.create_role(role_data2) + assert response.status_code == 422 or response.status_code == 400 + + @pytest.mark.asyncio + async def test_create_role_fail_empty_role_key(self, authentication_web: AuthenticationWeb): + """Test creating a role fails when role_key is empty.""" + suffix = str(random.randint(10000, 99999)) + role_data = { + "role_key": "", + "role_name": f"Test Role EmptyKey {suffix}", + "role_description": "desc", + "role_level": 1 + } + response = await authentication_web.create_role(role_data) + assert response.status_code == 422 or response.status_code == 400 + + @pytest.mark.asyncio + async def test_create_role_fail_empty_role_name(self, authentication_web: AuthenticationWeb): + """Test creating a role fails when role_name is empty.""" + suffix = str(random.randint(10000, 99999)) + role_data = { + "role_key": f"test_role_key_emptyname_{suffix}", + "role_name": "", + "role_description": "desc", + "role_level": 1 + } + response = await authentication_web.create_role(role_data) + assert response.status_code == 422 or response.status_code == 400 + + @pytest.mark.asyncio + async def test_create_role_success_empty_description(self, authentication_web: AuthenticationWeb): + """Test creating a role successfully when role_description is None (optional field).""" + suffix = str(random.randint(10000, 99999)) + role_data = { + "role_key": f"test_role_key_emptydesc_{suffix}", + "role_name": f"Test Role EmptyDesc {suffix}", + "role_description": None, + "role_level": 1 + } + response = await authentication_web.create_role(role_data) + assert response.status_code == 200 + json = response.json() + assert json["role_key"] == role_data["role_key"] + assert json["role_name"] == role_data["role_name"] + assert json["role_description"] is None or json["role_description"] == "" + assert json["role_level"] == role_data["role_level"] + + + + +if __name__ == '__main__': + pytest.main([__file__]) diff --git a/apps/authentication/tests/api_tests/role/test_delete_role.py b/apps/authentication/tests/api_tests/role/test_delete_role.py new file mode 100644 index 0000000..b6f6918 --- /dev/null +++ b/apps/authentication/tests/api_tests/role/test_delete_role.py @@ -0,0 +1,31 @@ +import pytest +import random +from tests.base.authentication_web import AuthenticationWeb + + +class TestDeleteRole: + + @pytest.mark.asyncio + async def test_delete_role_success(self, authentication_web: AuthenticationWeb): + """Test deleting a role successfully.""" + suffix = str(random.randint(10000, 99999)) + role = await authentication_web.create_role({ + "role_key": f"delrole_{suffix}", + "role_name": f"delrole_{suffix}", + "role_description": "desc", + "role_level": 1 + }) + role_id = role.json()["id"] + resp = await authentication_web.delete_role(role_data={"role_id": role_id}) + assert resp.status_code == 200 + assert resp.json()["success"] is True + + @pytest.mark.asyncio + async def test_delete_role_fail_not_found(self, authentication_web: AuthenticationWeb): + """Test deleting a role fails when role_id does not exist.""" + resp = await authentication_web.delete_role(role_data={"role_id": "000000000000000000000000"}) + assert resp.status_code == 422 or resp.status_code == 400 + + +if __name__ == '__main__': + pytest.main([__file__]) diff --git a/apps/authentication/tests/api_tests/role/test_query_role.py b/apps/authentication/tests/api_tests/role/test_query_role.py new file mode 100644 index 0000000..574b138 --- /dev/null +++ b/apps/authentication/tests/api_tests/role/test_query_role.py @@ -0,0 +1,58 @@ +import random + +import pytest + +from tests.base.authentication_web import AuthenticationWeb + + +class TestQueryRole: + + @pytest.mark.asyncio + async def test_query_all_roles(self, authentication_web: AuthenticationWeb): + """Test querying all roles returns a list.""" + resp = await authentication_web.query_roles(params={}) + assert resp.status_code == 200 + json = resp.json() + assert "items" in json + assert "total" in json + + @pytest.mark.asyncio + async def test_query_roles_by_key(self, authentication_web: AuthenticationWeb): + """Test querying roles by role_key with fuzzy search.""" + suffix = str(random.randint(10000, 99999)) + await authentication_web.create_role({ + "role_key": f"querykey_{suffix}", + "role_name": f"querykey_{suffix}", + "role_description": "desc", + "role_level": 1 + }) + resp = await authentication_web.query_roles(params={"role_key": f"querykey_{suffix}"}) + assert resp.status_code == 200 + json = resp.json() + assert any(f"querykey_{suffix}" in item["role_key"] for item in json["items"]) + + @pytest.mark.asyncio + async def test_query_roles_by_name(self, authentication_web: AuthenticationWeb): + """Test querying roles by role_name with fuzzy search.""" + suffix = str(random.randint(10000, 99999)) + await authentication_web.create_role({ + "role_key": f"queryname_{suffix}", + "role_name": f"queryname_{suffix}", + "role_description": "desc", + "role_level": 1 + }) + resp = await authentication_web.query_roles(params={"role_name": f"queryname_{suffix}"}) + assert resp.status_code == 200 + json = resp.json() + assert any(f"queryname_{suffix}" in item["role_name"] for item in json["items"]) + + @pytest.mark.asyncio + async def test_query_roles_pagination(self, authentication_web: AuthenticationWeb): + """Test querying roles with pagination.""" + resp = await authentication_web.query_roles(params={"page": 1, "page_size": 2}) + assert resp.status_code == 200 + json = resp.json() + assert "items" in json + assert "total" in json + assert "page" in json + assert "page_size" in json diff --git a/apps/authentication/tests/api_tests/role/test_update_role.py b/apps/authentication/tests/api_tests/role/test_update_role.py new file mode 100644 index 0000000..848d7e0 --- /dev/null +++ b/apps/authentication/tests/api_tests/role/test_update_role.py @@ -0,0 +1,153 @@ +import pytest +import random +from tests.base.authentication_web import AuthenticationWeb + + +class TestUpdateRole: + + @pytest.mark.asyncio + async def test_update_role_success(self, authentication_web: AuthenticationWeb): + """Test updating a role successfully with valid and unique fields.""" + suffix = str(random.randint(10000, 99999)) + # create firstly + role_data = { + "role_key": f"update_role_key_{suffix}", + "role_name": f"Update Role {suffix}", + "role_description": "desc", + "role_level": 1 + } + create_resp = await authentication_web.create_role(role_data) + role_id = create_resp.json()["id"] + # update + update_data = { + "role_id": role_id, + "role_key": f"update_role_key_{suffix}_new", + "role_name": f"Update Role {suffix} New", + "role_description": "desc new", + "role_level": 2 + } + resp = await authentication_web.update_role(role_data=update_data) + assert resp.status_code == 200 + json = resp.json() + assert json["role_key"] == update_data["role_key"] + assert json["role_name"] == update_data["role_name"] + assert json["role_description"] == update_data["role_description"] + assert json["role_level"] == update_data["role_level"] + + @pytest.mark.asyncio + async def test_update_role_fail_not_found(self, authentication_web: AuthenticationWeb): + """Test updating a role fails when role_id does not exist.""" + suffix = str(random.randint(10000, 99999)) + update_data = { + "role_id": "000000000000000000000000", # 不存在的ObjectId + "role_key": f"notfound_key_{suffix}", + "role_name": f"NotFound Role {suffix}", + "role_description": "desc", + "role_level": 1 + } + resp = await authentication_web.update_role(role_data=update_data) + assert resp.status_code == 422 or resp.status_code == 400 + + @pytest.mark.asyncio + async def test_update_role_fail_duplicate_key(self, authentication_web: AuthenticationWeb): + """Test updating a role fails when role_key is duplicated.""" + suffix = str(random.randint(10000, 99999)) + # create two roles + role1 = await authentication_web.create_role({ + "role_key": f"dupkey1_{suffix}", + "role_name": f"dupkey1_{suffix}", + "role_description": "desc", + "role_level": 1 + }) + role2 = await authentication_web.create_role({ + "role_key": f"dupkey2_{suffix}", + "role_name": f"dupkey2_{suffix}", + "role_description": "desc", + "role_level": 1 + }) + role2_id = role2.json()["id"] + # modify role_key + update_data = { + "role_id": role2_id, + "role_key": f"dupkey1_{suffix}", + "role_name": f"dupkey2_{suffix}_new", + "role_description": "desc", + "role_level": 2 + } + resp = await authentication_web.update_role(role_data=update_data) + assert resp.status_code == 422 or resp.status_code == 400 + + @pytest.mark.asyncio + async def test_update_role_fail_duplicate_name(self, authentication_web: AuthenticationWeb): + """Test updating a role fails when role_name is duplicated.""" + suffix = str(random.randint(10000, 99999)) + # create two roles + role1 = await authentication_web.create_role({ + "role_key": f"dupname1_{suffix}", + "role_name": f"dupname1_{suffix}", + "role_description": "desc", + "role_level": 1 + }) + role2 = await authentication_web.create_role({ + "role_key": f"dupname2_{suffix}", + "role_name": f"dupname2_{suffix}", + "role_description": "desc", + "role_level": 1 + }) + role2_id = role2.json()["id"] + # modify role name + update_data = { + "role_id": role2_id, + "role_key": f"dupname2_{suffix}_new", + "role_name": f"dupname1_{suffix}", + "role_description": "desc", + "role_level": 2 + } + resp = await authentication_web.update_role(role_data=update_data) + assert resp.status_code == 422 or resp.status_code == 400 + + @pytest.mark.asyncio + async def test_update_role_fail_empty_key(self, authentication_web: AuthenticationWeb): + """Test updating a role fails when role_key is empty.""" + suffix = str(random.randint(10000, 99999)) + role = await authentication_web.create_role({ + "role_key": f"emptykey_{suffix}", + "role_name": f"emptykey_{suffix}", + "role_description": "desc", + "role_level": 1 + }) + role_id = role.json()["id"] + update_data = { + "role_id": role_id, + "role_key": "", + "role_name": f"emptykey_{suffix}_new", + "role_description": "desc", + "role_level": 2 + } + resp = await authentication_web.update_role(role_data=update_data) + assert resp.status_code == 422 or resp.status_code == 400 + + @pytest.mark.asyncio + async def test_update_role_fail_empty_name(self, authentication_web: AuthenticationWeb): + """Test updating a role fails when role_name is empty.""" + suffix = str(random.randint(10000, 99999)) + role = await authentication_web.create_role({ + "role_key": f"emptyname_{suffix}", + "role_name": f"emptyname_{suffix}", + "role_description": "desc", + "role_level": 1 + }) + role_id = role.json()["id"] + update_data = { + "role_id": role_id, + "role_key": f"emptyname_{suffix}_new", + "role_name": "", + "role_description": "desc", + "role_level": 2 + } + resp = await authentication_web.update_role(role_data=update_data) + assert resp.status_code == 422 or resp.status_code == 400 + + +if __name__ == '__main__': + pytest.main([__file__]) diff --git a/apps/authentication/tests/api_tests/siginin/__init__.py b/apps/authentication/tests/api_tests/siginin/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/authentication/tests/api_tests/siginin/conftest.py b/apps/authentication/tests/api_tests/siginin/conftest.py new file mode 100644 index 0000000..5178696 --- /dev/null +++ b/apps/authentication/tests/api_tests/siginin/conftest.py @@ -0,0 +1,10 @@ +import pytest + +from tests.base.authentication_web import AuthenticationWeb + + +@pytest.fixture +def authentication_web() -> AuthenticationWeb: + authentication_web = AuthenticationWeb() + authentication_web.login() + return authentication_web diff --git a/apps/authentication/tests/api_tests/siginin/test_signin_with_email_and_password.py b/apps/authentication/tests/api_tests/siginin/test_signin_with_email_and_password.py new file mode 100644 index 0000000..29454d1 --- /dev/null +++ b/apps/authentication/tests/api_tests/siginin/test_signin_with_email_and_password.py @@ -0,0 +1,22 @@ +import pytest + +from tests.base.authentication_web import AuthenticationWeb + + +class TestSignInWithEmailAndPassword: + + @pytest.mark.asyncio + async def test_sign_in_with_email_and_password(self, authentication_web: AuthenticationWeb): + response = authentication_web.do_login() + assert response.status_code == 200 + json = response.json() + assert json["access_token"] is not None, "access_token should not be None" + assert json["refresh_token"] is not None, "refresh_token should not be None" + assert json["expires_in"] is not None, "expires_in should not be None" + assert json["identity"] is not None, "identity should not be None" + assert json["role_names"] is not None, "role_names should not be None" + assert json["user_permissions"] is not None, "user_permissions should not be None" + + +if __name__ == '__main__': + pytest.main([__file__]) diff --git a/apps/authentication/tests/base/__init__.py b/apps/authentication/tests/base/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/authentication/tests/base/authentication_web.py b/apps/authentication/tests/base/authentication_web.py new file mode 100644 index 0000000..4ce5d01 --- /dev/null +++ b/apps/authentication/tests/base/authentication_web.py @@ -0,0 +1,64 @@ +import httpx +from typing import Optional +from tests.base.config import USER_EMAIL, USER_PASSWORD, BASE_URL + + +class AuthenticationWeb: + def __init__(self, user_email: str = USER_EMAIL, password: str = USER_PASSWORD, base_url: str = BASE_URL): + self.user_email = user_email + self.password = password + self.base_url = base_url + self.token: Optional[str] = None + + def login(self): + """Login and store JWT token""" + with httpx.Client(base_url=self.base_url) as client: + resp = self.do_login(self.user_email, self.password) + self.token = resp.json()["access_token"] + return resp + + def do_login(self, user_email: str = USER_EMAIL, password: str = USER_PASSWORD): + """Login and store JWT token""" + with httpx.Client(base_url=self.base_url) as client: + resp = client.post("/api/auth/signin/signin-with-email-and-password", json={ + "email": user_email, + "password": password + }) + return resp + + async def request(self, method: str, url: str, **kwargs): + """Send authenticated request""" + headers = kwargs.pop("headers", {}) + if self.token: + headers["Authorization"] = f"Bearer {self.token}" + async with httpx.AsyncClient(base_url=self.base_url) as client: + resp = await client.request(method, url, headers=headers, **kwargs) + return resp + + async def create_role(self, role_data: dict): + """Create a new role via API""" + return await self.request("POST", "/api/auth/role/create", json=role_data) + + async def delete_role(self, role_data: dict): + """Delete role via API""" + return await self.request("POST", "/api/auth/role/delete", json=role_data) + + async def update_role(self, role_data: dict): + """Update role via API""" + return await self.request("POST", "/api/auth/role/update", json=role_data) + + async def query_roles(self, params: dict = None): + """Query roles via API""" + if params is None: + params = {} + return await self.request("POST", "/api/auth/role/query", json=params) + + async def create_permission(self, perm_data: dict): + """Create a new permission via API""" + return await self.request("POST", "/api/auth/permission/create", json=perm_data) + + async def query_permissions(self, params: dict = None): + """Query permissions via API""" + return await self.request("GET", "/api/auth/permission/query", params=params) + + diff --git a/apps/authentication/tests/base/config.py b/apps/authentication/tests/base/config.py new file mode 100644 index 0000000..26b5455 --- /dev/null +++ b/apps/authentication/tests/base/config.py @@ -0,0 +1,3 @@ +USER_EMAIL = "icecheng@mathmast.com" +USER_PASSWORD = "@Cwb1535145760" +BASE_URL = "http://localhost:8103" \ No newline at end of file diff --git a/apps/authentication/tests/conftest.py b/apps/authentication/tests/conftest.py new file mode 100644 index 0000000..5178696 --- /dev/null +++ b/apps/authentication/tests/conftest.py @@ -0,0 +1,10 @@ +import pytest + +from tests.base.authentication_web import AuthenticationWeb + + +@pytest.fixture +def authentication_web() -> AuthenticationWeb: + authentication_web = AuthenticationWeb() + authentication_web.login() + return authentication_web