refactor: move capacity_forecast_service from billing to monitoring
Some checks failed
CI / ruff (push) Failing after 8s
CI / pytest (push) Successful in 36m5s
CI / architecture (push) Successful in 11s
CI / dependency-scanning (push) Successful in 27s
CI / docs (push) Has been skipped
CI / audit (push) Successful in 8s

Resolves the billing (core) → monitoring (optional) architecture violation
by moving CapacityForecastService to the monitoring module where it belongs.

- Create BillingMetricsProvider to expose subscription counts via stats_aggregator
- Move CapacitySnapshot model from billing to monitoring
- Replace direct MerchantSubscription queries with stats_aggregator calls
- Fix middleware test mocks to cover StoreDomain/MerchantDomain fallback chains

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-13 20:58:22 +01:00
parent 7c43d6f4a2
commit 9c27fa02b0
16 changed files with 234 additions and 102 deletions

View File

@@ -89,6 +89,13 @@ def _get_store_router():
return store_router
def _get_metrics_provider():
"""Lazy import of metrics provider to avoid circular imports."""
from app.modules.billing.services.billing_metrics import billing_metrics_provider
return billing_metrics_provider
def _get_feature_provider():
"""Lazy import of feature provider to avoid circular imports."""
from app.modules.billing.services.billing_features import billing_feature_provider
@@ -271,6 +278,8 @@ billing_module = ModuleDefinition(
],
# Feature provider for feature flags
feature_provider=_get_feature_provider,
# Metrics provider for subscription metrics
metrics_provider=_get_metrics_provider,
)

View File

