# app/modules/analytics/services/usage_service.py
"""
Usage and limits service.

This is the canonical location for the usage service.

Provides methods for:
- Getting current usage vs limits
- Calculating upgrade recommendations
- Checking limits before actions

Conventions used throughout this module:
- A limit of ``None`` or any negative number means "unlimited".
- A limit of exactly ``0`` is a real limit that is always considered reached
  (usage percentage is reported as 100).
"""

import logging
from dataclasses import dataclass

from sqlalchemy import func
from sqlalchemy.orm import Session

from app.modules.catalog.models import Product
from app.modules.billing.models import SubscriptionTier, VendorSubscription
from models.database.vendor import VendorUser

logger = logging.getLogger(__name__)

# Usage percentage at or above which a metric is flagged as "approaching".
_APPROACHING_LIMIT_PERCENT = 80

# Maximum number of feature-based benefits listed for an upgrade tier.
_MAX_FEATURE_BENEFITS = 3

# Feature code -> human readable label. Only features listed here are ever
# advertised as upgrade benefits; the dict order is the display priority.
_FEATURE_BENEFIT_LABELS = {
    "analytics_dashboard": "Advanced Analytics",
    "api_access": "API Access",
    "automation_rules": "Automation Rules",
    "team_roles": "Team Roles & Permissions",
    "custom_domain": "Custom Domain",
    "webhooks": "Webhooks",
    "accounting_export": "Accounting Export",
}


@dataclass
class UsageMetricData:
    """
    Usage of a single metered resource.

    Attributes:
        name: Metric identifier ("orders", "products" or "team_members").
        current: Current usage count.
        limit: Configured limit, or None when the metric is unlimited.
        percentage: Usage as a percentage of the limit (0 when unlimited,
            100 when the limit is 0).
        is_unlimited: True when the limit is None or negative.
        is_at_limit: True when limited and current >= limit.
        is_approaching_limit: True when limited and percentage >= 80.
            Note that a metric that is at its limit is also "approaching".
    """

    name: str
    current: int
    limit: int | None
    percentage: float
    is_unlimited: bool
    is_at_limit: bool
    is_approaching_limit: bool


@dataclass
class TierInfoData:
    """
    Summary of the vendor's current tier.

    Attributes:
        code: Tier code.
        name: Display name of the tier.
        price_monthly_cents: Monthly price in cents.
        is_highest_tier: True when no higher active tier exists.
    """

    code: str
    name: str
    price_monthly_cents: int
    is_highest_tier: bool


@dataclass
class UpgradeTierData:
    """
    Description of the next tier a vendor can upgrade to.

    Attributes:
        code: Tier code of the upgrade target.
        name: Display name of the upgrade target.
        price_monthly_cents: Monthly price of the upgrade target in cents.
        price_increase_cents: Price difference to the current tier in cents
            (the current price counts as 0 when there is no current tier).
        benefits: Human readable list of what the upgrade adds.
    """

    code: str
    name: str
    price_monthly_cents: int
    price_increase_cents: int
    benefits: list[str]


@dataclass
class UsageData:
    """
    Full usage report for a vendor.

    Attributes:
        tier: The vendor's current tier.
        usage: One entry per metered resource.
        has_limits_approaching: True when any metric is approaching its limit.
        has_limits_reached: True when any metric is at its limit.
        upgrade_available: True when a higher active tier exists.
        upgrade_tier: Details of the next tier, or None when on the highest.
        upgrade_reasons: Usage-driven reasons to upgrade (may be empty).
    """

    tier: TierInfoData
    usage: list[UsageMetricData]
    has_limits_approaching: bool
    has_limits_reached: bool
    upgrade_available: bool
    upgrade_tier: UpgradeTierData | None
    upgrade_reasons: list[str]


@dataclass
class LimitCheckData:
    """
    Result of checking one limit before performing an action.

    Attributes:
        limit_type: The limit that was checked.
        can_proceed: Whether the action is allowed.
        current: Current usage count.
        limit: Configured limit, or None when unlimited.
        percentage: Usage as a percentage of the limit (0 when unlimited).
        message: Message returned by the subscription service, if any.
        upgrade_tier_code: Code of the next tier; only set when the action
            is blocked and a higher tier exists.
        upgrade_tier_name: Name of the next tier; set together with the code.
    """

    limit_type: str
    can_proceed: bool
    current: int
    limit: int | None
    percentage: float
    message: str | None
    upgrade_tier_code: str | None
    upgrade_tier_name: str | None


