87 lines
3.5 KiB
Python
87 lines
3.5 KiB
Python
from backend.business.payment_manager import PaymentManager
|
|
from backend.business.stripe_manager import StripeManager
|
|
from typing import Dict, Optional, Tuple
|
|
from decimal import Decimal
|
|
|
|
class PaymentHub:
|
|
def __init__(self) -> None:
|
|
self.payment_manager = PaymentManager()
|
|
self.stripe_manager = StripeManager()
|
|
return
|
|
|
|
async def fetch_wechat_qr_code(self, project_id: str) -> Optional[Dict[str, any]]:
|
|
return await self.payment_manager.fetch_wechat_qr_code(project_id)
|
|
|
|
async def fetch_stripe_account_id(self, user_id: str) -> Optional[str]:
|
|
return await self.payment_manager.fetch_stripe_account_id(user_id)
|
|
|
|
async def create_stripe_account(self) -> Optional[str]:
|
|
return await self.stripe_manager.create_stripe_account()
|
|
|
|
async def create_account_link(self, account_id: str, link_type: str = "account_onboarding") -> Optional[str]:
|
|
return await self.stripe_manager.create_account_link(account_id, link_type)
|
|
|
|
async def can_account_receive_payments(self, account_id: str) -> bool:
|
|
return await self.stripe_manager.can_account_receive_payments(account_id)
|
|
|
|
async def fetch_transaction_by_id(
|
|
self, transaction_id: str
|
|
) -> Optional[Dict[str, any]]:
|
|
return await self.stripe_manager.fetch_transaction_by_id(transaction_id)
|
|
|
|
async def fetch_transaction_by_session_id(
|
|
self, session_id: str
|
|
) -> Optional[Dict[str, any]]:
|
|
return await self.stripe_manager.fetch_transaction_by_session_id(session_id)
|
|
|
|
async def fetch_stripe_transaction_for_milestone(
|
|
self, project_id: str, milestone_index: int
|
|
) -> Optional[Dict[str, any]]:
|
|
return await self.stripe_manager.fetch_stripe_transaction_for_milestone(
|
|
project_id, milestone_index
|
|
)
|
|
|
|
async def create_stripe_transaction_for_milestone(
|
|
self,
|
|
project_id: str,
|
|
milestone_index: int,
|
|
currency: str,
|
|
expected_payment: Decimal,
|
|
from_user: str,
|
|
to_user: str,
|
|
to_stripe_account_id: str,
|
|
) -> Optional[str]:
|
|
return await self.stripe_manager.create_stripe_transaction_for_milestone(
|
|
project_id,
|
|
milestone_index,
|
|
currency,
|
|
expected_payment,
|
|
from_user,
|
|
to_user,
|
|
to_stripe_account_id,
|
|
)
|
|
|
|
async def create_payment_link(self, transaction_id: str) -> Optional[str]:
|
|
return await self.stripe_manager.create_payment_link(transaction_id)
|
|
|
|
async def create_checkout_session(
|
|
self, transaction_id: str
|
|
) -> Tuple[Optional[str], Optional[str]]:
|
|
return await self.stripe_manager.create_checkout_session(transaction_id)
|
|
|
|
async def fetch_payment_link(self, transaction_id: str) -> Optional[str]:
|
|
return await self.stripe_manager.fetch_payment_link(transaction_id)
|
|
|
|
async def fetch_checkout_session_id(self, transaction_id: str) -> Optional[str]:
|
|
return await self.stripe_manager.fetch_checkout_session_id(transaction_id)
|
|
|
|
async def fetch_checkout_session_url(self, transaction_id: str) -> Optional[str]:
|
|
return await self.stripe_manager.fetch_checkout_session_url(transaction_id)
|
|
|
|
async def invoke_checkout_session_webhook(
|
|
self, payload: str, stripe_signature: str
|
|
) -> Tuple[bool, Optional[str], Optional[str]]:
|
|
return await self.stripe_manager.invoke_checkout_session_webhook(
|
|
payload, stripe_signature
|
|
)
|