# app/modules/billing/services/billing_service.py
"""
Billing service for subscription and payment operations.

Provides:
- Subscription status and usage queries (merchant-level)
- Tier management
- Invoice history
- Add-on management
- Stripe checkout and portal session management
"""

import logging
from datetime import datetime, timezone

from sqlalchemy.orm import Session

from app.modules.billing.exceptions import (
    BillingException,
    NoActiveSubscriptionException,
    PaymentSystemNotConfiguredException,
    StripePriceNotConfiguredException,
    SubscriptionNotCancelledException,
    TierNotFoundException,
)
from app.modules.billing.models import (
    AddOnProduct,
    BillingHistory,
    MerchantSubscription,
    StoreAddOn,
    SubscriptionTier,
)
from app.modules.billing.services.stripe_service import stripe_service
from app.modules.billing.services.subscription_service import subscription_service

logger = logging.getLogger(__name__)


def _utc_now() -> datetime:
    """
    Return the current UTC time as a naive datetime.

    Produces the same kind of value as the deprecated ``datetime.utcnow()``
    (UTC wall-clock time, ``tzinfo`` stripped), so stored values and
    ``isoformat()`` output keep their existing shape.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _utc_from_timestamp(timestamp: float) -> datetime:
    """
    Convert a Unix timestamp to a naive UTC datetime.

    ``datetime.fromtimestamp(ts)`` without ``tz`` converts to the server's
    local time zone; converting explicitly in UTC keeps the result consistent
    with the other (naive UTC) timestamps produced by this module.
    """
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).replace(tzinfo=None)


class BillingService:
    """Service for billing operations."""

    def get_subscription_with_tier(
        self, db: Session, merchant_id: int, platform_id: int
    ) -> tuple[MerchantSubscription, SubscriptionTier | None]:
        """
        Get merchant subscription and its tier info.

        The subscription is obtained through
        ``subscription_service.get_or_create_subscription``.

        Returns:
            Tuple of (subscription, tier) where tier may be None
        """
        subscription = subscription_service.get_or_create_subscription(
            db, merchant_id, platform_id
        )
        return subscription, subscription.tier

    def get_available_tiers(
        self, db: Session, current_tier_id: int | None, platform_id: int | None = None
    ) -> tuple[list[dict], dict[str, int]]:
        """
        Get all available tiers with upgrade/downgrade flags.

        Upgrade/downgrade flags compare each tier's ``display_order`` with the
        current tier's; when ``current_tier_id`` matches no tier the current
        order is taken to be 0.

        Returns:
            Tuple of (tier_list, tier_order_map)
        """
        tiers = subscription_service.get_all_tiers(db, platform_id=platform_id)
        tier_order = {t.code: t.display_order for t in tiers}

        # First tier whose id matches the current one; 0 when there is none.
        current_order = next(
            (t.display_order for t in tiers if t.id == current_tier_id), 0
        )

        tier_list = [
            {
                "code": tier.code,
                "name": tier.name,
                "description": tier.description,
                "price_monthly_cents": tier.price_monthly_cents,
                "price_annual_cents": tier.price_annual_cents,
                "feature_codes": sorted(tier.get_feature_codes()),
                "is_current": tier.id == current_tier_id,
                "can_upgrade": tier.display_order > current_order,
                "can_downgrade": tier.display_order < current_order,
            }
            for tier in tiers
        ]

        return tier_list, tier_order

    def get_tier_by_code(
        self, db: Session, tier_code: str, platform_id: int | None = None
    ) -> SubscriptionTier:
        """
        Get a tier by its code, optionally scoped to a platform.

        Only active tiers are considered.

        Raises:
            TierNotFoundException: If tier doesn't exist
        """
        query = db.query(SubscriptionTier).filter(
            SubscriptionTier.code == tier_code,
            SubscriptionTier.is_active == True,  # noqa: E712
        )
        if platform_id is not None:
            query = query.filter(SubscriptionTier.platform_id == platform_id)

        tier = query.first()
        if not tier:
            raise TierNotFoundException(tier_code)
        return tier

    @staticmethod
    def _resolve_price_id(
        tier: SubscriptionTier, tier_code: str, is_annual: bool
    ) -> str:
        """
        Pick the Stripe price ID to charge for a tier.

        The annual price is used when ``is_annual`` is set and an annual price
        is configured; otherwise the monthly price is used.

        Raises:
            StripePriceNotConfiguredException: If no usable price is configured
        """
        # NOTE(review): when is_annual is True but the tier has no annual
        # price, this falls back to the monthly price while callers still
        # store is_annual=True on the subscription - confirm this is intended.
        price_id = (
            tier.stripe_price_annual_id
            if is_annual and tier.stripe_price_annual_id
            else tier.stripe_price_monthly_id
        )
        if not price_id:
            raise StripePriceNotConfiguredException(tier_code)
        return price_id

    def create_checkout_session(
        self,
        db: Session,
        merchant_id: int,
        platform_id: int,
        tier_code: str,
        is_annual: bool,
        success_url: str,
        cancel_url: str,
    ) -> dict:
        """
        Create a Stripe checkout session for a merchant subscription.

        After the session is created, the local subscription's ``tier_id`` and
        ``is_annual`` are updated. No commit is issued here.

        Returns:
            Dict with checkout_url and session_id

        Raises:
            PaymentSystemNotConfiguredException: If Stripe not configured
            TierNotFoundException: If tier doesn't exist
            StripePriceNotConfiguredException: If price not configured
            BillingException: If the merchant doesn't exist
        """
        if not stripe_service.is_configured:
            raise PaymentSystemNotConfiguredException()

        tier = self.get_tier_by_code(db, tier_code, platform_id=platform_id)
        price_id = self._resolve_price_id(tier, tier_code, is_annual)

        # Check if this is a new subscription (for trial)
        existing_sub = subscription_service.get_merchant_subscription(
            db, merchant_id, platform_id
        )
        trial_days = None
        if not existing_sub or not existing_sub.stripe_subscription_id:
            from app.core.config import settings

            trial_days = settings.stripe_trial_days

        # Get merchant for Stripe customer creation
        from app.modules.tenancy.models import Merchant

        merchant = db.query(Merchant).filter(Merchant.id == merchant_id).first()
        if merchant is None:
            # Fail with a billing error instead of handing None to Stripe.
            raise BillingException(f"Merchant {merchant_id} not found")

        session = stripe_service.create_checkout_session(
            db=db,
            store=merchant,  # Stripe service uses store for customer creation
            price_id=price_id,
            success_url=success_url,
            cancel_url=cancel_url,
            trial_days=trial_days,
        )

        # Update subscription with tier info
        subscription = subscription_service.get_or_create_subscription(
            db, merchant_id, platform_id
        )
        subscription.tier_id = tier.id
        subscription.is_annual = is_annual

        return {
            "checkout_url": session.url,
            "session_id": session.id,
        }

    def create_portal_session(
        self, db: Session, merchant_id: int, platform_id: int, return_url: str
    ) -> dict:
        """
        Create a Stripe customer portal session.

        Returns:
            Dict with portal_url

        Raises:
            PaymentSystemNotConfiguredException: If Stripe not configured
            NoActiveSubscriptionException: If no subscription with customer ID
        """
        if not stripe_service.is_configured:
            raise PaymentSystemNotConfiguredException()

        subscription = subscription_service.get_merchant_subscription(
            db, merchant_id, platform_id
        )
        if not subscription or not subscription.stripe_customer_id:
            raise NoActiveSubscriptionException()

        session = stripe_service.create_portal_session(
            customer_id=subscription.stripe_customer_id,
            return_url=return_url,
        )
        return {"portal_url": session.url}

    def get_invoices(
        self, db: Session, merchant_id: int, skip: int = 0, limit: int = 20
    ) -> tuple[list[BillingHistory], int]:
        """
        Get invoice history for a merchant.

        Invoices are ordered by invoice date, newest first; ``total_count`` is
        the number of invoices before ``skip``/``limit`` are applied.

        Returns:
            Tuple of (invoices, total_count)
        """
        query = db.query(BillingHistory).filter(
            BillingHistory.merchant_id == merchant_id
        )
        total = query.count()
        invoices = (
            query.order_by(BillingHistory.invoice_date.desc())
            .offset(skip)
            .limit(limit)
            .all()
        )
        return invoices, total

    def get_available_addons(
        self, db: Session, category: str | None = None
    ) -> list[AddOnProduct]:
        """Get active add-on products, optionally filtered by category."""
        query = db.query(AddOnProduct).filter(AddOnProduct.is_active == True)  # noqa: E712
        if category:
            query = query.filter(AddOnProduct.category == category)
        return query.order_by(AddOnProduct.display_order).all()

    def get_store_addons(self, db: Session, store_id: int) -> list[StoreAddOn]:
        """Get store's purchased add-ons."""
        return (
            db.query(StoreAddOn)
            .filter(StoreAddOn.store_id == store_id)
            .all()
        )

    def cancel_subscription(
        self,
        db: Session,
        merchant_id: int,
        platform_id: int,
        reason: str | None,
        immediately: bool,
    ) -> dict:
        """
        Cancel a subscription.

        The cancellation is sent to Stripe when Stripe is configured, then
        ``cancelled_at`` and ``cancellation_reason`` are set on the local
        subscription. No commit is issued here.

        Returns:
            Dict with message and effective_date

        Raises:
            NoActiveSubscriptionException: If no subscription to cancel
        """
        subscription = subscription_service.get_merchant_subscription(
            db, merchant_id, platform_id
        )
        if not subscription or not subscription.stripe_subscription_id:
            raise NoActiveSubscriptionException()

        if stripe_service.is_configured:
            stripe_service.cancel_subscription(
                subscription_id=subscription.stripe_subscription_id,
                immediately=immediately,
                cancellation_reason=reason,
            )

        # One timestamp so the stored cancelled_at and the reported
        # effective_date cannot drift apart.
        now = _utc_now()
        subscription.cancelled_at = now
        subscription.cancellation_reason = reason

        # Effective now for immediate cancellations or when the period end is
        # unknown; otherwise at the end of the current period.
        if immediately or not subscription.period_end:
            effective_date = now.isoformat()
        else:
            effective_date = subscription.period_end.isoformat()

        return {
            "message": "Subscription cancelled successfully",
            "effective_date": effective_date,
        }

    def reactivate_subscription(
        self, db: Session, merchant_id: int, platform_id: int
    ) -> dict:
        """
        Reactivate a cancelled subscription.

        Clears ``cancelled_at`` and ``cancellation_reason`` on the local
        subscription. No commit is issued here.

        Returns:
            Dict with success message

        Raises:
            NoActiveSubscriptionException: If no subscription
            SubscriptionNotCancelledException: If not cancelled
        """
        subscription = subscription_service.get_merchant_subscription(
            db, merchant_id, platform_id
        )
        if not subscription or not subscription.stripe_subscription_id:
            raise NoActiveSubscriptionException()
        if not subscription.cancelled_at:
            raise SubscriptionNotCancelledException()

        if stripe_service.is_configured:
            stripe_service.reactivate_subscription(subscription.stripe_subscription_id)

        subscription.cancelled_at = None
        subscription.cancellation_reason = None

        return {"message": "Subscription reactivated successfully"}

    @staticmethod
    def _empty_invoice_preview() -> dict:
        """Build the preview returned when there is no upcoming invoice."""
        return {
            "amount_due_cents": 0,
            "currency": "EUR",
            "next_payment_date": None,
            "line_items": [],
        }

    def get_upcoming_invoice(
        self, db: Session, merchant_id: int, platform_id: int
    ) -> dict:
        """
        Get upcoming invoice preview.

        An empty preview (zero amount, no line items) is returned when Stripe
        is not configured or Stripe returns no upcoming invoice.
        ``next_payment_date`` is an ISO-8601 string in UTC without an offset.

        Returns:
            Dict with amount_due_cents, currency, next_payment_date, line_items

        Raises:
            NoActiveSubscriptionException: If no subscription with customer ID
        """
        subscription = subscription_service.get_merchant_subscription(
            db, merchant_id, platform_id
        )
        if not subscription or not subscription.stripe_customer_id:
            raise NoActiveSubscriptionException()

        if not stripe_service.is_configured:
            return self._empty_invoice_preview()

        invoice = stripe_service.get_upcoming_invoice(subscription.stripe_customer_id)
        if not invoice:
            return self._empty_invoice_preview()

        lines = invoice.lines.data if invoice.lines and invoice.lines.data else []
        line_items = [
            {
                "description": line.description or "",
                "amount_cents": line.amount,
                "quantity": line.quantity or 1,
            }
            for line in lines
        ]

        # Convert in UTC rather than server-local time so the value agrees
        # with the other timestamps this service reports.
        next_payment_date = (
            _utc_from_timestamp(invoice.next_payment_attempt).isoformat()
            if invoice.next_payment_attempt
            else None
        )

        return {
            "amount_due_cents": invoice.amount_due,
            "currency": invoice.currency.upper(),
            "next_payment_date": next_payment_date,
            "line_items": line_items,
        }

    def change_tier(
        self,
        db: Session,
        merchant_id: int,
        platform_id: int,
        new_tier_code: str,
        is_annual: bool,
    ) -> dict:
        """
        Change subscription tier (upgrade/downgrade).

        The Stripe subscription is updated when Stripe is configured, then the
        local subscription's ``tier_id``, ``is_annual`` and ``updated_at`` are
        set. No commit is issued here.

        Returns:
            Dict with message, new_tier, effective_immediately

        Raises:
            TierNotFoundException: If tier doesn't exist
            NoActiveSubscriptionException: If no subscription
            StripePriceNotConfiguredException: If price not configured
        """
        subscription = subscription_service.get_merchant_subscription(
            db, merchant_id, platform_id
        )
        if not subscription or not subscription.stripe_subscription_id:
            raise NoActiveSubscriptionException()

        tier = self.get_tier_by_code(db, new_tier_code, platform_id=platform_id)
        price_id = self._resolve_price_id(tier, new_tier_code, is_annual)

        # Update in Stripe
        if stripe_service.is_configured:
            stripe_service.update_subscription(
                subscription_id=subscription.stripe_subscription_id,
                new_price_id=price_id,
            )

        # Update local subscription
        old_tier_id = subscription.tier_id
        subscription.tier_id = tier.id
        subscription.is_annual = is_annual
        subscription.updated_at = _utc_now()

        is_upgrade = self._is_upgrade(db, old_tier_id, tier.id)
        return {
            "message": f"Subscription {'upgraded' if is_upgrade else 'changed'} to {tier.name}",
            "new_tier": new_tier_code,
            "effective_immediately": True,
        }

    def _is_upgrade(
        self, db: Session, old_tier_id: int | None, new_tier_id: int | None
    ) -> bool:
        """
        Check if tier change is an upgrade based on display_order.

        Returns False when either ID is missing or either tier cannot be
        found.
        """
        if not old_tier_id or not new_tier_id:
            return False
        old = db.query(SubscriptionTier).filter(SubscriptionTier.id == old_tier_id).first()
        new = db.query(SubscriptionTier).filter(SubscriptionTier.id == new_tier_id).first()
        if not old or not new:
            return False
        return new.display_order > old.display_order

    def purchase_addon(
        self,
        db: Session,
        store_id: int,
        addon_code: str,
        domain_name: str | None,
        quantity: int,
        success_url: str,
        cancel_url: str,
    ) -> dict:
        """
        Create checkout session for add-on purchase.

        ``addon_code`` and ``domain_name`` (empty string when None) are
        attached to the checkout session as metadata.

        Returns:
            Dict with checkout_url and session_id

        Raises:
            PaymentSystemNotConfiguredException: If Stripe not configured
            BillingException: If the add-on doesn't exist, has no Stripe price
                configured, or the store doesn't exist
        """
        if not stripe_service.is_configured:
            raise PaymentSystemNotConfiguredException()

        addon = (
            db.query(AddOnProduct)
            .filter(
                AddOnProduct.code == addon_code,
                AddOnProduct.is_active == True,  # noqa: E712
            )
            .first()
        )
        if not addon:
            raise BillingException(f"Add-on '{addon_code}' not found")
        if not addon.stripe_price_id:
            raise BillingException(f"Stripe price not configured for add-on '{addon_code}'")

        from app.modules.tenancy.models import Store

        store = db.query(Store).filter(Store.id == store_id).first()
        if store is None:
            # Fail with a billing error instead of handing None to Stripe.
            raise BillingException(f"Store {store_id} not found")

        session = stripe_service.create_checkout_session(
            db=db,
            store=store,
            price_id=addon.stripe_price_id,
            success_url=success_url,
            cancel_url=cancel_url,
            quantity=quantity,
            metadata={
                "addon_code": addon_code,
                "domain_name": domain_name or "",
            },
        )

        return {
            "checkout_url": session.url,
            "session_id": session.id,
        }

    def cancel_addon(self, db: Session, store_id: int, addon_id: int) -> dict:
        """
        Cancel a purchased add-on.

        Stripe cancellation is best-effort: a failure is logged as a warning
        and the add-on is still marked cancelled locally. No commit is issued
        here.

        Returns:
            Dict with message and addon_code

        Raises:
            BillingException: If addon not found or not owned by store
        """
        store_addon = (
            db.query(StoreAddOn)
            .filter(
                StoreAddOn.id == addon_id,
                StoreAddOn.store_id == store_id,
            )
            .first()
        )
        if not store_addon:
            raise BillingException("Add-on not found")

        addon_code = store_addon.addon_product.code

        # Cancel in Stripe if applicable
        if stripe_service.is_configured and store_addon.stripe_subscription_item_id:
            try:
                stripe_service.cancel_subscription_item(store_addon.stripe_subscription_item_id)
            except Exception as e:  # noqa: EXC003
                logger.warning("Failed to cancel addon in Stripe: %s", e)

        # Mark as cancelled
        store_addon.status = "cancelled"
        store_addon.cancelled_at = _utc_now()

        return {
            "message": "Add-on cancelled successfully",
            "addon_code": addon_code,
        }


# Create service instance
billing_service = BillingService()