def _is_unlimited(limit: int | None) -> bool:
    """Return True when ``limit`` means "no limit" (None or negative)."""
    return limit is None or limit < 0


def _usage_percentage(current: int, limit: int | None) -> float:
    """
    Return usage as a percentage of ``limit``.

    Unlimited limits report 0; a limit of 0 reports 100 so that a
    zero-allowance resource always shows as exhausted instead of dividing
    by zero.
    """
    if _is_unlimited(limit):
        return 0.0
    if limit == 0:
        return 100.0
    return current / limit * 100


def _limit_benefit(
    has_current_tier: bool,
    current_limit: int | None,
    next_limit: int | None,
    unlimited_label: str,
    finite_format: str,
) -> str | None:
    """
    Describe the benefit of moving one numeric limit to the next tier.

    Args:
        has_current_tier: Whether the vendor currently has a tier at all.
        current_limit: The limit on the current tier (ignored without a tier).
        next_limit: The limit on the upgrade tier.
        unlimited_label: Text used when the upgrade tier is unlimited.
        finite_format: ``str.format`` template taking the new limit.

    Returns:
        The benefit text, or None when the upgrade is not an improvement
        for this limit.
    """
    # A zero allowance is never something to advertise.
    if next_limit == 0:
        return None

    next_unlimited = _is_unlimited(next_limit)

    if has_current_tier:
        # Already unlimited: nothing to gain.
        if _is_unlimited(current_limit):
            return None
        # Both finite: only a strictly higher limit is a benefit. An
        # unlimited next tier must skip this check, because its sentinel
        # value never compares greater than a real limit.
        if not next_unlimited and next_limit <= current_limit:
            return None

    if next_unlimited:
        return unlimited_label
    return finite_format.format(next_limit)


