from datetime import datetime, timezone
from decimal import Decimal
import json
from typing import Any, Dict, Optional, Tuple

import httpx
import stripe
from stripe.error import SignatureVerificationError

from backend.infra.payment.constants import TransactionStatus
from backend.infra.payment.models import StripeTransactionDoc
from common.config.app_settings import app_settings
from common.log.module_logger import ModuleLogger

stripe.api_key = app_settings.STRIPE_API_KEY


class StripeManager:
    """Stripe account, payment-link, checkout-session and webhook operations.

    Local payment state is persisted in ``StripeTransactionDoc`` documents;
    Stripe itself is reached through the module-level ``stripe`` client.
    """

    def __init__(self) -> None:
        # Trailing slash is stripped so URLs can be built as f"{root}/path".
        self.site_url_root = app_settings.SITE_URL_ROOT.rstrip("/")
        self.module_logger = ModuleLogger(sender_id="StripeManager")

    # ------------------------------------------------------------------
    # Connected accounts
    # ------------------------------------------------------------------

    async def create_stripe_account(self) -> Optional[str]:
        """Create a Stripe account of type ``standard`` and return its id."""
        account = stripe.Account.create(type="standard")
        return account.id

    async def create_account_link(
        self, account_id: str, link_type: str = "account_onboarding"
    ) -> Optional[str]:
        """Return a URL the account owner can follow to manage the account.

        When ``link_type`` is ``"account_update"`` and the account has a
        terms-of-service acceptance date, a login link is returned.  In every
        other case an ``account_onboarding`` account link is returned.
        """
        account = stripe.Account.retrieve(account_id)

        # Stripe objects are dict-like; .get() tolerates a missing or null
        # tos_acceptance instead of raising on attribute access.
        tos_acceptance = account.get("tos_acceptance") or {}
        if link_type == "account_update" and tos_acceptance.get("date"):
            login_link = stripe.Account.create_login_link(
                account_id,
                redirect_url=f"{self.site_url_root}/work",
            )
            return login_link.url

        # Otherwise show onboarding.
        account_link = stripe.AccountLink.create(
            account=account_id,
            refresh_url=f"{self.site_url_root}/front-door",
            return_url=f"{self.site_url_root}/work",
            type="account_onboarding",
        )
        return account_link.url

    async def can_account_receive_payments(self, account_id: str) -> bool:
        """Return True when the account's ``transfers`` capability is ``active``."""
        account = stripe.Account.retrieve(account_id)
        # A missing capabilities map or a missing "transfers" entry simply
        # means "not active" rather than an error.
        capabilities = account.get("capabilities") or {}
        return capabilities.get("transfers") == "active"

    # ------------------------------------------------------------------
    # Transaction lookup
    # ------------------------------------------------------------------

    async def __fetch_transaction_by_id(
        self, transaction_id: str
    ) -> Optional[StripeTransactionDoc]:
        """Return the transaction document with this id, or None."""
        transaction = await StripeTransactionDoc.get(transaction_id)
        return transaction if transaction else None

    async def fetch_transaction_by_id(
        self, transaction_id: str
    ) -> Optional[Dict[str, Any]]:
        """Return the transaction with this id as a dict, or None."""
        transaction = await StripeTransactionDoc.get(transaction_id)
        return transaction.model_dump() if transaction else None

    async def __fetch_transaction_by_session_id(
        self, session_id: str
    ) -> Optional[StripeTransactionDoc]:
        """Return the transaction document for a checkout session id.

        Logs an error and returns None when nothing matches.  Logs an error
        and returns the first match when several documents match.
        """
        transactions = await StripeTransactionDoc.find(
            StripeTransactionDoc.stripe_checkout_session_id == session_id
        ).to_list()

        if not transactions:
            await self.module_logger.log_error(
                error=f"No transaction found for session_id: {session_id}",
                properties={"session_id": session_id},
            )
            return None

        if len(transactions) > 1:
            await self.module_logger.log_error(
                error=f"More than one transaction found for session_id: {session_id}",
                properties={"session_id": session_id},
            )
        return transactions[0]

    async def fetch_transaction_by_session_id(
        self, session_id: str
    ) -> Optional[Dict[str, Any]]:
        """Return the transaction for a checkout session id as a dict.

        Returns None when nothing matches (without logging).  When several
        documents match, an error is logged and the first one is returned.
        """
        transactions = await StripeTransactionDoc.find(
            StripeTransactionDoc.stripe_checkout_session_id == session_id
        ).to_list()

        if not transactions:
            return None

        if len(transactions) > 1:
            await self.module_logger.log_error(
                error=f"More than one transaction found for session_id: {session_id}",
                properties={"session_id": session_id},
            )
        return transactions[0].model_dump()

    async def fetch_stripe_transaction_for_milestone(
        self, project_id: str, milestone_index: int
    ) -> Optional[Dict[str, Any]]:
        """Return the transaction for a project milestone as a dict.

        Returns None when nothing matches.  When several documents match, an
        error is logged and the first one is returned.
        """
        transactions = await StripeTransactionDoc.find(
            StripeTransactionDoc.project_id == project_id,
            StripeTransactionDoc.milestone_index == milestone_index,
        ).to_list()

        if not transactions:
            return None

        if len(transactions) > 1:
            await self.module_logger.log_error(
                error=(
                    "More than one transaction found for project_id: "
                    f"{project_id} and milestone_index: {milestone_index}"
                ),
                properties={
                    "project_id": project_id,
                    "milestone_index": milestone_index,
                },
            )
        return transactions[0].model_dump()

    # ------------------------------------------------------------------
    # Transaction creation
    # ------------------------------------------------------------------

    async def create_stripe_transaction_for_milestone(
        self,
        project_id: str,
        milestone_index: int,
        currency: str,
        expected_payment: Decimal,
        from_user: str,
        to_user: str,
        to_stripe_account_id: str,
    ) -> Optional[str]:
        """Create a PENDING transaction for a milestone and return its id.

        If a transaction already exists for the same project and milestone,
        an error is logged and the id of the first existing one is returned
        instead of creating a duplicate.
        """
        transactions = await StripeTransactionDoc.find(
            StripeTransactionDoc.project_id == project_id,
            StripeTransactionDoc.milestone_index == milestone_index,
        ).to_list()

        if transactions:
            await self.module_logger.log_error(
                error=(
                    "Transaction already exists for project_id: "
                    f"{project_id} and milestone_index: {milestone_index}"
                ),
                properties={
                    "project_id": project_id,
                    "milestone_index": milestone_index,
                },
            )
            return transactions[0].id

        # One timestamp so created_time and updated_time are identical.
        now = datetime.now(timezone.utc)
        transaction_doc = StripeTransactionDoc(
            project_id=project_id,
            milestone_index=milestone_index,
            currency=currency,
            # NOTE(review): multiplies by 100 and truncates, which presumes a
            # two-decimal currency and at most two decimal places - confirm.
            unit_amount=int(expected_payment * 100),
            from_user=from_user,
            to_user=to_user,
            to_stripe_account_id=to_stripe_account_id,
            created_time=now,
            updated_time=now,
            status=TransactionStatus.PENDING,
        )
        transaction = await transaction_doc.create()
        return transaction.id

    async def _ensure_product_and_price(
        self, transaction: StripeTransactionDoc
    ) -> None:
        """Create and persist the Stripe product and price if not yet stored."""
        if not transaction.stripe_product_id:
            product = stripe.Product.create(
                name=f"{transaction.project_id}-{transaction.milestone_index}"
            )
            transaction.stripe_product_id = product.id
            # Saved immediately so a later failure does not create a second
            # product for the same transaction on retry.
            await transaction.save()

        if not transaction.stripe_price_id:
            price = stripe.Price.create(
                unit_amount=transaction.unit_amount,
                currency=transaction.currency,
                product=transaction.stripe_product_id,
            )
            transaction.stripe_price_id = price.id
            await transaction.save()

    # ------------------------------------------------------------------
    # Payment links and checkout sessions
    # ------------------------------------------------------------------

    async def create_payment_link(self, transaction_id: str) -> Optional[str]:
        """Return the payment link URL for a transaction, creating it if needed.

        Returns None when the transaction does not exist or Stripe returned
        no payment link.  An already stored link is returned unchanged.
        """
        transaction = await StripeTransactionDoc.get(transaction_id)
        if not transaction:
            return None

        if transaction.stripe_payment_link:
            return transaction.stripe_payment_link

        await self._ensure_product_and_price(transaction)

        payment_link_params = {
            "line_items": [
                {
                    "price": transaction.stripe_price_id,
                    "quantity": 1,
                }
            ],
            "on_behalf_of": transaction.to_stripe_account_id,
            "transfer_data": {
                "destination": transaction.to_stripe_account_id,
            },
        }
        # Only add application_fee_amount if it's greater than 0.
        if transaction.application_fee_amount and transaction.application_fee_amount > 0:
            payment_link_params["application_fee_amount"] = (
                transaction.application_fee_amount
            )

        payment_link = stripe.PaymentLink.create(**payment_link_params)
        if not payment_link:
            return None

        transaction.stripe_payment_link = payment_link.url
        transaction.updated_time = datetime.now(timezone.utc)
        await transaction.save()
        return payment_link.url

    async def create_checkout_session(
        self, transaction_id: str
    ) -> Tuple[Optional[str], Optional[str]]:
        """Return ``(session_id, session_url)`` for a transaction.

        A stored checkout session is reused while its ``expires_at`` lies in
        the future; otherwise a new session is created and persisted.
        Returns ``(None, None)`` when the transaction does not exist or
        Stripe returned no session.
        """
        transaction = await StripeTransactionDoc.get(transaction_id)
        if not transaction:
            return None, None

        if transaction.stripe_checkout_session_id:
            existing_session = stripe.checkout.Session.retrieve(
                transaction.stripe_checkout_session_id
            )
            expires_at_utc = datetime.fromtimestamp(
                existing_session.expires_at, tz=timezone.utc
            )
            if datetime.now(timezone.utc) < expires_at_utc:
                return (
                    transaction.stripe_checkout_session_id,
                    transaction.stripe_checkout_session_url,
                )

        # The connected account is retrieved so an invalid account id fails
        # before any product/price/session is created.
        # NOTE(review): capability enforcement (card_payments / transfers
        # being "active") is currently disabled; re-enable here if required.
        stripe.Account.retrieve(transaction.to_stripe_account_id)

        await self._ensure_product_and_price(transaction)

        payment_intent_data = {
            "on_behalf_of": transaction.to_stripe_account_id,
            "transfer_data": {
                "destination": transaction.to_stripe_account_id,
            },
        }
        # Only add application_fee_amount if it's greater than 0.
        if transaction.application_fee_amount and transaction.application_fee_amount > 0:
            payment_intent_data["application_fee_amount"] = (
                transaction.application_fee_amount
            )

        session_params = {
            "payment_method_types": ["card"],
            "line_items": [
                {
                    "price": transaction.stripe_price_id,
                    "quantity": 1,
                }
            ],
            "payment_intent_data": payment_intent_data,
            "mode": "payment",
            "success_url": f"{self.site_url_root}/projects",
            "cancel_url": f"{self.site_url_root}/projects",
        }

        session = stripe.checkout.Session.create(**session_params)
        if not session:
            # Keep the declared tuple shape so callers can always unpack.
            return None, None

        transaction.stripe_checkout_session_id = session.id
        transaction.stripe_checkout_session_url = session.url
        transaction.updated_time = datetime.now(timezone.utc)
        await transaction.save()
        return session.id, session.url

    async def fetch_payment_link(self, transaction_id: str) -> Optional[str]:
        """Return the stored payment link for a transaction, or None."""
        transaction = await StripeTransactionDoc.get(transaction_id)
        if transaction and transaction.stripe_payment_link:
            return transaction.stripe_payment_link
        return None

    async def fetch_checkout_session_id(self, transaction_id: str) -> Optional[str]:
        """Return the stored checkout session id for a transaction, or None."""
        transaction = await StripeTransactionDoc.get(transaction_id)
        if transaction and transaction.stripe_checkout_session_id:
            return transaction.stripe_checkout_session_id
        return None

    async def fetch_checkout_session_url(self, transaction_id: str) -> Optional[str]:
        """Return the stored checkout session URL for a transaction, or None."""
        transaction = await StripeTransactionDoc.get(transaction_id)
        if transaction and transaction.stripe_checkout_session_url:
            return transaction.stripe_checkout_session_url
        return None

    # ------------------------------------------------------------------
    # Webhook handling
    # ------------------------------------------------------------------

    async def invoke_checkout_session_webhook(
        self, event: dict
    ) -> Tuple[bool, Optional[str], Optional[str]]:
        """Handle a ``checkout.session.completed`` webhook event.

        Marks the matching transaction COMPLETED, then makes a best-effort
        attempt to record the payment method that was used.

        Returns:
            ``(True, project_id, milestone_index)`` when the transaction was
            found and updated; ``(False, None, None)`` for any other event
            type or when no transaction matches the session id.
        """
        if event["type"] != "checkout.session.completed":
            return False, None, None

        session_id = event["data"]["object"]["id"]
        transaction = await self.__fetch_transaction_by_session_id(session_id)
        if not transaction:
            await self.module_logger.log_error(
                error=f"Transaction not found for session_id: {session_id}",
                properties={"session_id": session_id},
            )
            return False, None, None

        # Update transaction status.
        transaction.status = TransactionStatus.COMPLETED
        transaction.updated_time = datetime.now(timezone.utc)
        await transaction.save()

        # Don't fail the webhook if payment method saving fails, but log it.
        try:
            await self._save_payment_method_for_session(
                session_id, transaction.from_user
            )
        except Exception as payment_method_error:
            await self.module_logger.log_error(
                error=f"Error processing payment method: {payment_method_error}",
                properties={"session_id": session_id, "user_id": transaction.from_user},
            )

        return True, transaction.project_id, transaction.milestone_index

    async def _save_payment_method_for_session(
        self, session_id: str, user_id: str
    ) -> None:
        """Attach the session's payment method to a customer and record it.

        Does nothing when the session has no payment intent, the intent has
        no payment method, or no customer could be resolved.
        """
        stripe_session = stripe.checkout.Session.retrieve(session_id)
        payment_intent_id = stripe_session.get("payment_intent")
        if not payment_intent_id:
            return

        payment_intent = stripe.PaymentIntent.retrieve(payment_intent_id)
        payment_method_id = payment_intent.get("payment_method")
        if not payment_method_id:
            return

        payment_method = stripe.PaymentMethod.retrieve(payment_method_id)
        # "or {}" also covers a present-but-null "card" entry.
        card_details = payment_method.get("card") or {}

        customer_id = await self._get_or_create_customer_id(user_id)
        if not customer_id:
            return

        try:
            if payment_method.get("customer"):
                # Already attached: record the customer it belongs to.
                customer_id = payment_method["customer"]
            else:
                stripe.PaymentMethod.attach(payment_method_id, customer=customer_id)
        except stripe.error.InvalidRequestError as attach_error:
            message = str(attach_error).lower()
            # These two failures still allow the method to be recorded
            # locally; anything else is logged and the save is skipped.
            recordable = (
                "already attached" in message or "may not be used again" in message
            )
            if not recordable:
                await self.module_logger.log_error(
                    error=f"Error attaching payment method: {attach_error}",
                    properties={
                        "payment_method_id": payment_method_id,
                        "user_id": user_id,
                    },
                )
                return

        try:
            await self._store_payment_method_if_new(
                user_id, customer_id, payment_method_id, card_details
            )
        except Exception as save_error:
            await self.module_logger.log_error(
                error=f"Error saving payment method to database: {save_error}",
                properties={"payment_method_id": payment_method_id, "user_id": user_id},
            )

    async def _get_or_create_customer_id(self, user_id: str) -> Optional[str]:
        """Return the Stripe customer id for a user, creating one if needed.

        Customers are looked up by a synthetic e-mail derived from the user
        id.  Returns None (after logging) when the lookup or creation fails.
        """
        # Fallback address: the user's profile is not available here.
        user_email = f"user_{user_id}@freeleaps.com"
        try:
            customers = stripe.Customer.list(email=user_email, limit=1)
            if customers.data:
                return customers.data[0].id
            customer = stripe.Customer.create(
                email=user_email,
                metadata={"user_id": user_id},
            )
            return customer.id
        except Exception as customer_error:
            await self.module_logger.log_error(
                error=f"Error resolving Stripe customer: {customer_error}",
                properties={"user_id": user_id},
            )
            return None

    async def _store_payment_method_if_new(
        self,
        user_id: str,
        customer_id: str,
        payment_method_id: str,
        card_details: Dict[str, Any],
    ) -> None:
        """Insert a payment-method document unless one already exists."""
        # Imported locally, as in the original code path.
        from backend.infra.payment.models import StripePaymentMethodDoc

        existing_payment_method = await StripePaymentMethodDoc.find_one(
            StripePaymentMethodDoc.stripe_payment_method_id == payment_method_id
        )
        if existing_payment_method:
            return

        now = datetime.now(timezone.utc)
        payment_method_doc = StripePaymentMethodDoc(
            user_id=user_id,
            stripe_customer_id=customer_id,
            stripe_payment_method_id=payment_method_id,
            card_last4=card_details.get("last4"),
            card_brand=card_details.get("brand"),
            card_exp_month=card_details.get("exp_month"),
            card_exp_year=card_details.get("exp_year"),
            created_time=now,
            updated_time=now,
        )
        await payment_method_doc.save()