feat: complete subscription billing system phases 6-10

Phase 6 - Database-driven tiers:
- Update subscription_service to query database first with legacy fallback
- Add get_tier_info() db parameter and _get_tier_from_legacy() method

Phase 7 - Platform health integration:
- Add get_subscription_capacity() for theoretical vs actual capacity
- Include subscription capacity in full health report

Phase 8 - Background subscription tasks:
- Add reset_period_counters() for billing period resets
- Add check_trial_expirations() for trial management
- Add sync_stripe_status() for Stripe synchronization
- Add cleanup_stale_subscriptions() for maintenance
- Add capture_capacity_snapshot() for daily metrics

Phase 10 - Capacity planning & forecasting:
- Add CapacitySnapshot model for historical tracking
- Create capacity_forecast_service with growth trends
- Add /subscription-capacity, /trends, /recommendations endpoints
- Add /snapshot endpoint for manual captures

Also includes billing API enhancements from phase 4:
- Add upcoming-invoice, change-tier, addon purchase/cancel endpoints
- Add UsageSummary schema for billing page
- Enhance billing.js with addon management functions

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-26 20:51:13 +01:00
parent b717c23787
commit c6e7f4087f
20 changed files with 1895 additions and 29 deletions

View File

@@ -177,7 +177,7 @@ class AdminSubscriptionService:
)
if not result:
raise ResourceNotFoundException(f"Subscription for vendor {vendor_id} not found")
raise ResourceNotFoundException("Subscription", str(vendor_id))
return result

View File

