Some checks failed
Resolves the billing (core) → monitoring (optional) architecture violation by moving CapacityForecastService to the monitoring module where it belongs. - Create BillingMetricsProvider to expose subscription counts via stats_aggregator - Move CapacitySnapshot model from billing to monitoring - Replace direct MerchantSubscription queries with stats_aggregator calls - Fix middleware test mocks to cover StoreDomain/MerchantDomain fallback chains Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
48 lines
1.3 KiB
Python
48 lines
1.3 KiB
Python
# app/modules/monitoring/tasks/capacity.py
|
|
"""
|
|
Celery tasks for capacity monitoring and forecasting.
|
|
|
|
Captures daily snapshots of platform capacity metrics for trend analysis.
|
|
"""
|
|
|
|
import logging
|
|
|
|
from app.core.celery_config import celery_app
|
|
from app.modules.task_base import ModuleTask
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@celery_app.task(
|
|
bind=True,
|
|
base=ModuleTask,
|
|
name="app.modules.monitoring.tasks.capacity.capture_capacity_snapshot",
|
|
)
|
|
def capture_capacity_snapshot(self):
|
|
"""
|
|
Capture a daily snapshot of platform capacity metrics.
|
|
|
|
Runs daily at midnight via Celery beat.
|
|
|
|
Returns:
|
|
dict: Snapshot summary with store and product counts.
|
|
"""
|
|
from app.modules.monitoring.services.capacity_forecast_service import (
|
|
capacity_forecast_service,
|
|
)
|
|
|
|
with self.get_db() as db:
|
|
snapshot = capacity_forecast_service.capture_daily_snapshot(db)
|
|
|
|
logger.info(
|
|
f"Captured capacity snapshot: {snapshot.total_stores} stores, "
|
|
f"{snapshot.total_products} products"
|
|
)
|
|
|
|
return {
|
|
"snapshot_id": snapshot.id,
|
|
"snapshot_date": snapshot.snapshot_date.isoformat(),
|
|
"total_stores": snapshot.total_stores,
|
|
"total_products": snapshot.total_products,
|
|
}
|