freeleaps-service-hub/apps/authentication/backend/services/code_depot/code_depot_service.py

188 lines
6.5 KiB
Python

from typing import Any, Dict, List, Optional
from urllib.parse import quote

import httpx

from common.config.app_settings import app_settings
from common.log.module_logger import ModuleLogger
class CodeDepotService:
    """Async client for the ``/depot/`` endpoints of the DevSvc web API.

    Every public method sends exactly one HTTP request through a short-lived
    ``httpx.AsyncClient`` and returns the decoded JSON body of the response.

    The HTTP status code is not inspected: whatever JSON document the server
    answers with (including an error payload) is handed back to the caller,
    and a non-JSON body makes ``response.json()`` raise.
    """

    def __init__(self) -> None:
        # Normalise the configured base URL so that exactly one "/" separates
        # it from the "depot/" prefix, whether or not the setting ends in "/".
        self.depot_endpoint = (
            app_settings.DEVSVC_WEBAPI_URL_BASE.rstrip("/") + "/depot/"
        )
        self.module_logger = ModuleLogger(sender_id="CodeDepotService")

    def _build_url(self, action: str, *segments: str) -> str:
        """Return the URL of a depot action followed by its path arguments.

        Parameters:
            action (str): the endpoint name below ``/depot/``
            segments (str): caller-supplied values appended as path segments
        Returns:
            str: the absolute URL to request
        """
        parts = [action]
        # safe="" also escapes "/", so a value can never spill into another
        # path segment, and "?" / "#" cannot start a query or fragment.
        parts.extend(quote(segment, safe="") for segment in segments)
        return self.depot_endpoint + "/".join(parts)

    async def _get(self, action: str, *segments: str) -> Any:
        """Send a GET request to a depot action and return the decoded JSON."""
        async with httpx.AsyncClient() as client:
            response = await client.get(self._build_url(action, *segments))
            return response.json()

    async def _post(self, action: str, payload: Dict[str, Any]) -> Any:
        """Send ``payload`` as a JSON POST to a depot action and return the decoded JSON."""
        async with httpx.AsyncClient() as client:
            response = await client.post(self._build_url(action), json=payload)
            return response.json()

    async def check_depot_name_availabe(self, code_depot_name: str) -> bool:
        """Return True if the depot name is available, otherwise return False

        Parameters:
            code_depot_name (str): the name of the code depot
        Returns:
            bool: True if the depot name is available, otherwise return False
        """
        return await self._get("check-depot-name-available", code_depot_name)

    async def create_code_depot(
        self, product_id: str, code_depot_name: str
    ) -> Optional[str]:
        """Create a new git code depot

        Parameters:
            product_id (str): the id of the product
            code_depot_name (str): the name of the code depot
        Returns:
            str: return code depot id if it's created successfully, else return None
        """
        return await self._post(
            "create-code-depot",
            {"product_id": product_id, "code_depot_name": code_depot_name},
        )

    async def get_depot_ssh_url(self, code_depot_name: str) -> str:
        """Return the ssh url of the code depot

        Parameters:
            code_depot_name (str): the name of the code depot
        Returns:
            str: the ssh url of the code depot
        """
        return await self._get("get-depot-ssh-url", code_depot_name)

    async def get_depot_http_url(self, code_depot_name: str) -> str:
        """Return the http url of the code depot

        Parameters:
            code_depot_name (str): the name of the code depot
        Returns:
            str: the http url of the code depot
        """
        return await self._get("get-depot-http-url", code_depot_name)

    async def get_depot_http_url_with_user_name(
        self, code_depot_name: str, user_name: str
    ) -> str:
        """Return the http url of the code depot for the given user name

        Parameters:
            code_depot_name (str): the name of the code depot
            user_name (str): the name of the user
        Returns:
            str: the http url of the code depot
        """
        return await self._get(
            "get-depot-http-url-with-user-name", code_depot_name, user_name
        )

    async def get_depot_users(self, code_depot_name: str) -> List[str]:
        """Return list of user names that have permission to access the depot

        Parameters:
            code_depot_name (str): the name of the code depot
        Returns:
            list: list of user names
        """
        return await self._get("get-depot-users", code_depot_name)

    async def update_depot_user_password(self, user_name: str, password: str) -> bool:
        """Update the password of the user in Gitea

        Parameters:
            user_name (str): the name of the user
            password (str): the new password of the user
        Returns:
            bool: True if operations succeed, otherwise return False
        """
        return await self._post(
            "update-depot-password-for-user",
            {"user_name": user_name, "password": password},
        )

    async def create_depot_user(
        self, user_name: str, password: str, email: str
    ) -> bool:
        """Create a new user in Gitea

        Parameters:
            user_name (str): the name of the user
            password (str): the password of the user
            email (str): email address of the user
        Returns:
            bool: True if operations succeed, otherwise return False
        """
        return await self._post(
            "create-depot-user",
            {"user_name": user_name, "password": password, "email": email},
        )

    async def grant_user_depot_access(
        self, user_name: str, code_depot_name: str
    ) -> bool:
        """Grant user access to the code depot in Gitea

        Parameters:
            user_name (str): the name of the user
            code_depot_name (str): the name of the code depot
        Returns:
            bool: True if operations succeed, otherwise return False
        """
        return await self._post(
            "grant-user-depot-access",
            {"user_name": user_name, "code_depot_name": code_depot_name},
        )

    async def generate_statistic_result(
        self, code_depot_name: str
    ) -> Optional[Dict[str, Any]]:
        """Call Gitea API and collect statistic result of the repository.

        Parameters:
            code_depot_name (str): the name of the code depot
        Returns:
            Dict[str, Any]: statistic result
        """
        return await self._get("generate-statistic-result", code_depot_name)

    async def fetch_code_depot(self, code_depot_id: str) -> Optional[Dict[str, Any]]:
        """Request the "fetch-code-depot" endpoint for the given depot id

        Parameters:
            code_depot_id (str): the id of the code depot
        Returns:
            Dict[str, Any]: the decoded JSON response; presumably the depot
                record, or None when it does not exist (confirm against the
                DevSvc API)
        """
        return await self._get("fetch-code-depot", code_depot_id)