diff --git a/alembic/versions/l0a1b2c3d4e5_add_capacity_snapshots_table.py b/alembic/versions/l0a1b2c3d4e5_add_capacity_snapshots_table.py new file mode 100644 index 00000000..a87beb08 --- /dev/null +++ b/alembic/versions/l0a1b2c3d4e5_add_capacity_snapshots_table.py @@ -0,0 +1,65 @@ +"""Add capacity_snapshots table + +Revision ID: l0a1b2c3d4e5 +Revises: k9f0a1b2c3d4 +Create Date: 2025-12-26 + +Adds table for tracking daily platform capacity metrics for growth forecasting. +""" + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "l0a1b2c3d4e5" +down_revision = "k9f0a1b2c3d4" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "capacity_snapshots", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("snapshot_date", sa.DateTime(timezone=True), nullable=False), + # Vendor metrics + sa.Column("total_vendors", sa.Integer(), nullable=False, server_default="0"), + sa.Column("active_vendors", sa.Integer(), nullable=False, server_default="0"), + sa.Column("trial_vendors", sa.Integer(), nullable=False, server_default="0"), + # Subscription metrics + sa.Column("total_subscriptions", sa.Integer(), nullable=False, server_default="0"), + sa.Column("active_subscriptions", sa.Integer(), nullable=False, server_default="0"), + # Resource metrics + sa.Column("total_products", sa.Integer(), nullable=False, server_default="0"), + sa.Column("total_orders_month", sa.Integer(), nullable=False, server_default="0"), + sa.Column("total_team_members", sa.Integer(), nullable=False, server_default="0"), + # Storage metrics + sa.Column("storage_used_gb", sa.Numeric(10, 2), nullable=False, server_default="0"), + sa.Column("db_size_mb", sa.Numeric(10, 2), nullable=False, server_default="0"), + # Capacity metrics + sa.Column("theoretical_products_limit", sa.Integer(), nullable=True), + sa.Column("theoretical_orders_limit", sa.Integer(), nullable=True), + sa.Column("theoretical_team_limit", sa.Integer(), nullable=True), + # Tier distribution + sa.Column("tier_distribution", sa.JSON(), nullable=True), + # Performance metrics + sa.Column("avg_response_ms", sa.Integer(), nullable=True), + sa.Column("peak_cpu_percent", sa.Numeric(5, 2), nullable=True), + sa.Column("peak_memory_percent", sa.Numeric(5, 2), nullable=True), + # Timestamps + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + # Primary key + sa.PrimaryKeyConstraint("id"), + ) + + # Create indexes + op.create_index("ix_capacity_snapshots_id", "capacity_snapshots", ["id"], unique=False) + op.create_index("ix_capacity_snapshots_date", "capacity_snapshots", ["snapshot_date"], unique=True) + + +def downgrade() -> None: + op.drop_index("ix_capacity_snapshots_date", table_name="capacity_snapshots") + op.drop_index("ix_capacity_snapshots_id", table_name="capacity_snapshots") + op.drop_table("capacity_snapshots") diff --git a/app/api/v1/admin/platform_health.py b/app/api/v1/admin/platform_health.py index 8b2cc3da..19d7f02f 100644 --- a/app/api/v1/admin/platform_health.py +++ b/app/api/v1/admin/platform_health.py @@ -144,3 +144,71 @@ async def get_capacity_metrics( """Get capacity-focused metrics for planning.""" metrics = platform_health_service.get_capacity_metrics(db) return CapacityMetricsResponse(**metrics) + + +@router.get("/subscription-capacity") +async def get_subscription_capacity( + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_api), +): + """ + Get subscription-based capacity metrics. + + Shows theoretical vs actual capacity based on all vendor subscriptions. + """ + return platform_health_service.get_subscription_capacity(db) + + +@router.get("/trends") +async def get_growth_trends( + days: int = 30, + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_api), +): + """ + Get growth trends over the specified period. + + Returns growth rates and projections for key metrics. + """ + from app.services.capacity_forecast_service import capacity_forecast_service + + return capacity_forecast_service.get_growth_trends(db, days=days) + + +@router.get("/recommendations") +async def get_scaling_recommendations( + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_api), +): + """ + Get scaling recommendations based on current capacity and growth. + + Returns prioritized list of recommendations. + """ + from app.services.capacity_forecast_service import capacity_forecast_service + + return capacity_forecast_service.get_scaling_recommendations(db) + + +@router.post("/snapshot") +async def capture_snapshot( + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_api), +): + """ + Manually capture a capacity snapshot. + + Normally run automatically by daily background job. + """ + from app.services.capacity_forecast_service import capacity_forecast_service + + snapshot = capacity_forecast_service.capture_daily_snapshot(db) + db.commit() + + return { + "id": snapshot.id, + "snapshot_date": snapshot.snapshot_date.isoformat(), + "total_vendors": snapshot.total_vendors, + "total_products": snapshot.total_products, + "message": "Snapshot captured successfully", + } diff --git a/app/api/v1/admin/subscriptions.py b/app/api/v1/admin/subscriptions.py index 6eda8718..2ba9dd37 100644 --- a/app/api/v1/admin/subscriptions.py +++ b/app/api/v1/admin/subscriptions.py @@ -12,12 +12,16 @@ Provides endpoints for platform administrators to manage: import logging from fastapi import APIRouter, Depends, Path, Query +from sqlalchemy import func from sqlalchemy.orm import Session from app.api.deps import get_current_admin_api from app.core.database import get_db from app.services.admin_subscription_service import admin_subscription_service +from models.database.product import Product from models.database.user import User +from models.database.vendor import VendorUser +from app.services.subscription_service import subscription_service from models.schema.billing import ( BillingHistoryListResponse, BillingHistoryWithVendor, @@ -26,6 +30,7 @@ from models.schema.billing import ( SubscriptionTierListResponse, SubscriptionTierResponse, SubscriptionTierUpdate, + VendorSubscriptionCreate, VendorSubscriptionListResponse, VendorSubscriptionResponse, VendorSubscriptionUpdate, @@ -144,9 +149,6 @@ def list_vendor_subscriptions( **VendorSubscriptionResponse.model_validate(sub).model_dump(), "vendor_name": vendor.name, "vendor_code": vendor.subdomain, - "orders_limit": sub.orders_limit, - "products_limit": sub.products_limit, - "team_members_limit": sub.team_members_limit, } subscriptions.append(VendorSubscriptionWithVendor(**sub_dict)) @@ -231,6 +233,73 @@ def list_billing_history( # ============================================================================ +@router.post("/{vendor_id}", response_model=VendorSubscriptionWithVendor, status_code=201) +def create_vendor_subscription( + create_data: VendorSubscriptionCreate, + vendor_id: int = Path(..., description="Vendor ID"), + current_user: User = Depends(get_current_admin_api), + db: Session = Depends(get_db), +): + """ + Create a subscription for a vendor. + + Creates a new subscription with the specified tier and status. + Defaults to Essential tier with trial status. + """ + from models.database.vendor import Vendor + + # Verify vendor exists + vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first() + if not vendor: + from app.exceptions import ResourceNotFoundException + raise ResourceNotFoundException("Vendor", str(vendor_id)) + + # Create subscription using the subscription service + sub = subscription_service.get_or_create_subscription( + db, + vendor_id=vendor_id, + tier=create_data.tier, + trial_days=create_data.trial_days, + ) + + # Update status if not trial + if create_data.status != "trial": + sub.status = create_data.status + + sub.is_annual = create_data.is_annual + + db.commit() + db.refresh(sub) + + # Get usage counts + products_count = ( + db.query(func.count(Product.id)) + .filter(Product.vendor_id == vendor_id) + .scalar() + or 0 + ) + + team_count = ( + db.query(func.count(VendorUser.id)) + .filter( + VendorUser.vendor_id == vendor_id, + VendorUser.is_active == True, # noqa: E712 + ) + .scalar() + or 0 + ) + + logger.info(f"Admin created subscription for vendor {vendor_id}: tier={create_data.tier}") + + return VendorSubscriptionWithVendor( + **VendorSubscriptionResponse.model_validate(sub).model_dump(), + vendor_name=vendor.name, + vendor_code=vendor.subdomain, + products_count=products_count, + team_count=team_count, + ) + + @router.get("/{vendor_id}", response_model=VendorSubscriptionWithVendor) def get_vendor_subscription( vendor_id: int = Path(..., description="Vendor ID"), @@ -240,13 +309,30 @@ def get_vendor_subscription( """Get subscription details for a specific vendor.""" sub, vendor = admin_subscription_service.get_subscription(db, vendor_id) + # Get usage counts + products_count = ( + db.query(func.count(Product.id)) + .filter(Product.vendor_id == vendor_id) + .scalar() + or 0 + ) + + team_count = ( + db.query(func.count(VendorUser.id)) + .filter( + VendorUser.vendor_id == vendor_id, + VendorUser.is_active == True, # noqa: E712 + ) + .scalar() + or 0 + ) + return VendorSubscriptionWithVendor( **VendorSubscriptionResponse.model_validate(sub).model_dump(), vendor_name=vendor.name, vendor_code=vendor.subdomain, - orders_limit=sub.orders_limit, - products_limit=sub.products_limit, - team_members_limit=sub.team_members_limit, + products_count=products_count, + team_count=team_count, ) @@ -271,11 +357,28 @@ def update_vendor_subscription( db.commit() db.refresh(sub) + # Get usage counts + products_count = ( + db.query(func.count(Product.id)) + .filter(Product.vendor_id == vendor_id) + .scalar() + or 0 + ) + + team_count = ( + db.query(func.count(VendorUser.id)) + .filter( + VendorUser.vendor_id == vendor_id, + VendorUser.is_active == True, # noqa: E712 + ) + .scalar() + or 0 + ) + return VendorSubscriptionWithVendor( **VendorSubscriptionResponse.model_validate(sub).model_dump(), vendor_name=vendor.name, vendor_code=vendor.subdomain, - orders_limit=sub.orders_limit, - products_limit=sub.products_limit, - team_members_limit=sub.team_members_limit, + products_count=products_count, + team_count=team_count, ) diff --git a/app/api/v1/admin/vendor_products.py b/app/api/v1/admin/vendor_products.py index 0318bcaa..d56abc57 100644 --- a/app/api/v1/admin/vendor_products.py +++ b/app/api/v1/admin/vendor_products.py @@ -19,6 +19,7 @@ from sqlalchemy.orm import Session from app.api.deps import get_current_admin_api from app.core.database import get_db +from app.services.subscription_service import subscription_service from app.services.vendor_product_service import vendor_product_service from models.database.user import User from models.schema.vendor_product import ( @@ -119,6 +120,9 @@ def create_vendor_product( current_admin: User = Depends(get_current_admin_api), ): """Create a new vendor product.""" + # Check product limit before creating + subscription_service.check_product_limit(db, data.vendor_id) + product = vendor_product_service.create_product(db, data.model_dump()) db.commit() # ✅ ARCH: Commit at API level for transaction control return VendorProductCreateResponse( diff --git a/app/api/v1/vendor/billing.py b/app/api/v1/vendor/billing.py index edc911e5..5f627d55 100644 --- a/app/api/v1/vendor/billing.py +++ b/app/api/v1/vendor/billing.py @@ -179,6 +179,37 @@ class CancelResponse(BaseModel): effective_date: str +class UpcomingInvoiceResponse(BaseModel): + """Upcoming invoice preview.""" + + amount_due_cents: int + currency: str + next_payment_date: str | None = None + line_items: list[dict] = [] + + +class ChangeTierRequest(BaseModel): + """Request to change subscription tier.""" + + tier_code: str + is_annual: bool = False + + +class ChangeTierResponse(BaseModel): + """Response for tier change.""" + + message: str + new_tier: str + effective_immediately: bool + + +class AddOnCancelResponse(BaseModel): + """Response for add-on cancellation.""" + + message: str + addon_code: str + + # ============================================================================ # Endpoints # ============================================================================ @@ -403,3 +434,92 @@ def reactivate_subscription( db.commit() return result + + +@router.get("/upcoming-invoice", response_model=UpcomingInvoiceResponse) +def get_upcoming_invoice( + current_user: User = Depends(get_current_vendor_api), + db: Session = Depends(get_db), +): + """Preview the upcoming invoice.""" + vendor_id = current_user.token_vendor_id + + result = billing_service.get_upcoming_invoice(db, vendor_id) + + return UpcomingInvoiceResponse( + amount_due_cents=result.get("amount_due_cents", 0), + currency=result.get("currency", "EUR"), + next_payment_date=result.get("next_payment_date"), + line_items=result.get("line_items", []), + ) + + +@router.post("/change-tier", response_model=ChangeTierResponse) +def change_tier( + request: ChangeTierRequest, + current_user: User = Depends(get_current_vendor_api), + db: Session = Depends(get_db), +): + """Change subscription tier (upgrade/downgrade).""" + vendor_id = current_user.token_vendor_id + + result = billing_service.change_tier( + db=db, + vendor_id=vendor_id, + new_tier_code=request.tier_code, + is_annual=request.is_annual, + ) + db.commit() + + return ChangeTierResponse( + message=result["message"], + new_tier=result["new_tier"], + effective_immediately=result["effective_immediately"], + ) + + +@router.post("/addons/purchase") +def purchase_addon( + request: AddOnPurchaseRequest, + current_user: User = Depends(get_current_vendor_api), + db: Session = Depends(get_db), +): + """Purchase an add-on product.""" + vendor_id = current_user.token_vendor_id + vendor = billing_service.get_vendor(db, vendor_id) + + # Build URLs + base_url = f"https://{settings.platform_domain}" + success_url = f"{base_url}/vendor/{vendor.vendor_code}/billing?addon_success=true" + cancel_url = f"{base_url}/vendor/{vendor.vendor_code}/billing?addon_cancelled=true" + + result = billing_service.purchase_addon( + db=db, + vendor_id=vendor_id, + addon_code=request.addon_code, + domain_name=request.domain_name, + quantity=request.quantity, + success_url=success_url, + cancel_url=cancel_url, + ) + db.commit() + + return result + + +@router.delete("/addons/{addon_id}", response_model=AddOnCancelResponse) +def cancel_addon( + addon_id: int, + current_user: User = Depends(get_current_vendor_api), + db: Session = Depends(get_db), +): + """Cancel a purchased add-on.""" + vendor_id = current_user.token_vendor_id + + result = billing_service.cancel_addon(db, vendor_id, addon_id) + db.commit() + + return AddOnCancelResponse( + message=result["message"], + addon_code=result["addon_code"], + ) diff --git a/app/services/admin_subscription_service.py b/app/services/admin_subscription_service.py index afae0815..5f77fd62 100644 --- a/app/services/admin_subscription_service.py +++ b/app/services/admin_subscription_service.py @@ -177,7 +177,7 @@ class AdminSubscriptionService: ) if not result: - raise ResourceNotFoundException(f"Subscription for vendor {vendor_id} not found") + raise ResourceNotFoundException("Subscription", str(vendor_id)) return result diff --git a/app/services/billing_service.py b/app/services/billing_service.py index 0052f213..5c2da065 100644 --- a/app/services/billing_service.py +++ b/app/services/billing_service.py @@ -365,6 +365,224 @@ class BillingService: return {"message": "Subscription reactivated successfully"} + def get_upcoming_invoice(self, db: Session, vendor_id: int) -> dict: + """ + Get upcoming invoice preview. + + Returns: + Dict with amount_due_cents, currency, next_payment_date, line_items + + Raises: + NoActiveSubscriptionError: If no subscription with customer ID + """ + subscription = subscription_service.get_subscription(db, vendor_id) + + if not subscription or not subscription.stripe_customer_id: + raise NoActiveSubscriptionError() + + if not stripe_service.is_configured: + # Return empty preview if Stripe not configured + return { + "amount_due_cents": 0, + "currency": "EUR", + "next_payment_date": None, + "line_items": [], + } + + invoice = stripe_service.get_upcoming_invoice(subscription.stripe_customer_id) + + if not invoice: + return { + "amount_due_cents": 0, + "currency": "EUR", + "next_payment_date": None, + "line_items": [], + } + + line_items = [] + if invoice.lines and invoice.lines.data: + for line in invoice.lines.data: + line_items.append({ + "description": line.description or "", + "amount_cents": line.amount, + "quantity": line.quantity or 1, + }) + + return { + "amount_due_cents": invoice.amount_due, + "currency": invoice.currency.upper(), + "next_payment_date": datetime.fromtimestamp(invoice.next_payment_attempt).isoformat() + if invoice.next_payment_attempt + else None, + "line_items": line_items, + } + + def change_tier( + self, + db: Session, + vendor_id: int, + new_tier_code: str, + is_annual: bool, + ) -> dict: + """ + Change subscription tier (upgrade/downgrade). + + Returns: + Dict with message, new_tier, effective_immediately + + Raises: + TierNotFoundError: If tier doesn't exist + NoActiveSubscriptionError: If no subscription + StripePriceNotConfiguredError: If price not configured + """ + subscription = subscription_service.get_subscription(db, vendor_id) + + if not subscription or not subscription.stripe_subscription_id: + raise NoActiveSubscriptionError() + + tier = self.get_tier_by_code(db, new_tier_code) + + price_id = ( + tier.stripe_price_annual_id + if is_annual and tier.stripe_price_annual_id + else tier.stripe_price_monthly_id + ) + + if not price_id: + raise StripePriceNotConfiguredError(new_tier_code) + + # Update in Stripe + if stripe_service.is_configured: + stripe_service.update_subscription( + subscription_id=subscription.stripe_subscription_id, + new_price_id=price_id, + ) + + # Update local subscription + old_tier = subscription.tier + subscription.tier = new_tier_code + subscription.tier_id = tier.id + subscription.is_annual = is_annual + subscription.updated_at = datetime.utcnow() + + is_upgrade = self._is_upgrade(db, old_tier, new_tier_code) + + return { + "message": f"Subscription {'upgraded' if is_upgrade else 'changed'} to {tier.name}", + "new_tier": new_tier_code, + "effective_immediately": True, + } + + def _is_upgrade(self, db: Session, old_tier: str, new_tier: str) -> bool: + """Check if tier change is an upgrade.""" + old = db.query(SubscriptionTier).filter(SubscriptionTier.code == old_tier).first() + new = db.query(SubscriptionTier).filter(SubscriptionTier.code == new_tier).first() + + if not old or not new: + return False + + return new.display_order > old.display_order + + def purchase_addon( + self, + db: Session, + vendor_id: int, + addon_code: str, + domain_name: str | None, + quantity: int, + success_url: str, + cancel_url: str, + ) -> dict: + """ + Create checkout session for add-on purchase. + + Returns: + Dict with checkout_url and session_id + + Raises: + PaymentSystemNotConfiguredError: If Stripe not configured + AddonNotFoundError: If addon doesn't exist + """ + if not stripe_service.is_configured: + raise PaymentSystemNotConfiguredError() + + addon = ( + db.query(AddOnProduct) + .filter( + AddOnProduct.code == addon_code, + AddOnProduct.is_active == True, # noqa: E712 + ) + .first() + ) + + if not addon: + raise BillingServiceError(f"Add-on '{addon_code}' not found") + + if not addon.stripe_price_id: + raise BillingServiceError(f"Stripe price not configured for add-on '{addon_code}'") + + vendor = self.get_vendor(db, vendor_id) + subscription = subscription_service.get_or_create_subscription(db, vendor_id) + + # Create checkout session for add-on + session = stripe_service.create_checkout_session( + db=db, + vendor=vendor, + price_id=addon.stripe_price_id, + success_url=success_url, + cancel_url=cancel_url, + quantity=quantity, + metadata={ + "addon_code": addon_code, + "domain_name": domain_name or "", + }, + ) + + return { + "checkout_url": session.url, + "session_id": session.id, + } + + def cancel_addon(self, db: Session, vendor_id: int, addon_id: int) -> dict: + """ + Cancel a purchased add-on. + + Returns: + Dict with message and addon_code + + Raises: + BillingServiceError: If addon not found or not owned by vendor + """ + vendor_addon = ( + db.query(VendorAddOn) + .filter( + VendorAddOn.id == addon_id, + VendorAddOn.vendor_id == vendor_id, + ) + .first() + ) + + if not vendor_addon: + raise BillingServiceError("Add-on not found") + + addon_code = vendor_addon.addon_product.code + + # Cancel in Stripe if applicable + if stripe_service.is_configured and vendor_addon.stripe_subscription_item_id: + try: + stripe_service.cancel_subscription_item(vendor_addon.stripe_subscription_item_id) + except Exception as e: + logger.warning(f"Failed to cancel addon in Stripe: {e}") + + # Mark as cancelled + vendor_addon.status = "cancelled" + vendor_addon.cancelled_at = datetime.utcnow() + + return { + "message": "Add-on cancelled successfully", + "addon_code": addon_code, + } + # Create service instance billing_service = BillingService() diff --git a/app/services/capacity_forecast_service.py b/app/services/capacity_forecast_service.py new file mode 100644 index 00000000..de380c37 --- /dev/null +++ b/app/services/capacity_forecast_service.py @@ -0,0 +1,321 @@ +# app/services/capacity_forecast_service.py +""" +Capacity forecasting service for growth trends and scaling recommendations. + +Provides: +- Historical capacity trend analysis +- Growth rate calculations +- Days-until-threshold projections +- Scaling recommendations based on growth patterns +""" + +import logging +from datetime import UTC, datetime, timedelta +from decimal import Decimal + +from sqlalchemy import func +from sqlalchemy.orm import Session + +from models.database.product import Product +from models.database.subscription import ( + CapacitySnapshot, + SubscriptionStatus, + VendorSubscription, +) +from models.database.vendor import Vendor, VendorUser + +logger = logging.getLogger(__name__) + + +# Scaling thresholds based on capacity-planning.md +INFRASTRUCTURE_SCALING = [ + {"name": "Starter", "max_vendors": 50, "max_products": 10_000, "cost_monthly": 30}, + {"name": "Small", "max_vendors": 100, "max_products": 30_000, "cost_monthly": 80}, + {"name": "Medium", "max_vendors": 300, "max_products": 100_000, "cost_monthly": 150}, + {"name": "Large", "max_vendors": 500, "max_products": 250_000, "cost_monthly": 350}, + {"name": "Scale", "max_vendors": 1000, "max_products": 500_000, "cost_monthly": 700}, + {"name": "Enterprise", "max_vendors": None, "max_products": None, "cost_monthly": 1500}, +] + + +class CapacityForecastService: + """Service for capacity forecasting and trend analysis.""" + + def capture_daily_snapshot(self, db: Session) -> CapacitySnapshot: + """ + Capture a daily snapshot of platform capacity metrics. + + Should be called by a daily background job. + """ + from app.services.image_service import image_service + from app.services.platform_health_service import platform_health_service + + now = datetime.now(UTC) + today = now.replace(hour=0, minute=0, second=0, microsecond=0) + + # Check if snapshot already exists for today + existing = ( + db.query(CapacitySnapshot) + .filter(CapacitySnapshot.snapshot_date == today) + .first() + ) + if existing: + logger.info(f"Snapshot already exists for {today}") + return existing + + # Gather metrics + total_vendors = db.query(func.count(Vendor.id)).scalar() or 0 + active_vendors = ( + db.query(func.count(Vendor.id)) + .filter(Vendor.is_active == True) # noqa: E712 + .scalar() + or 0 + ) + + # Subscription metrics + total_subs = db.query(func.count(VendorSubscription.id)).scalar() or 0 + active_subs = ( + db.query(func.count(VendorSubscription.id)) + .filter(VendorSubscription.status.in_(["active", "trial"])) + .scalar() + or 0 + ) + trial_vendors = ( + db.query(func.count(VendorSubscription.id)) + .filter(VendorSubscription.status == SubscriptionStatus.TRIAL.value) + .scalar() + or 0 + ) + + # Resource metrics + total_products = db.query(func.count(Product.id)).scalar() or 0 + total_team = ( + db.query(func.count(VendorUser.id)) + .filter(VendorUser.is_active == True) # noqa: E712 + .scalar() + or 0 + ) + + # Orders this month + start_of_month = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + total_orders = sum( + s.orders_this_period + for s in db.query(VendorSubscription).all() + ) + + # Storage metrics + try: + image_stats = image_service.get_storage_stats() + storage_gb = image_stats.get("total_size_gb", 0) + except Exception: + storage_gb = 0 + + try: + db_size = platform_health_service._get_database_size(db) + except Exception: + db_size = 0 + + # Theoretical capacity from subscriptions + capacity = platform_health_service.get_subscription_capacity(db) + theoretical_products = capacity["products"].get("theoretical_limit", 0) + theoretical_orders = capacity["orders_monthly"].get("theoretical_limit", 0) + theoretical_team = capacity["team_members"].get("theoretical_limit", 0) + + # Tier distribution + tier_distribution = capacity.get("tier_distribution", {}) + + # Create snapshot + snapshot = CapacitySnapshot( + snapshot_date=today, + total_vendors=total_vendors, + active_vendors=active_vendors, + trial_vendors=trial_vendors, + total_subscriptions=total_subs, + active_subscriptions=active_subs, + total_products=total_products, + total_orders_month=total_orders, + total_team_members=total_team, + storage_used_gb=Decimal(str(storage_gb)), + db_size_mb=Decimal(str(db_size)), + theoretical_products_limit=theoretical_products, + theoretical_orders_limit=theoretical_orders, + theoretical_team_limit=theoretical_team, + tier_distribution=tier_distribution, + ) + + db.add(snapshot) + db.flush() + db.refresh(snapshot) + + logger.info(f"Captured capacity snapshot for {today}") + return snapshot + + def get_growth_trends(self, db: Session, days: int = 30) -> dict: + """ + Calculate growth trends over the specified period. + + Returns growth rates and projections for key metrics. + """ + now = datetime.now(UTC) + start_date = now - timedelta(days=days) + + # Get snapshots for the period + snapshots = ( + db.query(CapacitySnapshot) + .filter(CapacitySnapshot.snapshot_date >= start_date) + .order_by(CapacitySnapshot.snapshot_date) + .all() + ) + + if len(snapshots) < 2: + return { + "period_days": days, + "snapshots_available": len(snapshots), + "trends": {}, + "message": "Insufficient data for trend analysis", + } + + first = snapshots[0] + last = snapshots[-1] + period_days = (last.snapshot_date - first.snapshot_date).days or 1 + + def calc_growth(metric: str) -> dict: + start_val = getattr(first, metric) or 0 + end_val = getattr(last, metric) or 0 + change = end_val - start_val + + if start_val > 0: + growth_rate = (change / start_val) * 100 + daily_rate = growth_rate / period_days + monthly_rate = daily_rate * 30 + else: + growth_rate = 0 if end_val == 0 else 100 + daily_rate = 0 + monthly_rate = 0 + + return { + "start_value": start_val, + "current_value": end_val, + "change": change, + "growth_rate_percent": round(growth_rate, 2), + "daily_growth_rate": round(daily_rate, 3), + "monthly_projection": round(end_val * (1 + monthly_rate / 100), 0), + } + + trends = { + "vendors": calc_growth("active_vendors"), + "products": calc_growth("total_products"), + "orders": calc_growth("total_orders_month"), + "team_members": calc_growth("total_team_members"), + "storage_gb": { + "start_value": float(first.storage_used_gb or 0), + "current_value": float(last.storage_used_gb or 0), + "change": float((last.storage_used_gb or 0) - (first.storage_used_gb or 0)), + }, + } + + return { + "period_days": period_days, + "snapshots_available": len(snapshots), + "start_date": first.snapshot_date.isoformat(), + "end_date": last.snapshot_date.isoformat(), + "trends": trends, + } + + def get_scaling_recommendations(self, db: Session) -> list[dict]: + """ + Generate scaling recommendations based on current capacity and growth. + + Returns prioritized list of recommendations. + """ + from app.services.platform_health_service import platform_health_service + + recommendations = [] + + # Get current capacity + capacity = platform_health_service.get_subscription_capacity(db) + health = platform_health_service.get_full_health_report(db) + trends = self.get_growth_trends(db, days=30) + + # Check product capacity + products = capacity["products"] + if products.get("utilization_percent") and products["utilization_percent"] > 80: + recommendations.append({ + "category": "capacity", + "severity": "warning", + "title": "Product capacity approaching limit", + "description": f"Currently at {products['utilization_percent']:.0f}% of theoretical product capacity", + "action": "Consider upgrading vendor tiers or adding capacity", + }) + + # Check infrastructure tier + current_tier = health.get("infrastructure_tier", {}) + next_trigger = health.get("next_tier_trigger") + if next_trigger: + recommendations.append({ + "category": "infrastructure", + "severity": "info", + "title": f"Current tier: {current_tier.get('name', 'Unknown')}", + "description": f"Next upgrade trigger: {next_trigger}", + "action": "Monitor growth and plan for infrastructure scaling", + }) + + # Check growth rate + if trends.get("trends"): + vendor_growth = trends["trends"].get("vendors", {}) + if vendor_growth.get("monthly_projection", 0) > 0: + monthly_rate = vendor_growth.get("growth_rate_percent", 0) + if monthly_rate > 20: + recommendations.append({ + "category": "growth", + "severity": "info", + "title": "High vendor growth rate", + "description": f"Vendor base growing at {monthly_rate:.1f}% over last 30 days", + "action": "Ensure infrastructure can scale to meet demand", + }) + + # Check storage + storage_percent = health.get("image_storage", {}).get("total_size_gb", 0) + if storage_percent > 800: # 80% of 1TB + recommendations.append({ + "category": "storage", + "severity": "warning", + "title": "Storage usage high", + "description": f"Image storage at {storage_percent:.1f} GB", + "action": "Plan for storage expansion or implement cleanup policies", + }) + + # Sort by severity + severity_order = {"critical": 0, "warning": 1, "info": 2} + recommendations.sort(key=lambda r: severity_order.get(r["severity"], 3)) + + return recommendations + + def get_days_until_threshold( + self, db: Session, metric: str, threshold: int + ) -> int | None: + """ + Calculate days until a metric reaches a threshold based on current growth. + + Returns None if insufficient data or no growth. + """ + trends = self.get_growth_trends(db, days=30) + + if not trends.get("trends") or metric not in trends["trends"]: + return None + + metric_data = trends["trends"][metric] + current = metric_data.get("current_value", 0) + daily_rate = metric_data.get("daily_growth_rate", 0) + + if daily_rate <= 0 or current >= threshold: + return None + + remaining = threshold - current + days = remaining / (current * daily_rate / 100) if current > 0 else None + + return int(days) if days else None + + +# Singleton instance +capacity_forecast_service = CapacityForecastService() diff --git a/app/services/platform_health_service.py b/app/services/platform_health_service.py index b4e0cca8..30a5363b 100644 --- a/app/services/platform_health_service.py +++ b/app/services/platform_health_service.py @@ -166,6 +166,101 @@ class PlatformHealthService: "active_vendors": active_vendors, } + def get_subscription_capacity(self, db: Session) -> dict: + """ + Calculate theoretical capacity based on all vendor subscriptions. + + Returns aggregated limits and current usage for capacity planning. + """ + from models.database.subscription import VendorSubscription + from models.database.vendor import VendorUser + + # Get all active subscriptions with their limits + subscriptions = ( + db.query(VendorSubscription) + .filter(VendorSubscription.status.in_(["active", "trial"])) + .all() + ) + + # Aggregate theoretical limits + total_products_limit = 0 + total_orders_limit = 0 + total_team_limit = 0 + unlimited_products = 0 + unlimited_orders = 0 + unlimited_team = 0 + + tier_distribution = {} + + for sub in subscriptions: + # Track tier distribution + tier = sub.tier or "unknown" + tier_distribution[tier] = tier_distribution.get(tier, 0) + 1 + + # Aggregate limits + if sub.products_limit is None: + unlimited_products += 1 + else: + total_products_limit += sub.products_limit + + if sub.orders_limit is None: + unlimited_orders += 1 + else: + total_orders_limit += sub.orders_limit + + if sub.team_members_limit is None: + unlimited_team += 1 + else: + total_team_limit += sub.team_members_limit + + # Get actual usage + actual_products = db.query(func.count(Product.id)).scalar() or 0 + actual_team = ( + db.query(func.count(VendorUser.id)) + .filter(VendorUser.is_active == True) # noqa: E712 + .scalar() + or 0 + ) + + # Orders this period (aggregate across all subscriptions) + total_orders_used = sum(s.orders_this_period for s in subscriptions) + + def calc_utilization(actual: int, limit: int, unlimited: int) -> dict: + if unlimited > 0: + # Some subscriptions have unlimited - can't calculate true % + return { + "actual": actual, + "theoretical_limit": limit, + "unlimited_count": unlimited, + "utilization_percent": None, + "has_unlimited": True, + } + elif limit > 0: + return { + "actual": actual, + "theoretical_limit": limit, + "unlimited_count": 0, + "utilization_percent": round((actual / limit) * 100, 1), + "headroom": limit - actual, + "has_unlimited": False, + } + else: + return { + "actual": actual, + "theoretical_limit": 0, + "unlimited_count": 0, + "utilization_percent": 0, + "has_unlimited": False, + } + + return { + "total_subscriptions": len(subscriptions), + "tier_distribution": tier_distribution, + "products": calc_utilization(actual_products, total_products_limit, unlimited_products), + "orders_monthly": calc_utilization(total_orders_used, total_orders_limit, unlimited_orders), + "team_members": calc_utilization(actual_team, total_team_limit, unlimited_team), + } + def get_full_health_report(self, db: Session) -> dict: """Get comprehensive platform health report.""" # System metrics @@ -177,6 +272,9 @@ class PlatformHealthService: # Image storage metrics image_storage = self.get_image_storage_metrics() + # Subscription capacity + subscription_capacity = self.get_subscription_capacity(db) + # Calculate thresholds thresholds = self._calculate_thresholds(system, database, image_storage) @@ -197,6 +295,7 @@ class PlatformHealthService: "system": system, "database": database, "image_storage": image_storage, + "subscription_capacity": subscription_capacity, "thresholds": thresholds, "recommendations": recommendations, "infrastructure_tier": tier, diff --git a/app/services/stripe_service.py b/app/services/stripe_service.py index f7b2a97b..fde8a78b 100644 --- a/app/services/stripe_service.py +++ b/app/services/stripe_service.py @@ -251,6 +251,19 @@ class StripeService: logger.info(f"Reactivated Stripe subscription {subscription_id}") return subscription + def cancel_subscription_item(self, subscription_item_id: str) -> None: + """ + Cancel a subscription item (used for add-ons). + + Args: + subscription_item_id: Stripe subscription item ID + """ + if not self.is_configured: + raise ValueError("Stripe is not configured") + + stripe.SubscriptionItem.delete(subscription_item_id) + logger.info(f"Cancelled Stripe subscription item {subscription_item_id}") + # ========================================================================= # Checkout & Portal # ========================================================================= @@ -263,6 +276,8 @@ class StripeService: success_url: str, cancel_url: str, trial_days: int | None = None, + quantity: int = 1, + metadata: dict | None = None, ) -> stripe.checkout.Session: """ Create a Stripe Checkout session for subscription signup. @@ -274,6 +289,8 @@ class StripeService: success_url: URL to redirect on success cancel_url: URL to redirect on cancel trial_days: Optional trial period + quantity: Number of items (default 1) + metadata: Additional metadata to store Returns: Stripe Checkout Session object @@ -311,16 +328,21 @@ class StripeService: subscription.stripe_customer_id = customer_id db.flush() + # Build metadata + session_metadata = { + "vendor_id": str(vendor.id), + "vendor_code": vendor.vendor_code, + } + if metadata: + session_metadata.update(metadata) + session_data = { "customer": customer_id, - "line_items": [{"price": price_id, "quantity": 1}], + "line_items": [{"price": price_id, "quantity": quantity}], "mode": "subscription", "success_url": success_url, "cancel_url": cancel_url, - "metadata": { - "vendor_id": str(vendor.id), - "vendor_code": vendor.vendor_code, - }, + "metadata": session_metadata, } if trial_days: diff --git a/app/services/subscription_service.py b/app/services/subscription_service.py index 0f01ecac..aa99fc8f 100644 --- a/app/services/subscription_service.py +++ b/app/services/subscription_service.py @@ -40,6 +40,7 @@ from models.schema.subscription import ( SubscriptionUsage, TierInfo, TierLimits, + UsageSummary, ) logger = logging.getLogger(__name__) @@ -79,8 +80,35 @@ class SubscriptionService: # Tier Information # ========================================================================= - def get_tier_info(self, tier_code: str) -> TierInfo: - """Get full tier information.""" + def get_tier_info(self, tier_code: str, db: Session | None = None) -> TierInfo: + """ + Get full tier information. + + Queries database if db session provided, otherwise falls back to TIER_LIMITS. + """ + # Try database first if session provided + if db is not None: + db_tier = self.get_tier_by_code(db, tier_code) + if db_tier: + return TierInfo( + code=db_tier.code, + name=db_tier.name, + price_monthly_cents=db_tier.price_monthly_cents, + price_annual_cents=db_tier.price_annual_cents, + limits=TierLimits( + orders_per_month=db_tier.orders_per_month, + products_limit=db_tier.products_limit, + team_members=db_tier.team_members, + order_history_months=db_tier.order_history_months, + ), + features=db_tier.features or [], + ) + + # Fallback to hardcoded TIER_LIMITS + return self._get_tier_from_legacy(tier_code) + + def _get_tier_from_legacy(self, tier_code: str) -> TierInfo: + """Get tier info from hardcoded TIER_LIMITS (fallback).""" try: tier = TierCode(tier_code) except ValueError: @@ -101,10 +129,43 @@ class SubscriptionService: features=limits.get("features", []), ) - def get_all_tiers(self) -> list[TierInfo]: - """Get information for all tiers.""" + def get_all_tiers(self, db: Session | None = None) -> list[TierInfo]: + """ + Get information for all tiers. + + Queries database if db session provided, otherwise falls back to TIER_LIMITS. + """ + if db is not None: + db_tiers = ( + db.query(SubscriptionTier) + .filter( + SubscriptionTier.is_active == True, # noqa: E712 + SubscriptionTier.is_public == True, # noqa: E712 + ) + .order_by(SubscriptionTier.display_order) + .all() + ) + if db_tiers: + return [ + TierInfo( + code=t.code, + name=t.name, + price_monthly_cents=t.price_monthly_cents, + price_annual_cents=t.price_annual_cents, + limits=TierLimits( + orders_per_month=t.orders_per_month, + products_limit=t.products_limit, + team_members=t.team_members, + order_history_months=t.order_history_months, + ), + features=t.features or [], + ) + for t in db_tiers + ] + + # Fallback to hardcoded return [ - self.get_tier_info(tier.value) + self._get_tier_from_legacy(tier.value) for tier in TierCode ] @@ -363,6 +424,47 @@ class SubscriptionService: team_members_percent_used=calc_percent(team_count, team_limit), ) + def get_usage_summary(self, db: Session, vendor_id: int) -> UsageSummary: + """Get usage summary for billing page display.""" + subscription = self.get_or_create_subscription(db, vendor_id) + + # Get actual counts + products_count = ( + db.query(func.count(Product.id)) + .filter(Product.vendor_id == vendor_id) + .scalar() + or 0 + ) + + team_count = ( + db.query(func.count(VendorUser.id)) + .filter(VendorUser.vendor_id == vendor_id, VendorUser.is_active == True) + .scalar() + or 0 + ) + + # Get limits + orders_limit = subscription.orders_limit + products_limit = subscription.products_limit + team_limit = subscription.team_members_limit + + def calc_remaining(current: int, limit: int | None) -> int | None: + if limit is None: + return None + return max(0, limit - current) + + return UsageSummary( + orders_this_period=subscription.orders_this_period, + orders_limit=orders_limit, + orders_remaining=calc_remaining(subscription.orders_this_period, orders_limit), + products_count=products_count, + products_limit=products_limit, + products_remaining=calc_remaining(products_count, products_limit), + team_count=team_count, + team_limit=team_limit, + team_remaining=calc_remaining(team_count, team_limit), + ) + def increment_order_count(self, db: Session, vendor_id: int) -> None: """ Increment the order counter for the current period. diff --git a/app/tasks/subscription_tasks.py b/app/tasks/subscription_tasks.py new file mode 100644 index 00000000..02837de6 --- /dev/null +++ b/app/tasks/subscription_tasks.py @@ -0,0 +1,318 @@ +# app/tasks/subscription_tasks.py +""" +Background tasks for subscription management. + +Provides scheduled tasks for: +- Resetting period counters at billing period end +- Expiring trials without payment methods +- Syncing subscription status with Stripe +- Capturing daily capacity snapshots +""" + +import logging +from datetime import UTC, datetime, timedelta + +from app.core.database import SessionLocal +from app.services.stripe_service import stripe_service +from models.database.subscription import SubscriptionStatus, VendorSubscription + +logger = logging.getLogger(__name__) + + +async def reset_period_counters(): + """ + Reset order counters for subscriptions whose billing period has ended. + + Should run daily. Resets orders_this_period to 0 and updates period dates. + """ + db = SessionLocal() + now = datetime.now(UTC) + reset_count = 0 + + try: + # Find subscriptions where period has ended + expired_periods = ( + db.query(VendorSubscription) + .filter( + VendorSubscription.period_end <= now, + VendorSubscription.status.in_(["active", "trial"]), + ) + .all() + ) + + for subscription in expired_periods: + old_period_end = subscription.period_end + + # Reset counters + subscription.orders_this_period = 0 + subscription.orders_limit_reached_at = None + + # Set new period dates + if subscription.is_annual: + subscription.period_start = now + subscription.period_end = now + timedelta(days=365) + else: + subscription.period_start = now + subscription.period_end = now + timedelta(days=30) + + subscription.updated_at = now + reset_count += 1 + + logger.info( + f"Reset period counters for vendor {subscription.vendor_id}: " + f"old_period_end={old_period_end}, new_period_end={subscription.period_end}" + ) + + db.commit() + logger.info(f"Reset period counters for {reset_count} subscriptions") + + except Exception as e: + logger.error(f"Error resetting period counters: {e}") + db.rollback() + raise + finally: + db.close() + + return {"reset_count": reset_count} + + +async def check_trial_expirations(): + """ + Check for expired trials and update their status. + + Trials without a payment method are marked as expired. + Trials with a payment method transition to active. + + Should run daily. + """ + db = SessionLocal() + now = datetime.now(UTC) + expired_count = 0 + activated_count = 0 + + try: + # Find expired trials + expired_trials = ( + db.query(VendorSubscription) + .filter( + VendorSubscription.status == SubscriptionStatus.TRIAL.value, + VendorSubscription.trial_ends_at <= now, + ) + .all() + ) + + for subscription in expired_trials: + if subscription.stripe_payment_method_id: + # Has payment method - activate + subscription.status = SubscriptionStatus.ACTIVE.value + activated_count += 1 + logger.info( + f"Activated subscription for vendor {subscription.vendor_id} " + f"(trial ended with payment method)" + ) + else: + # No payment method - expire + subscription.status = SubscriptionStatus.EXPIRED.value + expired_count += 1 + logger.info( + f"Expired trial for vendor {subscription.vendor_id} " + f"(no payment method)" + ) + + subscription.updated_at = now + + db.commit() + logger.info( + f"Trial expiration check: {expired_count} expired, {activated_count} activated" + ) + + except Exception as e: + logger.error(f"Error checking trial expirations: {e}") + db.rollback() + raise + finally: + db.close() + + return {"expired_count": expired_count, "activated_count": activated_count} + + +async def sync_stripe_status(): + """ + Sync subscription status with Stripe. + + Fetches current status from Stripe and updates local records. + Handles cases where Stripe status differs from local status. + + Should run hourly. + """ + if not stripe_service.is_configured: + logger.warning("Stripe not configured, skipping sync") + return {"synced": 0, "skipped": True} + + db = SessionLocal() + synced_count = 0 + error_count = 0 + + try: + # Find subscriptions with Stripe IDs + subscriptions = ( + db.query(VendorSubscription) + .filter(VendorSubscription.stripe_subscription_id.isnot(None)) + .all() + ) + + for subscription in subscriptions: + try: + # Fetch from Stripe + stripe_sub = stripe_service.get_subscription( + subscription.stripe_subscription_id + ) + + if not stripe_sub: + logger.warning( + f"Stripe subscription {subscription.stripe_subscription_id} " + f"not found for vendor {subscription.vendor_id}" + ) + continue + + # Map Stripe status to local status + status_map = { + "active": SubscriptionStatus.ACTIVE.value, + "trialing": SubscriptionStatus.TRIAL.value, + "past_due": SubscriptionStatus.PAST_DUE.value, + "canceled": SubscriptionStatus.CANCELLED.value, + "unpaid": SubscriptionStatus.PAST_DUE.value, + "incomplete": SubscriptionStatus.TRIAL.value, + "incomplete_expired": SubscriptionStatus.EXPIRED.value, + } + + new_status = status_map.get(stripe_sub.status) + if new_status and new_status != subscription.status: + old_status = subscription.status + subscription.status = new_status + subscription.updated_at = datetime.now(UTC) + logger.info( + f"Updated vendor {subscription.vendor_id} status: " + f"{old_status} -> {new_status} (from Stripe)" + ) + + # Update period dates from Stripe + if stripe_sub.current_period_start: + subscription.period_start = datetime.fromtimestamp( + stripe_sub.current_period_start, tz=UTC + ) + if stripe_sub.current_period_end: + subscription.period_end = datetime.fromtimestamp( + stripe_sub.current_period_end, tz=UTC + ) + + # Update payment method + if stripe_sub.default_payment_method: + subscription.stripe_payment_method_id = ( + stripe_sub.default_payment_method + if isinstance(stripe_sub.default_payment_method, str) + else stripe_sub.default_payment_method.id + ) + + synced_count += 1 + + except Exception as e: + logger.error( + f"Error syncing subscription {subscription.stripe_subscription_id}: {e}" + ) + error_count += 1 + + db.commit() + logger.info(f"Stripe sync complete: {synced_count} synced, {error_count} errors") + + except Exception as e: + logger.error(f"Error in Stripe sync task: {e}") + db.rollback() + raise + finally: + db.close() + + return {"synced_count": synced_count, "error_count": error_count} + + +async def cleanup_stale_subscriptions(): + """ + Clean up subscriptions in inconsistent states. + + Handles edge cases like: + - Subscriptions stuck in processing + - Old cancelled subscriptions past their period end + + Should run weekly. + """ + db = SessionLocal() + now = datetime.now(UTC) + cleaned_count = 0 + + try: + # Find cancelled subscriptions past their period end + stale_cancelled = ( + db.query(VendorSubscription) + .filter( + VendorSubscription.status == SubscriptionStatus.CANCELLED.value, + VendorSubscription.period_end < now - timedelta(days=30), + ) + .all() + ) + + for subscription in stale_cancelled: + # Mark as expired (fully terminated) + subscription.status = SubscriptionStatus.EXPIRED.value + subscription.updated_at = now + cleaned_count += 1 + logger.info( + f"Marked stale cancelled subscription as expired: " + f"vendor {subscription.vendor_id}" + ) + + db.commit() + logger.info(f"Cleaned up {cleaned_count} stale subscriptions") + + except Exception as e: + logger.error(f"Error cleaning up stale subscriptions: {e}") + db.rollback() + raise + finally: + db.close() + + return {"cleaned_count": cleaned_count} + + +async def capture_capacity_snapshot(): + """ + Capture a daily snapshot of platform capacity metrics. + + Used for growth trending and capacity forecasting. + Should run daily (e.g., at midnight). + """ + from app.services.capacity_forecast_service import capacity_forecast_service + + db = SessionLocal() + + try: + snapshot = capacity_forecast_service.capture_daily_snapshot(db) + db.commit() + + logger.info( + f"Captured capacity snapshot: {snapshot.total_vendors} vendors, " + f"{snapshot.total_products} products" + ) + + return { + "snapshot_id": snapshot.id, + "snapshot_date": snapshot.snapshot_date.isoformat(), + "total_vendors": snapshot.total_vendors, + "total_products": snapshot.total_products, + } + + except Exception as e: + logger.error(f"Error capturing capacity snapshot: {e}") + db.rollback() + raise + finally: + db.close() diff --git a/app/templates/admin/vendor-detail.html b/app/templates/admin/vendor-detail.html index 81be0d47..884cf0e9 100644 --- a/app/templates/admin/vendor-detail.html +++ b/app/templates/admin/vendor-detail.html @@ -106,6 +106,146 @@ + +
No Subscription Found
+This vendor doesn't have a subscription yet.
++ +
+No add-ons available
-