@@ -365,6 +365,224 @@ class BillingService:
return {"message": "Subscription reactivated successfully"}
def get_upcoming_invoice(self, db: Session, vendor_id: int) -> dict:
"""
Get upcoming invoice preview.
Returns:
Dict with amount_due_cents, currency, next_payment_date, line_items
Raises:
NoActiveSubscriptionError: If no subscription with customer ID
"""
subscription = subscription_service.get_subscription(db, vendor_id)
if not subscription or not subscription.stripe_customer_id:
raise NoActiveSubscriptionError()
if not stripe_service.is_configured:
# Return empty preview if Stripe not configured
return {
"amount_due_cents": 0,
"currency": "EUR",
"next_payment_date": None,
"line_items": [],
}
invoice = stripe_service.get_upcoming_invoice(subscription.stripe_customer_id)
if not invoice:
return {
"amount_due_cents": 0,
"currency": "EUR",
"next_payment_date": None,
"line_items": [],
}
line_items = []
if invoice.lines and invoice.lines.data:
for line in invoice.lines.data:
line_items.append({
"description": line.description or "",
"amount_cents": line.amount,
"quantity": line.quantity or 1,
})
return {
"amount_due_cents": invoice.amount_due,
"currency": invoice.currency.upper(),
"next_payment_date": datetime.fromtimestamp(invoice.next_payment_attempt).isoformat()
if invoice.next_payment_attempt
else None,
"line_items": line_items,
}
def change_tier(
self,
db: Session,
vendor_id: int,
new_tier_code: str,
is_annual: bool,
) -> dict:
"""
Change subscription tier (upgrade/downgrade).
Returns:
Dict with message, new_tier, effective_immediately
Raises:
TierNotFoundError: If tier doesn't exist
NoActiveSubscriptionError: If no subscription
StripePriceNotConfiguredError: If price not configured
"""
subscription = subscription_service.get_subscription(db, vendor_id)
if not subscription or not subscription.stripe_subscription_id:
raise NoActiveSubscriptionError()
tier = self.get_tier_by_code(db, new_tier_code)
price_id = (
tier.stripe_price_annual_id
if is_annual and tier.stripe_price_annual_id
else tier.stripe_price_monthly_id
)
if not price_id:
raise StripePriceNotConfiguredError(new_tier_code)
# Update in Stripe
if stripe_service.is_configured:
stripe_service.update_subscription(
subscription_id=subscription.stripe_subscription_id,
new_price_id=price_id,
)
# Update local subscription
old_tier = subscription.tier
subscription.tier = new_tier_code
subscription.tier_id = tier.id
subscription.is_annual = is_annual
subscription.updated_at = datetime.utcnow()
is_upgrade = self._is_upgrade(db, old_tier, new_tier_code)
return {
"message": f"Subscription {'upgraded' if is_upgrade else 'changed'} to {tier.name}",
"new_tier": new_tier_code,
"effective_immediately": True,
}
def _is_upgrade(self, db: Session, old_tier: str, new_tier: str) -> bool:
"""Check if tier change is an upgrade."""
old = db.query(SubscriptionTier).filter(SubscriptionTier.code == old_tier).first()
new = db.query(SubscriptionTier).filter(SubscriptionTier.code == new_tier).first()
if not old or not new:
return False
return new.display_order > old.display_order
def purchase_addon(
self,
db: Session,
vendor_id: int,
addon_code: str,
domain_name: str | None,
quantity: int,
success_url: str,
cancel_url: str,
) -> dict:
"""
Create checkout session for add-on purchase.
Returns:
Dict with checkout_url and session_id
Raises:
PaymentSystemNotConfiguredError: If Stripe not configured
AddonNotFoundError: If addon doesn't exist
"""
if not stripe_service.is_configured:
raise PaymentSystemNotConfiguredError()
addon = (
db.query(AddOnProduct)
.filter(
AddOnProduct.code == addon_code,
AddOnProduct.is_active == True, # noqa: E712
)
.first()
)
if not addon:
raise BillingServiceError(f"Add-on '{addon_code}' not found")
if not addon.stripe_price_id:
raise BillingServiceError(f"Stripe price not configured for add-on '{addon_code}'")
vendor = self.get_vendor(db, vendor_id)
subscription = subscription_service.get_or_create_subscription(db, vendor_id)
# Create checkout session for add-on
session = stripe_service.create_checkout_session(
db=db,
vendor=vendor,
price_id=addon.stripe_price_id,
success_url=success_url,
cancel_url=cancel_url,
quantity=quantity,
metadata={
"addon_code": addon_code,
"domain_name": domain_name or "",
},
)
return {
"checkout_url": session.url,
"session_id": session.id,
}
def cancel_addon(self, db: Session, vendor_id: int, addon_id: int) -> dict:
"""
Cancel a purchased add-on.
Returns:
Dict with message and addon_code
Raises:
BillingServiceError: If addon not found or not owned by vendor
"""
vendor_addon = (
db.query(VendorAddOn)
.filter(
VendorAddOn.id == addon_id,
VendorAddOn.vendor_id == vendor_id,
)
.first()
)
if not vendor_addon:
raise BillingServiceError("Add-on not found")
addon_code = vendor_addon.addon_product.code
# Cancel in Stripe if applicable
if stripe_service.is_configured and vendor_addon.stripe_subscription_item_id:
try:
stripe_service.cancel_subscription_item(vendor_addon.stripe_subscription_item_id)
except Exception as e:
logger.warning(f"Failed to cancel addon in Stripe: {e}")
# Mark as cancelled
vendor_addon.status = "cancelled"
vendor_addon.cancelled_at = datetime.utcnow()
return {
"message": "Add-on cancelled successfully",
"addon_code": addon_code,
}
# Create service instance
billing_service = BillingService()

View File

