freeleaps-service-hub/apps/authentication/tests/base/authentication_web.py

158 lines
6.4 KiB
Python

import asyncio
import httpx
from typing import Optional
from tests.base.config import USER_EMAIL, USER_PASSWORD, BASE_URL
from tests.util.temporary_email import *
class AuthenticationWeb:
def __init__(self, user_email: str = USER_EMAIL, password: str = USER_PASSWORD, base_url: str = BASE_URL):
self.user_email = user_email
self.password = password
self.user_id = None
self.base_url = base_url
self.token: Optional[str] = None
def create_temporary_user(self) -> dict[str, str]:
"""Create a temporary user."""
# generate temporary user email
email = generate_email()
print("temporary user email:", email)
# call try-signin-with-email api
response1 = self.try_signin_with_email(params={"email": email, "host": self.base_url})
print("try_signin_with_email", response1.json())
# query auth code
auth_code = get_auth_code(email)
print("temporary user auth code:", auth_code)
response2 = self.signin_with_email_and_code(
params={"email": email, "code": auth_code, "host": self.base_url})
print("signin_with_email_and_code", response2.json())
access_token = response2.json()["access_token"]
response3 = self.update_new_user_flid(token=access_token, params={'flid': response2.json()['flid']})
print("update_new_user_flid", response3.json())
password = "Kdwy12#$"
# set password
response4 = self.update_user_password(token=access_token, params={
'password': password,
'password2': password
})
print("update_user_password", response4.json())
return {
"email": email,
"password": password,
"user_id": response2.json()["identity"]
}
def update_new_user_flid(self, params: dict, token: str = None):
"""Update the user's FLID."""
if token is None:
token = self.token
headers = {"Authorization": f"Bearer {token}"}
with httpx.Client(base_url=self.base_url) as client:
resp = client.request("POST", "/api/auth/signin/update-new-user-flid", headers=headers, json=params)
return resp
def update_user_password(self, params: dict, token: str = None):
"""Update the user's password."""
if token is None:
token = self.token
headers = {"Authorization": f"Bearer {token}"}
with httpx.Client(base_url=self.base_url) as client:
resp = client.request("POST", "/api/auth/signin/update-user-password", headers=headers, json=params)
return resp
def try_signin_with_email(self, params):
"""try signin with email."""
return self.request_sync("POST", "/api/auth/signin/try-signin-with-email", json=params)
def signin_with_email_and_code(self, params):
"""try signin with email and code."""
return self.request_sync("POST", "/api/auth/signin/signin-with-email-and-code", json=params)
def login(self):
"""Login and store JWT token"""
with httpx.Client(base_url=self.base_url) as client:
resp = self.do_login(self.user_email, self.password)
self.token = resp.json()["access_token"]
return resp
def do_login(self, user_email: str = USER_EMAIL, password: str = USER_PASSWORD):
"""Login and store JWT token"""
with httpx.Client(base_url=self.base_url) as client:
resp = client.post("/api/auth/signin/signin-with-email-and-password", json={
"email": user_email,
"password": password
})
return resp
def request_sync(self, method: str, url: str, **kwargs):
"""Send authenticated request"""
headers = kwargs.pop("headers", {})
if self.token:
headers["Authorization"] = f"Bearer {self.token}"
with httpx.Client(base_url=self.base_url) as client:
resp = client.request(method, url, headers=headers, **kwargs)
return resp
async def request(self, method: str, url: str, **kwargs):
"""Send authenticated request"""
headers = kwargs.pop("headers", {})
if self.token:
headers["Authorization"] = f"Bearer {self.token}"
async with httpx.AsyncClient(base_url=self.base_url) as client:
resp = await client.request(method, url, headers=headers, **kwargs)
return resp
async def create_role(self, role_data: dict):
"""Create a new role via API"""
return await self.request("POST", "/api/auth/role/create", json=role_data)
async def delete_role(self, role_data: dict):
"""Delete role via API"""
return await self.request("POST", "/api/auth/role/delete", json=role_data)
async def update_role(self, role_data: dict):
"""Update role via API"""
return await self.request("POST", "/api/auth/role/update", json=role_data)
async def query_roles(self, params: dict = None):
"""Query roles via API"""
if params is None:
params = {}
return await self.request("POST", "/api/auth/role/query", json=params)
async def create_permission(self, perm_data: dict):
"""Create a new permission via API"""
return await self.request("POST", "/api/auth/permission/create", json=perm_data)
async def delete_permission(self, perm_data: dict):
"""Delete a permission via API"""
return await self.request("POST", "/api/auth/permission/delete", json=perm_data)
async def update_permission(self, perm_data: dict):
"""Update a permission via API"""
return await self.request("POST", "/api/auth/permission/update", json=perm_data)
async def query_permissions(self, params: dict = None):
"""Query permissions via API"""
if params is None:
params = {}
return await self.request("POST", "/api/auth/permission/query", json=params)
async def assign_permissions_to_role(self, data: dict):
"""Assign permissions to a role via API"""
return await self.request("POST", "/api/auth/role/assign-permissions", json=data)
async def assign_roles_to_user(self, data: dict):
"""Assign roles to a user via API"""
return await self.request("POST", "/api/auth/user/assign-roles", json=data)
if __name__ == '__main__':
authentication = AuthenticationWeb()
user = authentication.create_temporary_user()
print(user)