Add retry logic in httpx request

This commit is contained in:
jetli 2024-12-24 18:37:48 +00:00
parent 363d399c26
commit bc49e1b60b

View File

@ -5,6 +5,38 @@ from typing import List, Dict, Optional
from common.config.app_settings import app_settings from common.config.app_settings import app_settings
import httpx import httpx
import asyncio
async def fetch_with_retry(url, method="GET", retries=3, backoff=1.0, **kwargs):
"""
A generic function for making HTTP requests with retry logic.
Parameters:
url (str): The endpoint URL.
method (str): HTTP method ('GET', 'POST', etc.).
retries (int): Number of retry attempts.
backoff (float): Backoff time in seconds.
kwargs: Additional arguments for the request.
Returns:
httpx.Response: The response object.
"""
for attempt in range(retries):
try:
async with httpx.AsyncClient(timeout=httpx.Timeout(10.0)) as client:
if method.upper() == "GET":
response = await client.get(url, **kwargs)
elif method.upper() == "POST":
response = await client.post(url, **kwargs)
response.raise_for_status() # Check for HTTP errors
return response
except (httpx.ReadTimeout, httpx.RequestError) as exc:
if attempt < retries - 1:
await asyncio.sleep(backoff * (2**attempt)) # Exponential backoff
continue
else:
raise exc
class CodeDepotService: class CodeDepotService:
@ -12,133 +44,66 @@ class CodeDepotService:
self.depot_endpoint = ( self.depot_endpoint = (
app_settings.DEVSVC_WEBAPI_URL_BASE.rstrip("/") + "/depot/" app_settings.DEVSVC_WEBAPI_URL_BASE.rstrip("/") + "/depot/"
) )
self.module_logger = ModuleLogger(sender_id="CodeDepotService") self.module_logger = ModuleLogger(sender_id="CodeDepotService")
async def check_depot_name_availabe(self, code_depot_name: str) -> bool: async def check_depot_name_availabe(self, code_depot_name: str) -> bool:
"""Return True if the depot name is available, otherwise return False api_url = self.depot_endpoint + "check-depot-name-available/" + code_depot_name
response = await fetch_with_retry(api_url)
Parameters:
code_depot_name (str): the name of the code depot
Returns:
bool: True if the depot name is availabe, otherwise return False
"""
api_url = self.depot_endpoint + "check-depot-name-available/"
async with httpx.AsyncClient() as client:
response = await client.get(api_url + code_depot_name)
return response.json() return response.json()
async def create_code_depot(self, product_id, code_depot_name) -> Optional[str]: async def create_code_depot(self, product_id, code_depot_name) -> Optional[str]:
"""Create a new git code depot
Parameters:
product_id (str): the id of the product
code_depot_name (str): the name of the code depot
Returns:
str: return code depot id if it's created successfully, else return None
"""
api_url = self.depot_endpoint + "create-code-depot" api_url = self.depot_endpoint + "create-code-depot"
async with httpx.AsyncClient() as client: response = await fetch_with_retry(
response = await client.post(
api_url, api_url,
method="POST",
json={"product_id": product_id, "code_depot_name": code_depot_name}, json={"product_id": product_id, "code_depot_name": code_depot_name},
) )
return response.json() return response.json()
async def get_depot_ssh_url(self, code_depot_name: str) -> str: async def get_depot_ssh_url(self, code_depot_name: str) -> str:
"""Return the ssh url of the code depot api_url = self.depot_endpoint + "get-depot-ssh-url/" + code_depot_name
response = await fetch_with_retry(api_url)
Parameters:
depot_name (str): the name of the depot
Returns:
str: the ssh url of the code depot
"""
api_url = self.depot_endpoint + "get-depot-ssh-url/"
async with httpx.AsyncClient() as client:
response = await client.get(api_url + code_depot_name)
return response.json() return response.json()
async def get_depot_http_url(self, code_depot_name: str) -> str: async def get_depot_http_url(self, code_depot_name: str) -> str:
"""Return the http url of the code depot api_url = self.depot_endpoint + "get-depot-http-url/" + code_depot_name
response = await fetch_with_retry(api_url)
Parameters:
depot_name (str): the name of the depot
Returns:
str: the http url of the code depot
"""
api_url = self.depot_endpoint + "get-depot-http-url/"
async with httpx.AsyncClient() as client:
response = await client.get(api_url + code_depot_name)
return response.json() return response.json()
async def get_depot_http_url_with_user_name( async def get_depot_http_url_with_user_name(
self, code_depot_name: str, user_name: str self, code_depot_name: str, user_name: str
) -> str: ) -> str:
"""Return the http url of the code depot api_url = (
self.depot_endpoint
Parameters: + "get-depot-http-url-with-user-name/"
depot_name (str): the name of the depot + code_depot_name
+ "/"
Returns: + user_name
str: the http url of the code depot )
""" response = await fetch_with_retry(api_url)
api_url = self.depot_endpoint + "get-depot-http-url-with-user-name/"
async with httpx.AsyncClient() as client:
response = await client.get(api_url + code_depot_name + "/" + user_name)
return response.json() return response.json()
async def get_depot_users(self, code_depot_name: str) -> List[str]: async def get_depot_users(self, code_depot_name: str) -> List[str]:
"""Return list of user names have permission to access the depot api_url = self.depot_endpoint + "get-depot-users/" + code_depot_name
response = await fetch_with_retry(api_url)
Parameters:
depot_name (str): the name of the depot
Returns:
list: list of user names
"""
api_url = self.depot_endpoint + "get-depot-users/"
async with httpx.AsyncClient() as client:
response = await client.get(api_url + code_depot_name)
return response.json() return response.json()
async def update_depot_user_password(self, user_name: str, password: str) -> bool: async def update_depot_user_password(self, user_name: str, password: str) -> bool:
"""Update the password of the user in Gitea
Parameters:
user_name (str): the name of the user
password (str): the new password of the user
Returns:
bool: True if operations succeed, otherwise return False
"""
api_url = self.depot_endpoint + "update-depot-password-for-user" api_url = self.depot_endpoint + "update-depot-password-for-user"
async with httpx.AsyncClient() as client: response = await fetch_with_retry(
response = await client.post( api_url,
api_url, json={"user_name": user_name, "password": password} method="POST",
json={"user_name": user_name, "password": password},
) )
return response.json() return response.json()
async def create_depot_user( async def create_depot_user(
self, user_name: str, password: str, email: str self, user_name: str, password: str, email: str
) -> bool: ) -> bool:
"""Create a new user in Gitea
Parameters:
user_name (str): the name of the user
password (str): the password of the user
email (str): email address of the user
Returns:
bool: True if operations succeed, otherwise return False
"""
api_url = self.depot_endpoint + "create-depot-user" api_url = self.depot_endpoint + "create-depot-user"
async with httpx.AsyncClient() as client: response = await fetch_with_retry(
response = await client.post(
api_url, api_url,
method="POST",
json={"user_name": user_name, "password": password, "email": email}, json={"user_name": user_name, "password": password, "email": email},
) )
return response.json() return response.json()
@ -146,19 +111,10 @@ class CodeDepotService:
async def grant_user_depot_access( async def grant_user_depot_access(
self, user_name: str, code_depot_name: str self, user_name: str, code_depot_name: str
) -> bool: ) -> bool:
"""Grant user access to the code depot in Gitea
Parameters:
user_name (str): the name of the user
code_depot_name (str): the name of the code depot
Returns:
bool: True if operations succeed, otherwise return False
"""
api_url = self.depot_endpoint + "grant-user-depot-access" api_url = self.depot_endpoint + "grant-user-depot-access"
async with httpx.AsyncClient() as client: response = await fetch_with_retry(
response = await client.post(
api_url, api_url,
method="POST",
json={"user_name": user_name, "code_depot_name": code_depot_name}, json={"user_name": user_name, "code_depot_name": code_depot_name},
) )
return response.json() return response.json()
@ -166,22 +122,11 @@ class CodeDepotService:
async def generate_statistic_result( async def generate_statistic_result(
self, code_depot_name: str self, code_depot_name: str
) -> Optional[Dict[str, any]]: ) -> Optional[Dict[str, any]]:
"""Call Gitea API and collect statistic result of the repository. api_url = self.depot_endpoint + "generate-statistic-result/" + code_depot_name
response = await fetch_with_retry(api_url)
Args:
code_depot_name (str): the name of the code depot
Returns:
Dict[str, any]: statistic result
"""
api_url = self.depot_endpoint + "generate-statistic-result/"
async with httpx.AsyncClient() as client:
response = await client.get(api_url + code_depot_name)
return response.json() return response.json()
async def fetch_code_depot(self, code_depot_id: str) -> Optional[Dict[str, any]]: async def fetch_code_depot(self, code_depot_id: str) -> Optional[Dict[str, any]]:
api_url = self.depot_endpoint + "fetch-code-depot/" api_url = self.depot_endpoint + "fetch-code-depot/" + code_depot_id
async with httpx.AsyncClient() as client: response = await fetch_with_retry(api_url)
response = await client.get(api_url + code_depot_id)
return response.json() return response.json()