@@ -0,0 +1,321 @@
# app/services/capacity_forecast_service.py
"""
Capacity forecasting service for growth trends and scaling recommendations.
Provides:
- Historical capacity trend analysis
- Growth rate calculations
- Days-until-threshold projections
- Scaling recommendations based on growth patterns
"""
import logging
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from sqlalchemy import func
from sqlalchemy.orm import Session
from models.database.product import Product
from models.database.subscription import (
CapacitySnapshot,
SubscriptionStatus,
VendorSubscription,
)
from models.database.vendor import Vendor, VendorUser
logger = logging.getLogger(__name__)
# Scaling thresholds based on capacity-planning.md
INFRASTRUCTURE_SCALING = [
{"name": "Starter", "max_vendors": 50, "max_products": 10_000, "cost_monthly": 30},
{"name": "Small", "max_vendors": 100, "max_products": 30_000, "cost_monthly": 80},
{"name": "Medium", "max_vendors": 300, "max_products": 100_000, "cost_monthly": 150},
{"name": "Large", "max_vendors": 500, "max_products": 250_000, "cost_monthly": 350},
{"name": "Scale", "max_vendors": 1000, "max_products": 500_000, "cost_monthly": 700},
{"name": "Enterprise", "max_vendors": None, "max_products": None, "cost_monthly": 1500},
]
class CapacityForecastService:
"""Service for capacity forecasting and trend analysis."""
def capture_daily_snapshot(self, db: Session) -> CapacitySnapshot:
"""
Capture a daily snapshot of platform capacity metrics.
Should be called by a daily background job.
"""
from app.services.image_service import image_service
from app.services.platform_health_service import platform_health_service
now = datetime.now(UTC)
today = now.replace(hour=0, minute=0, second=0, microsecond=0)
# Check if snapshot already exists for today
existing = (
db.query(CapacitySnapshot)
.filter(CapacitySnapshot.snapshot_date == today)
.first()
)
if existing:
logger.info(f"Snapshot already exists for {today}")
return existing
# Gather metrics
total_vendors = db.query(func.count(Vendor.id)).scalar() or 0
active_vendors = (
db.query(func.count(Vendor.id))
.filter(Vendor.is_active == True) # noqa: E712
.scalar()
or 0
)
# Subscription metrics
total_subs = db.query(func.count(VendorSubscription.id)).scalar() or 0
active_subs = (
db.query(func.count(VendorSubscription.id))
.filter(VendorSubscription.status.in_(["active", "trial"]))
.scalar()
or 0
)
trial_vendors = (
db.query(func.count(VendorSubscription.id))
.filter(VendorSubscription.status == SubscriptionStatus.TRIAL.value)
.scalar()
or 0
)
# Resource metrics
total_products = db.query(func.count(Product.id)).scalar() or 0
total_team = (
db.query(func.count(VendorUser.id))
.filter(VendorUser.is_active == True) # noqa: E712
.scalar()
or 0
)
# Orders this month
start_of_month = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
total_orders = sum(
s.orders_this_period
for s in db.query(VendorSubscription).all()
)
# Storage metrics
try:
image_stats = image_service.get_storage_stats()
storage_gb = image_stats.get("total_size_gb", 0)
except Exception:
storage_gb = 0
try:
db_size = platform_health_service._get_database_size(db)
except Exception:
db_size = 0
# Theoretical capacity from subscriptions
capacity = platform_health_service.get_subscription_capacity(db)
theoretical_products = capacity["products"].get("theoretical_limit", 0)
theoretical_orders = capacity["orders_monthly"].get("theoretical_limit", 0)
theoretical_team = capacity["team_members"].get("theoretical_limit", 0)
# Tier distribution
tier_distribution = capacity.get("tier_distribution", {})
# Create snapshot
snapshot = CapacitySnapshot(
snapshot_date=today,
total_vendors=total_vendors,
active_vendors=active_vendors,
trial_vendors=trial_vendors,
total_subscriptions=total_subs,
active_subscriptions=active_subs,
total_products=total_products,
total_orders_month=total_orders,
total_team_members=total_team,
storage_used_gb=Decimal(str(storage_gb)),
db_size_mb=Decimal(str(db_size)),
theoretical_products_limit=theoretical_products,
theoretical_orders_limit=theoretical_orders,
theoretical_team_limit=theoretical_team,
tier_distribution=tier_distribution,
)
db.add(snapshot)
db.flush()
db.refresh(snapshot)
logger.info(f"Captured capacity snapshot for {today}")
return snapshot
def get_growth_trends(self, db: Session, days: int = 30) -> dict:
"""
Calculate growth trends over the specified period.
Returns growth rates and projections for key metrics.
"""
now = datetime.now(UTC)
start_date = now - timedelta(days=days)
# Get snapshots for the period
snapshots = (
db.query(CapacitySnapshot)
.filter(CapacitySnapshot.snapshot_date >= start_date)
.order_by(CapacitySnapshot.snapshot_date)
.all()
)
if len(snapshots) < 2:
return {
"period_days": days,
"snapshots_available": len(snapshots),
"trends": {},
"message": "Insufficient data for trend analysis",
}
first = snapshots[0]
last = snapshots[-1]
period_days = (last.snapshot_date - first.snapshot_date).days or 1
def calc_growth(metric: str) -> dict:
start_val = getattr(first, metric) or 0
end_val = getattr(last, metric) or 0
change = end_val - start_val
if start_val > 0:
growth_rate = (change / start_val) * 100
daily_rate = growth_rate / period_days
monthly_rate = daily_rate * 30
else:
growth_rate = 0 if end_val == 0 else 100
daily_rate = 0
monthly_rate = 0
return {
"start_value": start_val,
"current_value": end_val,
"change": change,
"growth_rate_percent": round(growth_rate, 2),
"daily_growth_rate": round(daily_rate, 3),
"monthly_projection": round(end_val * (1 + monthly_rate / 100), 0),
}
trends = {
"vendors": calc_growth("active_vendors"),
"products": calc_growth("total_products"),
"orders": calc_growth("total_orders_month"),
"team_members": calc_growth("total_team_members"),
"storage_gb": {
"start_value": float(first.storage_used_gb or 0),
"current_value": float(last.storage_used_gb or 0),
"change": float((last.storage_used_gb or 0) - (first.storage_used_gb or 0)),
},
}
return {
"period_days": period_days,
"snapshots_available": len(snapshots),
"start_date": first.snapshot_date.isoformat(),
"end_date": last.snapshot_date.isoformat(),
"trends": trends,
}
def get_scaling_recommendations(self, db: Session) -> list[dict]:
"""
Generate scaling recommendations based on current capacity and growth.
Returns prioritized list of recommendations.
"""
from app.services.platform_health_service import platform_health_service
recommendations = []
# Get current capacity
capacity = platform_health_service.get_subscription_capacity(db)
health = platform_health_service.get_full_health_report(db)
trends = self.get_growth_trends(db, days=30)
# Check product capacity
products = capacity["products"]
if products.get("utilization_percent") and products["utilization_percent"] > 80:
recommendations.append({
"category": "capacity",
"severity": "warning",
"title": "Product capacity approaching limit",
"description": f"Currently at {products['utilization_percent']:.0f}% of theoretical product capacity",
"action": "Consider upgrading vendor tiers or adding capacity",
})
# Check infrastructure tier
current_tier = health.get("infrastructure_tier", {})
next_trigger = health.get("next_tier_trigger")
if next_trigger:
recommendations.append({
"category": "infrastructure",
"severity": "info",
"title": f"Current tier: {current_tier.get('name', 'Unknown')}",
"description": f"Next upgrade trigger: {next_trigger}",
"action": "Monitor growth and plan for infrastructure scaling",
})
# Check growth rate
if trends.get("trends"):
vendor_growth = trends["trends"].get("vendors", {})
if vendor_growth.get("monthly_projection", 0) > 0:
monthly_rate = vendor_growth.get("growth_rate_percent", 0)
if monthly_rate > 20:
recommendations.append({
"category": "growth",
"severity": "info",
"title": "High vendor growth rate",
"description": f"Vendor base growing at {monthly_rate:.1f}% over last 30 days",
"action": "Ensure infrastructure can scale to meet demand",
})
# Check storage
storage_percent = health.get("image_storage", {}).get("total_size_gb", 0)
if storage_percent > 800: # 80% of 1TB
recommendations.append({
"category": "storage",
"severity": "warning",
"title": "Storage usage high",
"description": f"Image storage at {storage_percent:.1f} GB",
"action": "Plan for storage expansion or implement cleanup policies",
})
# Sort by severity
severity_order = {"critical": 0, "warning": 1, "info": 2}
recommendations.sort(key=lambda r: severity_order.get(r["severity"], 3))
return recommendations
def get_days_until_threshold(
self, db: Session, metric: str, threshold: int
) -> int | None:
"""
Calculate days until a metric reaches a threshold based on current growth.
Returns None if insufficient data or no growth.
"""
trends = self.get_growth_trends(db, days=30)
if not trends.get("trends") or metric not in trends["trends"]:
return None
metric_data = trends["trends"][metric]
current = metric_data.get("current_value", 0)
daily_rate = metric_data.get("daily_growth_rate", 0)
if daily_rate <= 0 or current >= threshold:
return None
remaining = threshold - current
days = remaining / (current * daily_rate / 100) if current > 0 else None
return int(days) if days else None
# Singleton instance
capacity_forecast_service = CapacityForecastService()