@@ -22,7 +22,6 @@ from app.modules.billing.models.subscription import (
AddOnProduct,
BillingHistory,
BillingPeriod,
CapacitySnapshot,
StoreAddOn,
StripeWebhookEvent,
SubscriptionStatus,
@@ -46,7 +45,6 @@ __all__ = [
"StoreAddOn",
"StripeWebhookEvent",
"BillingHistory",
"CapacitySnapshot",
# Merchant Subscription
"MerchantSubscription",
# Feature Limits

View File

@@ -345,61 +345,3 @@ class BillingHistory(Base, TimestampMixin):
def __repr__(self):
return f"<BillingHistory(store_id={self.store_id}, invoice='{self.invoice_number}', status='{self.status}')>"
# ============================================================================
# Capacity Planning
# ============================================================================
class CapacitySnapshot(Base, TimestampMixin):
"""
Daily snapshot of platform capacity metrics.
Used for growth trending and capacity forecasting.
Captured daily by background job.
"""
__tablename__ = "capacity_snapshots"
id = Column(Integer, primary_key=True, index=True)
snapshot_date = Column(DateTime(timezone=True), nullable=False, unique=True, index=True)
# Store metrics
total_stores = Column(Integer, default=0, nullable=False)
active_stores = Column(Integer, default=0, nullable=False)
trial_stores = Column(Integer, default=0, nullable=False)
# Subscription metrics
total_subscriptions = Column(Integer, default=0, nullable=False)
active_subscriptions = Column(Integer, default=0, nullable=False)
# Resource metrics
total_products = Column(Integer, default=0, nullable=False)
total_orders_month = Column(Integer, default=0, nullable=False)
total_team_members = Column(Integer, default=0, nullable=False)
# Storage metrics
storage_used_gb = Column(Numeric(10, 2), default=0, nullable=False)
db_size_mb = Column(Numeric(10, 2), default=0, nullable=False)
# Capacity metrics (theoretical limits from subscriptions)
theoretical_products_limit = Column(Integer, nullable=True)
theoretical_orders_limit = Column(Integer, nullable=True)
theoretical_team_limit = Column(Integer, nullable=True)
# Tier distribution (JSON: {"essential": 10, "professional": 5, ...})
tier_distribution = Column(JSON, nullable=True)
# Performance metrics
avg_response_ms = Column(Integer, nullable=True)
peak_cpu_percent = Column(Numeric(5, 2), nullable=True)
peak_memory_percent = Column(Numeric(5, 2), nullable=True)
# Indexes
__table_args__ = (
Index("ix_capacity_snapshots_date", "snapshot_date"),
)
def __repr__(self) -> str:
return f"<CapacitySnapshot(date={self.snapshot_date}, stores={self.total_stores})>"

View File

@@ -21,10 +21,6 @@ from app.modules.billing.services.billing_service import (
BillingService,
billing_service,
)
from app.modules.billing.services.capacity_forecast_service import (
CapacityForecastService,
capacity_forecast_service,
)
from app.modules.billing.services.feature_service import (
FeatureService,
feature_service,
@@ -68,8 +64,6 @@ __all__ = [
"SubscriptionNotCancelledError",
"FeatureService",
"feature_service",
"CapacityForecastService",
"capacity_forecast_service",
"PlatformPricingService",
"platform_pricing_service",
"UsageService",

View File

@@ -0,0 +1,115 @@
# app/modules/billing/services/billing_metrics.py
"""
Metrics provider for the billing module.
Provides metrics for:
- Subscription counts (total, active, trial)
"""
import logging
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.modules.contracts.metrics import (
MetricsContext,
MetricValue,
)
logger = logging.getLogger(__name__)
class BillingMetricsProvider:
"""
Metrics provider for billing module.
Provides subscription metrics at the platform level.
"""
@property
def metrics_category(self) -> str:
return "billing"
def get_store_metrics(
self,
db: Session,
store_id: int,
context: MetricsContext | None = None,
) -> list[MetricValue]:
"""
Get metrics for a specific store.
Subscriptions are merchant-level, not store-level, so no store metrics.
"""
return []
def get_platform_metrics(
self,
db: Session,
platform_id: int,
context: MetricsContext | None = None,
) -> list[MetricValue]:
"""
Get subscription metrics aggregated for a platform.
Provides:
- Total subscriptions
- Active subscriptions (active + trial)
- Trial subscriptions
"""
from app.modules.billing.models import MerchantSubscription, SubscriptionStatus
try:
total_subs = (
db.query(func.count(MerchantSubscription.id)).scalar() or 0
)
active_subs = (
db.query(func.count(MerchantSubscription.id))
.filter(MerchantSubscription.status.in_(["active", "trial"]))
.scalar()
or 0
)
trial_subs = (
db.query(func.count(MerchantSubscription.id))
.filter(MerchantSubscription.status == SubscriptionStatus.TRIAL.value)
.scalar()
or 0
)
return [
MetricValue(
key="billing.total_subscriptions",
value=total_subs,
label="Total Subscriptions",
category="billing",
icon="credit-card",
description="Total number of merchant subscriptions",
),
MetricValue(
key="billing.active_subscriptions",
value=active_subs,
label="Active Subscriptions",
category="billing",
icon="check-circle",
description="Subscriptions with active or trial status",
),
MetricValue(
key="billing.trial_subscriptions",
value=trial_subs,
label="Trial Subscriptions",
category="billing",
icon="clock",
description="Subscriptions currently in trial period",
),
]
except Exception as e:
logger.warning(f"Failed to get billing platform metrics: {e}")
return []
# Singleton instance
billing_metrics_provider = BillingMetricsProvider()
__all__ = ["BillingMetricsProvider", "billing_metrics_provider"]

View File

@@ -1,331 +0,0 @@
# app/modules/billing/services/capacity_forecast_service.py
"""
Capacity forecasting service for growth trends and scaling recommendations.
Provides:
- Historical capacity trend analysis
- Growth rate calculations
- Days-until-threshold projections
- Scaling recommendations based on growth patterns
"""
import logging
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.modules.billing.models import (
CapacitySnapshot,
MerchantSubscription,
SubscriptionStatus,
)
from app.modules.contracts.metrics import MetricsContext
from app.modules.core.services.stats_aggregator import stats_aggregator
from app.modules.tenancy.models import Platform, Store, StoreUser
logger = logging.getLogger(__name__)
# Scaling thresholds based on capacity-planning.md
INFRASTRUCTURE_SCALING = [
{"name": "Starter", "max_stores": 50, "max_products": 10_000, "cost_monthly": 30},
{"name": "Small", "max_stores": 100, "max_products": 30_000, "cost_monthly": 80},
{"name": "Medium", "max_stores": 300, "max_products": 100_000, "cost_monthly": 150},
{"name": "Large", "max_stores": 500, "max_products": 250_000, "cost_monthly": 350},
{"name": "Scale", "max_stores": 1000, "max_products": 500_000, "cost_monthly": 700},
{"name": "Enterprise", "max_stores": None, "max_products": None, "cost_monthly": 1500},
]
class CapacityForecastService:
"""Service for capacity forecasting and trend analysis."""
def capture_daily_snapshot(self, db: Session) -> CapacitySnapshot:
"""
Capture a daily snapshot of platform capacity metrics.
Should be called by a daily background job.
"""
from app.modules.cms.services.media_service import media_service
from app.modules.monitoring.services.platform_health_service import (
platform_health_service,
)
now = datetime.now(UTC)
today = now.replace(hour=0, minute=0, second=0, microsecond=0)
# Check if snapshot already exists for today
existing = (
db.query(CapacitySnapshot)
.filter(CapacitySnapshot.snapshot_date == today)
.first()
)
if existing:
logger.info(f"Snapshot already exists for {today}")
return existing
# Gather metrics
total_stores = db.query(func.count(Store.id)).scalar() or 0
active_stores = (
db.query(func.count(Store.id))
.filter(Store.is_active == True) # noqa: E712
.scalar()
or 0
)
# Subscription metrics
total_subs = db.query(func.count(MerchantSubscription.id)).scalar() or 0
active_subs = (
db.query(func.count(MerchantSubscription.id))
.filter(MerchantSubscription.status.in_(["active", "trial"]))
.scalar()
or 0
)
trial_stores = (
db.query(func.count(MerchantSubscription.id))
.filter(MerchantSubscription.status == SubscriptionStatus.TRIAL.value)
.scalar()
or 0
)
# Resource metrics via provider pattern (avoids direct catalog/orders imports)
start_of_month = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
platform = db.query(Platform).first()
platform_id = platform.id if platform else 1
stats = stats_aggregator.get_admin_stats_flat(
db, platform_id,
context=MetricsContext(date_from=start_of_month),
)
total_products = stats.get("catalog.total_products", 0)
total_team = (
db.query(func.count(StoreUser.id))
.filter(StoreUser.is_active == True) # noqa: E712
.scalar()
or 0
)
# Orders this month (from stats aggregator)
total_orders = stats.get("orders.in_period", 0)
# Storage metrics
try:
image_stats = media_service.get_storage_stats(db)
storage_gb = image_stats.get("total_size_gb", 0)
except Exception:
storage_gb = 0
try:
db_size = platform_health_service._get_database_size(db)
except Exception:
db_size = 0
# Theoretical capacity from subscriptions
capacity = platform_health_service.get_subscription_capacity(db)
theoretical_products = capacity["products"].get("theoretical_limit", 0)
theoretical_orders = capacity["orders_monthly"].get("theoretical_limit", 0)
theoretical_team = capacity["team_members"].get("theoretical_limit", 0)
# Tier distribution
tier_distribution = capacity.get("tier_distribution", {})
# Create snapshot
snapshot = CapacitySnapshot(
snapshot_date=today,
total_stores=total_stores,
active_stores=active_stores,
trial_stores=trial_stores,
total_subscriptions=total_subs,
active_subscriptions=active_subs,
total_products=total_products,
total_orders_month=total_orders,
total_team_members=total_team,
storage_used_gb=Decimal(str(storage_gb)),
db_size_mb=Decimal(str(db_size)),
theoretical_products_limit=theoretical_products,
theoretical_orders_limit=theoretical_orders,
theoretical_team_limit=theoretical_team,
tier_distribution=tier_distribution,
)
db.add(snapshot)
db.flush()
db.refresh(snapshot)
logger.info(f"Captured capacity snapshot for {today}")
return snapshot
def get_growth_trends(self, db: Session, days: int = 30) -> dict:
"""
Calculate growth trends over the specified period.
Returns growth rates and projections for key metrics.
"""
now = datetime.now(UTC)
start_date = now - timedelta(days=days)
# Get snapshots for the period
snapshots = (
db.query(CapacitySnapshot)
.filter(CapacitySnapshot.snapshot_date >= start_date)
.order_by(CapacitySnapshot.snapshot_date)
.all()
)
if len(snapshots) < 2:
return {
"period_days": days,
"snapshots_available": len(snapshots),
"trends": {},
"message": "Insufficient data for trend analysis",
}
first = snapshots[0]
last = snapshots[-1]
period_days = (last.snapshot_date - first.snapshot_date).days or 1
def calc_growth(metric: str) -> dict:
start_val = getattr(first, metric) or 0
end_val = getattr(last, metric) or 0
change = end_val - start_val
if start_val > 0:
growth_rate = (change / start_val) * 100
daily_rate = growth_rate / period_days
monthly_rate = daily_rate * 30
else:
growth_rate = 0 if end_val == 0 else 100
daily_rate = 0
monthly_rate = 0
return {
"start_value": start_val,
"current_value": end_val,
"change": change,
"growth_rate_percent": round(growth_rate, 2),
"daily_growth_rate": round(daily_rate, 3),
"monthly_projection": round(end_val * (1 + monthly_rate / 100), 0),
}
trends = {
"stores": calc_growth("active_stores"),
"products": calc_growth("total_products"),
"orders": calc_growth("total_orders_month"),
"team_members": calc_growth("total_team_members"),
"storage_gb": {
"start_value": float(first.storage_used_gb or 0),
"current_value": float(last.storage_used_gb or 0),
"change": float((last.storage_used_gb or 0) - (first.storage_used_gb or 0)),
},
}
return {
"period_days": period_days,
"snapshots_available": len(snapshots),
"start_date": first.snapshot_date.isoformat(),
"end_date": last.snapshot_date.isoformat(),
"trends": trends,
}
def get_scaling_recommendations(self, db: Session) -> list[dict]:
"""
Generate scaling recommendations based on current capacity and growth.
Returns prioritized list of recommendations.
"""
from app.modules.monitoring.services.platform_health_service import (
platform_health_service,
)
recommendations = []
# Get current capacity
capacity = platform_health_service.get_subscription_capacity(db)
health = platform_health_service.get_full_health_report(db)
trends = self.get_growth_trends(db, days=30)
# Check product capacity
products = capacity["products"]
if products.get("utilization_percent") and products["utilization_percent"] > 80:
recommendations.append({
"category": "capacity",
"severity": "warning",
"title": "Product capacity approaching limit",
"description": f"Currently at {products['utilization_percent']:.0f}% of theoretical product capacity",
"action": "Consider upgrading store tiers or adding capacity",
})
# Check infrastructure tier
current_tier = health.get("infrastructure_tier", {})
next_trigger = health.get("next_tier_trigger")
if next_trigger:
recommendations.append({
"category": "infrastructure",
"severity": "info",
"title": f"Current tier: {current_tier.get('name', 'Unknown')}",
"description": f"Next upgrade trigger: {next_trigger}",
"action": "Monitor growth and plan for infrastructure scaling",
})
# Check growth rate
if trends.get("trends"):
store_growth = trends["trends"].get("stores", {})
if store_growth.get("monthly_projection", 0) > 0:
monthly_rate = store_growth.get("growth_rate_percent", 0)
if monthly_rate > 20:
recommendations.append({
"category": "growth",
"severity": "info",
"title": "High store growth rate",
"description": f"Store base growing at {monthly_rate:.1f}% over last 30 days",
"action": "Ensure infrastructure can scale to meet demand",
})
# Check storage
storage_percent = health.get("image_storage", {}).get("total_size_gb", 0)
if storage_percent > 800: # 80% of 1TB
recommendations.append({
"category": "storage",
"severity": "warning",
"title": "Storage usage high",
"description": f"Image storage at {storage_percent:.1f} GB",
"action": "Plan for storage expansion or implement cleanup policies",
})
# Sort by severity
severity_order = {"critical": 0, "warning": 1, "info": 2}
recommendations.sort(key=lambda r: severity_order.get(r["severity"], 3))
return recommendations
def get_days_until_threshold(
self, db: Session, metric: str, threshold: int
) -> int | None:
"""
Calculate days until a metric reaches a threshold based on current growth.
Returns None if insufficient data or no growth.
"""
trends = self.get_growth_trends(db, days=30)
if not trends.get("trends") or metric not in trends["trends"]:
return None
metric_data = trends["trends"][metric]
current = metric_data.get("current_value", 0)
daily_rate = metric_data.get("daily_growth_rate", 0)
if daily_rate <= 0 or current >= threshold:
return None
remaining = threshold - current
days = remaining / (current * daily_rate / 100) if current > 0 else None
return int(days) if days else None
# Singleton instance
capacity_forecast_service = CapacityForecastService()

View File

@@ -1,334 +0,0 @@
# tests/unit/services/test_capacity_forecast_service.py
"""
Unit tests for CapacityForecastService.
Tests cover:
- Daily snapshot capture
- Growth trend calculation
- Scaling recommendations
- Days until threshold calculation
"""
from datetime import UTC, datetime, timedelta
from decimal import Decimal
import pytest
from app.modules.billing.models import CapacitySnapshot
from app.modules.billing.services.capacity_forecast_service import (
INFRASTRUCTURE_SCALING,
CapacityForecastService,
capacity_forecast_service,
)
@pytest.mark.unit
@pytest.mark.service
class TestCapacityForecastServiceSnapshot:
"""Test snapshot capture functionality"""
def test_capture_daily_snapshot_returns_existing(self, db):
"""Test capture_daily_snapshot returns existing snapshot for today"""
now = datetime.now(UTC)
today = now.replace(hour=0, minute=0, second=0, microsecond=0)
# Create existing snapshot
existing = CapacitySnapshot(
snapshot_date=today,
total_stores=10,
active_stores=8,
trial_stores=2,
total_subscriptions=10,
active_subscriptions=8,
total_products=1000,
total_orders_month=500,
total_team_members=20,
storage_used_gb=Decimal("50.0"),
db_size_mb=Decimal("100.0"),
theoretical_products_limit=10000,
theoretical_orders_limit=5000,
theoretical_team_limit=100,
tier_distribution={"starter": 5},
)
db.add(existing)
db.commit()
service = CapacityForecastService()
result = service.capture_daily_snapshot(db)
assert result.id == existing.id
@pytest.mark.unit
@pytest.mark.service
class TestCapacityForecastServiceTrends:
"""Test growth trend functionality"""
def test_get_growth_trends_insufficient_data(self, db):
"""Test get_growth_trends returns message when insufficient data"""
service = CapacityForecastService()
result = service.get_growth_trends(db, days=30)
assert result["snapshots_available"] < 2
assert "Insufficient data" in result.get("message", "")
def test_get_growth_trends_with_data(self, db):
"""Test get_growth_trends calculates trends correctly"""
now = datetime.now(UTC)
# Create two snapshots
snapshot1 = CapacitySnapshot(
snapshot_date=now - timedelta(days=30),
total_stores=10,
active_stores=8,
trial_stores=2,
total_subscriptions=10,
active_subscriptions=8,
total_products=1000,
total_orders_month=500,
total_team_members=20,
storage_used_gb=Decimal("50.0"),
db_size_mb=Decimal("100.0"),
theoretical_products_limit=10000,
theoretical_orders_limit=5000,
theoretical_team_limit=100,
tier_distribution={"starter": 5},
)
snapshot2 = CapacitySnapshot(
snapshot_date=now.replace(hour=0, minute=0, second=0, microsecond=0),
total_stores=15,
active_stores=12,
trial_stores=3,
total_subscriptions=15,
active_subscriptions=12,
total_products=1500,
total_orders_month=750,
total_team_members=30,
storage_used_gb=Decimal("75.0"),
db_size_mb=Decimal("150.0"),
theoretical_products_limit=15000,
theoretical_orders_limit=7500,
theoretical_team_limit=150,
tier_distribution={"starter": 8, "professional": 4},
)
db.add(snapshot1)
db.add(snapshot2)
db.commit()
service = CapacityForecastService()
result = service.get_growth_trends(db, days=60)
assert result["snapshots_available"] >= 2
assert "trends" in result
assert "stores" in result["trends"]
assert result["trends"]["stores"]["start_value"] == 8
assert result["trends"]["stores"]["current_value"] == 12
def test_get_growth_trends_zero_start_value(self, db):
"""Test get_growth_trends handles zero start value"""
now = datetime.now(UTC)
# Create snapshots with zero start value
snapshot1 = CapacitySnapshot(
snapshot_date=now - timedelta(days=30),
total_stores=0,
active_stores=0,
trial_stores=0,
total_subscriptions=0,
active_subscriptions=0,
total_products=0,
total_orders_month=0,
total_team_members=0,
storage_used_gb=Decimal("0"),
db_size_mb=Decimal("0"),
theoretical_products_limit=0,
theoretical_orders_limit=0,
theoretical_team_limit=0,
tier_distribution={},
)
snapshot2 = CapacitySnapshot(
snapshot_date=now.replace(hour=0, minute=0, second=0, microsecond=0),
total_stores=10,
active_stores=8,
trial_stores=2,
total_subscriptions=10,
active_subscriptions=8,
total_products=1000,
total_orders_month=500,
total_team_members=20,
storage_used_gb=Decimal("50.0"),
db_size_mb=Decimal("100.0"),
theoretical_products_limit=10000,
theoretical_orders_limit=5000,
theoretical_team_limit=100,
tier_distribution={"starter": 5},
)
db.add(snapshot1)
db.add(snapshot2)
db.commit()
service = CapacityForecastService()
result = service.get_growth_trends(db, days=60)
assert result["snapshots_available"] >= 2
# When start is 0 and end is not 0, growth should be 100%
assert result["trends"]["stores"]["growth_rate_percent"] == 100
@pytest.mark.unit
@pytest.mark.service
class TestCapacityForecastServiceRecommendations:
"""Test scaling recommendations functionality"""
def test_get_scaling_recommendations_returns_list(self, db):
"""Test get_scaling_recommendations returns a list"""
service = CapacityForecastService()
try:
result = service.get_scaling_recommendations(db)
assert isinstance(result, list)
except Exception:
# May fail if health service dependencies are not set up
pass
@pytest.mark.unit
@pytest.mark.service
class TestCapacityForecastServiceThreshold:
"""Test days until threshold functionality"""
def test_get_days_until_threshold_insufficient_data(self, db):
"""Test get_days_until_threshold returns None with insufficient data"""
service = CapacityForecastService()
result = service.get_days_until_threshold(db, "stores", 100)
assert result is None
def test_get_days_until_threshold_no_growth(self, db):
"""Test get_days_until_threshold returns None with no growth"""
now = datetime.now(UTC)
# Create two snapshots with no growth
snapshot1 = CapacitySnapshot(
snapshot_date=now - timedelta(days=30),
total_stores=10,
active_stores=10,
trial_stores=0,
total_subscriptions=10,
active_subscriptions=10,
total_products=1000,
total_orders_month=500,
total_team_members=20,
storage_used_gb=Decimal("50.0"),
db_size_mb=Decimal("100.0"),
theoretical_products_limit=10000,
theoretical_orders_limit=5000,
theoretical_team_limit=100,
tier_distribution={},
)
snapshot2 = CapacitySnapshot(
snapshot_date=now.replace(hour=0, minute=0, second=0, microsecond=0),
total_stores=10,
active_stores=10, # Same as before
trial_stores=0,
total_subscriptions=10,
active_subscriptions=10,
total_products=1000,
total_orders_month=500,
total_team_members=20,
storage_used_gb=Decimal("50.0"),
db_size_mb=Decimal("100.0"),
theoretical_products_limit=10000,
theoretical_orders_limit=5000,
theoretical_team_limit=100,
tier_distribution={},
)
db.add(snapshot1)
db.add(snapshot2)
db.commit()
service = CapacityForecastService()
result = service.get_days_until_threshold(db, "stores", 100)
assert result is None
def test_get_days_until_threshold_already_exceeded(self, db):
"""Test get_days_until_threshold returns None when already at threshold"""
now = datetime.now(UTC)
# Create two snapshots where current value exceeds threshold
snapshot1 = CapacitySnapshot(
snapshot_date=now - timedelta(days=30),
total_stores=80,
active_stores=80,
trial_stores=0,
total_subscriptions=80,
active_subscriptions=80,
total_products=8000,
total_orders_month=4000,
total_team_members=160,
storage_used_gb=Decimal("400.0"),
db_size_mb=Decimal("800.0"),
theoretical_products_limit=80000,
theoretical_orders_limit=40000,
theoretical_team_limit=800,
tier_distribution={},
)
snapshot2 = CapacitySnapshot(
snapshot_date=now.replace(hour=0, minute=0, second=0, microsecond=0),
total_stores=120,
active_stores=120, # Already exceeds threshold of 100
trial_stores=0,
total_subscriptions=120,
active_subscriptions=120,
total_products=12000,
total_orders_month=6000,
total_team_members=240,
storage_used_gb=Decimal("600.0"),
db_size_mb=Decimal("1200.0"),
theoretical_products_limit=120000,
theoretical_orders_limit=60000,
theoretical_team_limit=1200,
tier_distribution={},
)
db.add(snapshot1)
db.add(snapshot2)
db.commit()
service = CapacityForecastService()
result = service.get_days_until_threshold(db, "stores", 100)
# Should return None since we're already past the threshold
assert result is None
@pytest.mark.unit
@pytest.mark.service
class TestInfrastructureScaling:
"""Test infrastructure scaling constants"""
def test_infrastructure_scaling_defined(self):
"""Test INFRASTRUCTURE_SCALING is properly defined"""
assert len(INFRASTRUCTURE_SCALING) > 0
# Verify structure
for tier in INFRASTRUCTURE_SCALING:
assert "name" in tier
assert "max_stores" in tier
assert "max_products" in tier
assert "cost_monthly" in tier
def test_infrastructure_scaling_ordered(self):
"""Test INFRASTRUCTURE_SCALING is ordered by size"""
# Cost should increase with each tier
for i in range(1, len(INFRASTRUCTURE_SCALING)):
current = INFRASTRUCTURE_SCALING[i]
previous = INFRASTRUCTURE_SCALING[i - 1]
assert current["cost_monthly"] > previous["cost_monthly"]
@pytest.mark.unit
@pytest.mark.service
class TestCapacityForecastServiceSingleton:
"""Test singleton instance"""
def test_singleton_exists(self):
"""Test capacity_forecast_service singleton exists"""
assert capacity_forecast_service is not None
assert isinstance(capacity_forecast_service, CapacityForecastService)