from datetime import datetime, timezone from typing import Dict, Optional, Tuple from backend.infra.payment.models import StripeTransactionDoc from backend.infra.payment.constants import TransactionStatus from common.config.app_settings import app_settings import stripe from stripe.error import SignatureVerificationError from common.log.module_logger import ModuleLogger from decimal import Decimal import json import httpx stripe.api_key = app_settings.STRIPE_API_KEY class StripeManager: def __init__(self) -> None: self.site_url_root = app_settings.SITE_URL_ROOT.rstrip("/") self.module_logger = ModuleLogger(sender_id="StripeManager") async def create_stripe_account(self) -> Optional[str]: account = stripe.Account.create(type="standard") return account.id async def create_account_link(self, account_id: str, link_type: str = "account_onboarding") -> Optional[str]: account = stripe.Account.retrieve(account_id) # For account_update, try to show dashboard if TOS is accepted self.module_logger.log_info("create_account_link urls", { "redirect_url": "{}/work".format(self.site_url_root), "refresh_url": "{}/front-door".format(self.site_url_root), "return_url": "{}/work".format(self.site_url_root) } ) if link_type == "account_update" and account.tos_acceptance.date: login_link = stripe.Account.create_login_link( account_id, redirect_url="{}/work".format(self.site_url_root) ) return login_link.url # Otherwise show onboarding account_link = stripe.AccountLink.create( account=account_id, refresh_url="{}/front-door".format(self.site_url_root), return_url="{}/work".format(self.site_url_root), type="account_onboarding", ) return account_link.url async def can_account_receive_payments(self, account_id: str) -> bool: account = stripe.Account.retrieve(account_id) if account.capabilities and account.capabilities["transfers"] == "active": return True else: return False async def __fetch_transaction_by_id( self, transaction_id: str ) -> Optional[StripeTransactionDoc]: transaction = await StripeTransactionDoc.get(transaction_id) if transaction: return transaction return None async def fetch_transaction_by_id( self, transaction_id: str ) -> Optional[Dict[str, any]]: transaction = await StripeTransactionDoc.get(transaction_id) if transaction: return transaction.model_dump() return None async def __fetch_transaction_by_session_id( self, session_id: str ) -> Optional[StripeTransactionDoc]: transactions = await StripeTransactionDoc.find( StripeTransactionDoc.stripe_checkout_session_id == session_id ).to_list() if len(transactions) > 1: await self.module_logger.log_error( error="More than one transaction found for session_id: {}".format( session_id ), properties={"session_id": session_id}, ) elif len(transactions) == 0: await self.module_logger.log_error( error="No transaction found for session_id: {}".format(session_id), properties={"session_id": session_id}, ) return None return transactions[0] async def fetch_transaction_by_session_id( self, session_id: str ) -> Optional[Dict[str, any]]: transaction = await StripeTransactionDoc.find( StripeTransactionDoc.stripe_checkout_session_id == session_id ).to_list() if len(transaction) > 1: await self.module_logger.log_error( error="More than one transaction found for session_id: {}".format( session_id ), properties={"session_id": session_id}, ) elif len(transaction) == 0: return None return transaction[0].model_dump() async def fetch_stripe_transaction_for_milestone( self, project_id: str, milestone_index: int ) -> Optional[Dict[str, any]]: transaction = await StripeTransactionDoc.find( StripeTransactionDoc.project_id == project_id, StripeTransactionDoc.milestone_index == milestone_index, ).to_list() if len(transaction) > 1: await self.module_logger.log_error( error="More than one transaction found for project_id: {} and milestone_index: {}".format( project_id, milestone_index ), properties={ "project_id": project_id, "milestone_index": milestone_index, }, ) elif len(transaction) == 0: return None return transaction[0].model_dump() async def create_stripe_transaction_for_milestone( self, project_id: str, milestone_index: int, currency: str, expected_payment: Decimal, from_user: str, to_user: str, to_stripe_account_id: str, ) -> Optional[str]: transactions = await StripeTransactionDoc.find( StripeTransactionDoc.project_id == project_id, StripeTransactionDoc.milestone_index == milestone_index, ).to_list() if len(transactions) == 0: transaction_doc = StripeTransactionDoc( project_id=project_id, milestone_index=milestone_index, currency=currency, unit_amount=int(expected_payment * 100), from_user=from_user, to_user=to_user, to_stripe_account_id=to_stripe_account_id, created_time=datetime.now(timezone.utc), updated_time=datetime.now(timezone.utc), status=TransactionStatus.PENDING, ) transaction = await transaction_doc.create() return transaction.id else: await self.module_logger.log_error( error="Transaction already exists for project_id: {} and milestone_index: {}".format( project_id, milestone_index ), properties={ "project_id": project_id, "milestone_index": milestone_index, }, ) res = transactions[0].id return res async def create_payment_link(self, transaction_id: str) -> Optional[str]: transaction = await StripeTransactionDoc.get(transaction_id) if transaction: if transaction.stripe_payment_link: return transaction.stripe_payment_link if not transaction.stripe_product_id: product = stripe.Product.create( name="{}-{}".format( transaction.project_id, transaction.milestone_index ) ) transaction.stripe_product_id = product.id await transaction.save() if not transaction.stripe_price_id: price = stripe.Price.create( unit_amount=transaction.unit_amount, currency=transaction.currency, product=transaction.stripe_product_id, ) transaction.stripe_price_id = price.id await transaction.save() # Prepare payment link parameters with conditional application_fee_amount payment_link_params = { "line_items": [ { "price": transaction.stripe_price_id, "quantity": 1, } ], "on_behalf_of": transaction.to_stripe_account_id, "transfer_data": { "destination": transaction.to_stripe_account_id, }, } # Only add application_fee_amount if it's greater than 0 if transaction.application_fee_amount and transaction.application_fee_amount > 0: payment_link_params["application_fee_amount"] = transaction.application_fee_amount payment_link = stripe.PaymentLink.create(**payment_link_params) if payment_link: transaction.stripe_payment_link = payment_link.url transaction.updated_time = datetime.now(timezone.utc) await transaction.save() return payment_link.url else: return None async def create_checkout_session( self, transaction_id: str ) -> Tuple[Optional[str], Optional[str]]: transaction = await StripeTransactionDoc.get(transaction_id) if transaction: if transaction.stripe_checkout_session_id: session = stripe.checkout.Session.retrieve( transaction.stripe_checkout_session_id ) expires_at_timestamp = session.expires_at expires_at_utc = datetime.fromtimestamp( expires_at_timestamp, tz=timezone.utc ) if datetime.now(timezone.utc) < expires_at_utc: return ( transaction.stripe_checkout_session_id, transaction.stripe_checkout_session_url, ) # Check connected account capabilities connected_account = stripe.Account.retrieve( transaction.to_stripe_account_id ) # if ( # connected_account.capabilities.get("card_payments") != "active" # or connected_account.capabilities.get("transfers") != "active" # ): # raise Exception( # f"Connected account {transaction.to_stripe_account_id} lacks required capabilities." # ) if not transaction.stripe_product_id: product = stripe.Product.create( name="{}-{}".format( transaction.project_id, transaction.milestone_index ) ) transaction.stripe_product_id = product.id await transaction.save() if not transaction.stripe_price_id: price = stripe.Price.create( unit_amount=transaction.unit_amount, currency=transaction.currency, product=transaction.stripe_product_id, ) transaction.stripe_price_id = price.id await transaction.save() # Prepare payment_intent_data with conditional application_fee_amount payment_intent_data = { "on_behalf_of": transaction.to_stripe_account_id, "transfer_data": { "destination": transaction.to_stripe_account_id, }, } # Only add application_fee_amount if it's greater than 0 if transaction.application_fee_amount and transaction.application_fee_amount > 0: payment_intent_data["application_fee_amount"] = transaction.application_fee_amount session_params = { "payment_method_types": ["card"], "line_items": [ { "price": transaction.stripe_price_id, "quantity": 1, } ], "payment_intent_data": payment_intent_data, "mode": "payment", "success_url": "{}/projects".format(self.site_url_root), "cancel_url": "{}/projects".format(self.site_url_root), } session = stripe.checkout.Session.create(**session_params) if session: transaction.stripe_checkout_session_id = session.id transaction.stripe_checkout_session_url = session.url transaction.updated_time = datetime.now(timezone.utc) await transaction.save() return session.id, session.url else: return None, None async def fetch_payment_link(self, transaction_id: str) -> Optional[str]: transaction = await StripeTransactionDoc.get(transaction_id) if transaction and transaction.stripe_payment_link: return transaction.stripe_payment_link return None async def fetch_checkout_session_id(self, transaction_id: str) -> Optional[str]: transaction = await StripeTransactionDoc.get(transaction_id) if transaction and transaction.stripe_checkout_session_id: return transaction.stripe_checkout_session_id return None async def fetch_checkout_session_url(self, transaction_id: str) -> Optional[str]: transaction = await StripeTransactionDoc.get(transaction_id) if transaction and transaction.stripe_checkout_session_url: return transaction.stripe_checkout_session_url return None async def invoke_checkout_session_webhook( self, event: dict ) -> Tuple[bool, Optional[str], Optional[str]]: """ Handle checkout.session.completed webhook events from Stripe. Updates transaction status and saves payment method information for future use. """ if event["type"] == "checkout.session.completed": session = event["data"]["object"] # Find and validate the transaction transaction = await self.__fetch_transaction_by_session_id(session["id"]) if not transaction: await self.module_logger.log_error( error="Transaction not found for session_id: {}".format(session["id"]), properties={"session_id": session["id"]}, ) return False, None, None # Update transaction status to completed await self.__update_transaction_status(transaction) # Process and save payment method information await self.__process_payment_method(session, transaction) return True, transaction.project_id, transaction.milestone_index return False, None, None async def __update_transaction_status(self, transaction: StripeTransactionDoc) -> None: """ Update transaction status to completed and save to database. """ transaction.status = TransactionStatus.COMPLETED transaction.updated_time = datetime.now(timezone.utc) await transaction.save() async def __process_payment_method(self, session: dict, transaction: StripeTransactionDoc) -> None: """ Extract payment method details from Stripe session and save to database. Creates or finds customer and attaches payment method for future use. """ try: # Get payment method details from Stripe payment_method_info = await self.__extract_payment_method_info(session) if not payment_method_info: return payment_method_id, card_details = payment_method_info # Get or create Stripe customer for the user customer_id = await self.__get_or_create_customer(transaction.from_user) if not customer_id: return # Attach payment method to customer and save to database await self.__attach_and_save_payment_method( payment_method_id, card_details, customer_id, transaction.from_user ) except Exception as payment_method_error: await self.module_logger.log_error( error=f"Error processing payment method: {payment_method_error}", properties={"session_id": session["id"], "user_id": transaction.from_user} ) async def __extract_payment_method_info(self, session: dict) -> Optional[Tuple[str, dict]]: """ Extract payment method ID and card details from Stripe session. Returns tuple of (payment_method_id, card_details) or None if not found. """ try: # Get the Stripe session to extract payment method details stripe_session = stripe.checkout.Session.retrieve(session["id"]) payment_intent_id = stripe_session.get('payment_intent') if not payment_intent_id: return None payment_intent = stripe.PaymentIntent.retrieve(payment_intent_id) payment_method_id = payment_intent.get('payment_method') if not payment_method_id: return None payment_method = stripe.PaymentMethod.retrieve(payment_method_id) card_details = payment_method.get('card', {}) return payment_method_id, card_details except Exception as e: await self.module_logger.log_error( error=f"Error extracting payment method info: {e}", properties={"session_id": session["id"]} ) return None async def __get_or_create_customer(self, user_id: str) -> Optional[str]: """ Find existing Stripe customer by email or create new one. Returns customer ID or None if creation fails. """ try: # Generate email for user (fallback since we don't have access to user profile) user_email = f"user_{user_id}@freeleaps.com" # Search for existing customers by email customers = stripe.Customer.list(email=user_email, limit=1) if customers.data: return customers.data[0].id # Create new customer if not found customer = stripe.Customer.create( email=user_email, metadata={"user_id": user_id} ) return customer.id except Exception as customer_error: await self.module_logger.log_error( error=f"Error getting/creating customer: {customer_error}", properties={"user_id": user_id} ) return None async def __attach_and_save_payment_method( self, payment_method_id: str, card_details: dict, customer_id: str, user_id: str ) -> None: """ Attach payment method to Stripe customer and save details to database. Handles various error scenarios gracefully. """ try: # Check if payment method is already attached to a customer payment_method_obj = stripe.PaymentMethod.retrieve(payment_method_id) if payment_method_obj.customer: # Use the existing customer ID customer_id = payment_method_obj.customer else: # Try to attach payment method to customer in Stripe stripe.PaymentMethod.attach( payment_method_id, customer=customer_id ) # Save to database await self.__save_payment_method_to_db( payment_method_id, card_details, customer_id, user_id ) except stripe.error.InvalidRequestError as attach_error: # Handle specific Stripe attachment errors await self.__handle_attachment_error( attach_error, payment_method_id, card_details, customer_id, user_id ) except Exception as save_error: await self.module_logger.log_error( error=f"Error attaching payment method: {save_error}", properties={"payment_method_id": payment_method_id, "user_id": user_id} ) async def __save_payment_method_to_db( self, payment_method_id: str, card_details: dict, customer_id: str, user_id: str ) -> None: """ Save payment method details to database if it doesn't already exist. """ from backend.infra.payment.models import StripePaymentMethodDoc # Check if payment method already exists in our database existing_payment_method = await StripePaymentMethodDoc.find_one( StripePaymentMethodDoc.stripe_payment_method_id == payment_method_id ) if existing_payment_method: return # Already saved # Save to our database payment_method_doc = StripePaymentMethodDoc( user_id=user_id, stripe_customer_id=customer_id, stripe_payment_method_id=payment_method_id, card_last4=card_details.get('last4'), card_brand=card_details.get('brand'), card_exp_month=card_details.get('exp_month'), card_exp_year=card_details.get('exp_year'), created_time=datetime.now(timezone.utc), updated_time=datetime.now(timezone.utc), ) await payment_method_doc.save() async def __handle_attachment_error( self, attach_error: stripe.error.InvalidRequestError, payment_method_id: str, card_details: dict, customer_id: str, user_id: str ) -> None: """ Handle specific Stripe attachment errors and still save to database when possible. """ error_message = str(attach_error).lower() if "already attached" in error_message or "may not be used again" in error_message: # Payment method can't be attached but we can still save to database await self.__save_payment_method_to_db( payment_method_id, card_details, customer_id, user_id ) else: # Log other attachment errors await self.module_logger.log_error( error=f"Error attaching payment method: {attach_error}", properties={"payment_method_id": payment_method_id, "user_id": user_id} )