View File

@@ -166,6 +166,101 @@ class PlatformHealthService:
"active_vendors": active_vendors,
}
def get_subscription_capacity(self, db: Session) -> dict:
"""
Calculate theoretical capacity based on all vendor subscriptions.
Returns aggregated limits and current usage for capacity planning.
"""
from models.database.subscription import VendorSubscription
from models.database.vendor import VendorUser
# Get all active subscriptions with their limits
subscriptions = (
db.query(VendorSubscription)
.filter(VendorSubscription.status.in_(["active", "trial"]))
.all()
)
# Aggregate theoretical limits
total_products_limit = 0
total_orders_limit = 0
total_team_limit = 0
unlimited_products = 0
unlimited_orders = 0
unlimited_team = 0
tier_distribution = {}
for sub in subscriptions:
# Track tier distribution
tier = sub.tier or "unknown"
tier_distribution[tier] = tier_distribution.get(tier, 0) + 1
# Aggregate limits
if sub.products_limit is None:
unlimited_products += 1
else:
total_products_limit += sub.products_limit
if sub.orders_limit is None:
unlimited_orders += 1
else:
total_orders_limit += sub.orders_limit
if sub.team_members_limit is None:
unlimited_team += 1
else:
total_team_limit += sub.team_members_limit
# Get actual usage
actual_products = db.query(func.count(Product.id)).scalar() or 0
actual_team = (
db.query(func.count(VendorUser.id))
.filter(VendorUser.is_active == True) # noqa: E712
.scalar()
or 0
)
# Orders this period (aggregate across all subscriptions)
total_orders_used = sum(s.orders_this_period for s in subscriptions)
def calc_utilization(actual: int, limit: int, unlimited: int) -> dict:
if unlimited > 0:
# Some subscriptions have unlimited - can't calculate true %
return {
"actual": actual,
"theoretical_limit": limit,
"unlimited_count": unlimited,
"utilization_percent": None,
"has_unlimited": True,
}
elif limit > 0:
return {
"actual": actual,
"theoretical_limit": limit,
"unlimited_count": 0,
"utilization_percent": round((actual / limit) * 100, 1),
"headroom": limit - actual,
"has_unlimited": False,
}
else:
return {
"actual": actual,
"theoretical_limit": 0,
"unlimited_count": 0,
"utilization_percent": 0,
"has_unlimited": False,
}
return {
"total_subscriptions": len(subscriptions),
"tier_distribution": tier_distribution,
"products": calc_utilization(actual_products, total_products_limit, unlimited_products),
"orders_monthly": calc_utilization(total_orders_used, total_orders_limit, unlimited_orders),
"team_members": calc_utilization(actual_team, total_team_limit, unlimited_team),
}
def get_full_health_report(self, db: Session) -> dict:
"""Get comprehensive platform health report."""
# System metrics
@@ -177,6 +272,9 @@ class PlatformHealthService:
# Image storage metrics
image_storage = self.get_image_storage_metrics()
# Subscription capacity
subscription_capacity = self.get_subscription_capacity(db)
# Calculate thresholds
thresholds = self._calculate_thresholds(system, database, image_storage)
@@ -197,6 +295,7 @@ class PlatformHealthService:
"system": system,
"database": database,
"image_storage": image_storage,
"subscription_capacity": subscription_capacity,
"thresholds": thresholds,
"recommendations": recommendations,
"infrastructure_tier": tier,