class UsageService:
    """Service for usage and limits management."""

    def get_vendor_usage(self, db: Session, vendor_id: int) -> UsageData:
        """
        Get comprehensive usage data for a vendor.

        Returns current usage, limits, and upgrade recommendations.

        Args:
            db: Database session
            vendor_id: Vendor ID

        Returns:
            UsageData describing the current tier, each usage metric and,
            when a higher tier exists, the upgrade offer and reasons.
        """
        # Imported locally, as in the rest of this module
        # (presumably to avoid a circular import - TODO confirm).
        from app.services.subscription_service import subscription_service

        subscription = subscription_service.get_or_create_subscription(db, vendor_id)
        tier = self._get_tier(db, subscription)

        usage_metrics = self._calculate_usage_metrics(db, vendor_id, subscription)
        has_limits_approaching = any(m.is_approaching_limit for m in usage_metrics)
        has_limits_reached = any(m.is_at_limit for m in usage_metrics)

        next_tier = self._get_next_tier(db, tier)
        is_highest_tier = next_tier is None

        # Upgrade details and reasons are only produced when there is
        # something to upgrade to.
        upgrade_tier_info = None
        upgrade_reasons = []
        if next_tier:
            upgrade_tier_info = self._build_upgrade_tier_info(tier, next_tier)
            upgrade_reasons = self._build_upgrade_reasons(
                usage_metrics, has_limits_reached, has_limits_approaching
            )

        return UsageData(
            tier=TierInfoData(
                # Fall back to the raw tier code stored on the subscription
                # when no tier row could be resolved.
                code=tier.code if tier else subscription.tier,
                name=tier.name if tier else subscription.tier.title(),
                price_monthly_cents=tier.price_monthly_cents if tier else 0,
                is_highest_tier=is_highest_tier,
            ),
            usage=usage_metrics,
            has_limits_approaching=has_limits_approaching,
            has_limits_reached=has_limits_reached,
            upgrade_available=not is_highest_tier,
            upgrade_tier=upgrade_tier_info,
            upgrade_reasons=upgrade_reasons,
        )

    def check_limit(
        self, db: Session, vendor_id: int, limit_type: str
    ) -> LimitCheckData:
        """
        Check a specific limit before performing an action.

        Args:
            db: Database session
            vendor_id: Vendor ID
            limit_type: One of "orders", "products", "team_members"

        Returns:
            LimitCheckData with proceed status and upgrade info. An unknown
            ``limit_type`` is not blocked: the result has ``can_proceed=True``
            and a message naming the unknown type.
        """
        from app.services.subscription_service import subscription_service

        checkers = {
            "orders": subscription_service.can_create_order,
            "products": subscription_service.can_add_product,
            "team_members": subscription_service.can_add_team_member,
        }
        checker = checkers.get(limit_type)
        if checker is None:
            logger.warning("Unknown limit type requested: %s", limit_type)
            return LimitCheckData(
                limit_type=limit_type,
                can_proceed=True,
                current=0,
                limit=None,
                percentage=0.0,
                message=f"Unknown limit type: {limit_type}",
                upgrade_tier_code=None,
                upgrade_tier_name=None,
            )

        can_proceed, message = checker(db, vendor_id)

        # Fetched once, after the check, and reused for both the limit and
        # the upgrade lookup below.
        subscription = subscription_service.get_subscription(db, vendor_id)

        # Without a subscription the limit is 0, i.e. nothing is allowed.
        if limit_type == "orders":
            # The counter may be unset; treat that as zero orders.
            current = (subscription.orders_this_period or 0) if subscription else 0
            limit = subscription.orders_limit if subscription else 0
        elif limit_type == "products":
            current = self._get_product_count(db, vendor_id)
            limit = subscription.products_limit if subscription else 0
        else:  # "team_members"
            current = self._get_team_member_count(db, vendor_id)
            limit = subscription.team_members_limit if subscription else 0

        is_unlimited = _is_unlimited(limit)
        percentage = _usage_percentage(current, limit)

        # Only suggest an upgrade when the action is actually blocked.
        upgrade_tier_code = None
        upgrade_tier_name = None
        if not can_proceed:
            current_tier = subscription.tier_obj if subscription else None
            if current_tier:
                next_tier = self._get_next_tier(db, current_tier)
                if next_tier:
                    upgrade_tier_code = next_tier.code
                    upgrade_tier_name = next_tier.name

        return LimitCheckData(
            limit_type=limit_type,
            can_proceed=can_proceed,
            current=current,
            limit=None if is_unlimited else limit,
            percentage=percentage,
            message=message,
            upgrade_tier_code=upgrade_tier_code,
            upgrade_tier_name=upgrade_tier_name,
        )

    # =========================================================================
    # Private Helper Methods
    # =========================================================================

    def _get_tier(
        self, db: Session, subscription: VendorSubscription
    ) -> SubscriptionTier | None:
        """
        Get tier from subscription or query by code.

        Uses ``subscription.tier_obj`` when it is set; otherwise looks the
        tier up by the code stored in ``subscription.tier``. Returns None
        when neither yields a tier.
        """
        tier = subscription.tier_obj
        if not tier:
            tier = (
                db.query(SubscriptionTier)
                .filter(SubscriptionTier.code == subscription.tier)
                .first()
            )
        return tier

    def _get_product_count(self, db: Session, vendor_id: int) -> int:
        """Get product count for vendor (0 when the query yields nothing)."""
        return (
            db.query(func.count(Product.id))
            .filter(Product.vendor_id == vendor_id)
            .scalar()
            or 0
        )

    def _get_team_member_count(self, db: Session, vendor_id: int) -> int:
        """Get active team member count for vendor (0 when none)."""
        return (
            db.query(func.count(VendorUser.id))
            .filter(
                VendorUser.vendor_id == vendor_id,
                VendorUser.is_active == True,  # noqa: E712
            )
            .scalar()
            or 0
        )

    def _build_metric(
        self, name: str, current: int, limit: int | None
    ) -> UsageMetricData:
        """Build the usage metric for one resource from its count and limit."""
        unlimited = _is_unlimited(limit)
        percentage = _usage_percentage(current, limit)
        return UsageMetricData(
            name=name,
            current=current,
            limit=None if unlimited else limit,
            percentage=percentage,
            is_unlimited=unlimited,
            is_at_limit=not unlimited and current >= limit,
            is_approaching_limit=(
                not unlimited and percentage >= _APPROACHING_LIMIT_PERCENT
            ),
        )

    def _calculate_usage_metrics(
        self, db: Session, vendor_id: int, subscription: VendorSubscription
    ) -> list[UsageMetricData]:
        """
        Calculate all usage metrics for a vendor.

        Returns the metrics in a fixed order: orders, products, team members.
        Orders are read from the subscription's period counter; products and
        team members are counted in the database.
        """
        return [
            self._build_metric(
                "orders",
                subscription.orders_this_period or 0,
                subscription.orders_limit,
            ),
            self._build_metric(
                "products",
                self._get_product_count(db, vendor_id),
                subscription.products_limit,
            ),
            self._build_metric(
                "team_members",
                self._get_team_member_count(db, vendor_id),
                subscription.team_members_limit,
            ),
        ]

    def _get_next_tier(
        self, db: Session, current_tier: SubscriptionTier | None
    ) -> SubscriptionTier | None:
        """
        Get next tier for upgrade.

        The next tier is the active tier with the smallest ``display_order``
        greater than the current tier's. Without a current tier the
        comparison starts from 0. Returns None when no such tier exists.
        """
        current_tier_order = current_tier.display_order if current_tier else 0
        return (
            db.query(SubscriptionTier)
            .filter(
                SubscriptionTier.is_active == True,  # noqa: E712
                SubscriptionTier.display_order > current_tier_order,
            )
            .order_by(SubscriptionTier.display_order)
            .first()
        )

    def _build_upgrade_tier_info(
        self, current_tier: SubscriptionTier | None, next_tier: SubscriptionTier
    ) -> UpgradeTierData:
        """
        Build upgrade tier information with benefits.

        Benefits are listed in a stable order: orders, products, team
        members, then up to three newly gained features that have a
        display label.

        Args:
            current_tier: The vendor's current tier, or None if unknown.
            next_tier: The tier being offered as an upgrade.

        Returns:
            UpgradeTierData for ``next_tier``.
        """
        has_current_tier = bool(current_tier)

        # Numeric limit benefits. Tier limits follow the module convention
        # that None or a negative value means unlimited.
        limit_benefits = [
            _limit_benefit(
                has_current_tier,
                current_tier.orders_per_month if has_current_tier else None,
                next_tier.orders_per_month,
                "Unlimited orders per month",
                "{:,} orders/month",
            ),
            _limit_benefit(
                has_current_tier,
                current_tier.products_limit if has_current_tier else None,
                next_tier.products_limit,
                "Unlimited products",
                "{:,} products",
            ),
            _limit_benefit(
                has_current_tier,
                current_tier.team_members if has_current_tier else None,
                next_tier.team_members,
                "Unlimited team members",
                "{} team members",
            ),
        ]
        benefits = [benefit for benefit in limit_benefits if benefit is not None]

        # Feature benefits: features the next tier has and the current lacks.
        current_features = (
            set(current_tier.features)
            if current_tier and current_tier.features
            else set()
        )
        next_features = set(next_tier.features) if next_tier.features else set()
        new_features = next_features - current_features

        # Filter to labelled features first and walk the label table rather
        # than the set: set order is arbitrary, and truncating before
        # filtering would let unlabelled features use up the slots.
        feature_benefits = [
            label
            for code, label in _FEATURE_BENEFIT_LABELS.items()
            if code in new_features
        ]
        benefits.extend(feature_benefits[:_MAX_FEATURE_BENEFITS])

        current_price = current_tier.price_monthly_cents if current_tier else 0
        return UpgradeTierData(
            code=next_tier.code,
            name=next_tier.name,
            price_monthly_cents=next_tier.price_monthly_cents,
            price_increase_cents=next_tier.price_monthly_cents - current_price,
            benefits=benefits,
        )

    def _build_upgrade_reasons(
        self,
        usage_metrics: list[UsageMetricData],
        has_limits_reached: bool,
        has_limits_approaching: bool,
    ) -> list[str]:
        """
        Build upgrade reasons based on usage.

        Reached limits take precedence: when any limit is reached, only the
        reached limits are listed. Otherwise the limits being approached are
        listed with their usage percentage. Returns an empty list when
        neither flag is set.
        """
        if has_limits_reached:
            return [
                f"You've reached your {m.name.replace('_', ' ')} limit"
                for m in usage_metrics
                if m.is_at_limit
            ]
        if has_limits_approaching:
            return [
                f"You're approaching your {m.name.replace('_', ' ')} limit ({int(m.percentage)}%)"
                for m in usage_metrics
                if m.is_approaching_limit
            ]
        return []


# Singleton instance
usage_service = UsageService()

__all__ = [
    "usage_service",
    "UsageService",
    "UsageData",
    "UsageMetricData",
    "TierInfoData",
    "UpgradeTierData",
    "LimitCheckData",
]