View File

@@ -251,6 +251,19 @@ class StripeService:
logger.info(f"Reactivated Stripe subscription {subscription_id}")
return subscription
def cancel_subscription_item(self, subscription_item_id: str) -> None:
"""
Cancel a subscription item (used for add-ons).
Args:
subscription_item_id: Stripe subscription item ID
"""
if not self.is_configured:
raise ValueError("Stripe is not configured")
stripe.SubscriptionItem.delete(subscription_item_id)
logger.info(f"Cancelled Stripe subscription item {subscription_item_id}")
# =========================================================================
# Checkout & Portal
# =========================================================================
@@ -263,6 +276,8 @@ class StripeService:
success_url: str,
cancel_url: str,
trial_days: int | None = None,
quantity: int = 1,
metadata: dict | None = None,
) -> stripe.checkout.Session:
"""
Create a Stripe Checkout session for subscription signup.
@@ -274,6 +289,8 @@ class StripeService:
success_url: URL to redirect on success
cancel_url: URL to redirect on cancel
trial_days: Optional trial period
quantity: Number of items (default 1)
metadata: Additional metadata to store
Returns:
Stripe Checkout Session object
@@ -311,16 +328,21 @@ class StripeService:
subscription.stripe_customer_id = customer_id
db.flush()
# Build metadata
session_metadata = {
"vendor_id": str(vendor.id),
"vendor_code": vendor.vendor_code,
}
if metadata:
session_metadata.update(metadata)
session_data = {
"customer": customer_id,
"line_items": [{"price": price_id, "quantity": 1}],
"line_items": [{"price": price_id, "quantity": quantity}],
"mode": "subscription",
"success_url": success_url,
"cancel_url": cancel_url,
"metadata": {
"vendor_id": str(vendor.id),
"vendor_code": vendor.vendor_code,
},
"metadata": session_metadata,
}
if trial_days:

View File

@@ -40,6 +40,7 @@ from models.schema.subscription import (
SubscriptionUsage,
TierInfo,
TierLimits,
UsageSummary,
)
logger = logging.getLogger(__name__)
@@ -79,8 +80,35 @@ class SubscriptionService:
# Tier Information
# =========================================================================
def get_tier_info(self, tier_code: str) -> TierInfo:
"""Get full tier information."""
def get_tier_info(self, tier_code: str, db: Session | None = None) -> TierInfo:
"""
Get full tier information.
Queries database if db session provided, otherwise falls back to TIER_LIMITS.
"""
# Try database first if session provided
if db is not None:
db_tier = self.get_tier_by_code(db, tier_code)
if db_tier:
return TierInfo(
code=db_tier.code,
name=db_tier.name,
price_monthly_cents=db_tier.price_monthly_cents,
price_annual_cents=db_tier.price_annual_cents,
limits=TierLimits(
orders_per_month=db_tier.orders_per_month,
products_limit=db_tier.products_limit,
team_members=db_tier.team_members,
order_history_months=db_tier.order_history_months,
),
features=db_tier.features or [],
)
# Fallback to hardcoded TIER_LIMITS
return self._get_tier_from_legacy(tier_code)
def _get_tier_from_legacy(self, tier_code: str) -> TierInfo:
"""Get tier info from hardcoded TIER_LIMITS (fallback)."""
try:
tier = TierCode(tier_code)
except ValueError:
@@ -101,10 +129,43 @@ class SubscriptionService:
features=limits.get("features", []),
)
def get_all_tiers(self) -> list[TierInfo]:
"""Get information for all tiers."""
def get_all_tiers(self, db: Session | None = None) -> list[TierInfo]:
"""
Get information for all tiers.
Queries database if db session provided, otherwise falls back to TIER_LIMITS.
"""
if db is not None:
db_tiers = (
db.query(SubscriptionTier)
.filter(
SubscriptionTier.is_active == True, # noqa: E712
SubscriptionTier.is_public == True, # noqa: E712
)
.order_by(SubscriptionTier.display_order)
.all()
)
if db_tiers:
return [
TierInfo(
code=t.code,
name=t.name,
price_monthly_cents=t.price_monthly_cents,
price_annual_cents=t.price_annual_cents,
limits=TierLimits(
orders_per_month=t.orders_per_month,
products_limit=t.products_limit,
team_members=t.team_members,
order_history_months=t.order_history_months,
),
features=t.features or [],
)
for t in db_tiers
]
# Fallback to hardcoded
return [
self.get_tier_info(tier.value)
self._get_tier_from_legacy(tier.value)
for tier in TierCode
]
@@ -363,6 +424,47 @@ class SubscriptionService:
team_members_percent_used=calc_percent(team_count, team_limit),
)
def get_usage_summary(self, db: Session, vendor_id: int) -> UsageSummary:
"""Get usage summary for billing page display."""
subscription = self.get_or_create_subscription(db, vendor_id)
# Get actual counts
products_count = (
db.query(func.count(Product.id))
.filter(Product.vendor_id == vendor_id)
.scalar()
or 0
)
team_count = (
db.query(func.count(VendorUser.id))
.filter(VendorUser.vendor_id == vendor_id, VendorUser.is_active == True)
.scalar()
or 0
)
# Get limits
orders_limit = subscription.orders_limit
products_limit = subscription.products_limit
team_limit = subscription.team_members_limit
def calc_remaining(current: int, limit: int | None) -> int | None:
if limit is None:
return None
return max(0, limit - current)
return UsageSummary(
orders_this_period=subscription.orders_this_period,
orders_limit=orders_limit,
orders_remaining=calc_remaining(subscription.orders_this_period, orders_limit),
products_count=products_count,
products_limit=products_limit,
products_remaining=calc_remaining(products_count, products_limit),
team_count=team_count,
team_limit=team_limit,
team_remaining=calc_remaining(team_count, team_limit),
)
def increment_order_count(self, db: Session, vendor_id: int) -> None:
"""
Increment the order counter for the current period.