Files
orion/app/modules/hosting/services/stats_service.py
Samir Boulahtit 8b147f53c6
Some checks failed
CI / pytest (push) Failing after 49m20s
CI / validate (push) Successful in 24s
CI / dependency-scanning (push) Successful in 33s
CI / docs (push) Has been skipped
CI / deploy (push) Has been skipped
CI / ruff (push) Successful in 10s
feat(hosting): add HostWizard platform module and fix migration chain
- Add complete hosting module (models, routes, schemas, services, templates, migrations)
- Add HostWizard platform to init_production seed (code=hosting, domain=hostwizard.lu)
- Fix cms_002 migration down_revision to z_unique_subdomain_domain
- Fix prospecting_001 migration to chain after cms_002 (remove branch label)
- Add hosting/prospecting version_locations to alembic.ini
- Fix admin_services delete endpoint to use proper response model
- Add hostwizard.lu to deployment docs (DNS, Caddy, Cloudflare)
- Add hosting and prospecting user journey docs to mkdocs nav

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-03 19:34:56 +01:00

102 lines
3.0 KiB
Python

# app/modules/hosting/services/stats_service.py
"""
Statistics service for the hosting dashboard.
"""
import logging
from datetime import UTC, datetime, timedelta
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.modules.hosting.models import (
ClientService,
ClientServiceStatus,
HostedSite,
)
# Logger named after this module.
logger = logging.getLogger(__name__)
class StatsService:
    """Read-only aggregate queries backing the hosting dashboard."""

    def get_dashboard_stats(self, db: Session) -> dict:
        """Collect the headline numbers for the hosting dashboard.

        Args:
            db: SQLAlchemy session used to run the aggregate queries.

        Returns:
            A dict with the keys ``total_sites``, ``live_sites``,
            ``poc_sites``, ``sites_by_status``, ``active_services``,
            ``monthly_revenue_cents``, ``upcoming_renewals`` and
            ``services_by_type``.
        """

        def as_key(member):
            # Values exposing ``.value`` (e.g. enum members) are keyed by it;
            # anything else is keyed by its string form.
            if hasattr(member, "value"):
                return member.value
            return str(member)

        # Predicate shared by every ClientService query below.
        is_active = ClientService.status == ClientServiceStatus.ACTIVE

        # --- Hosted sites -------------------------------------------------
        site_total = db.query(func.count(HostedSite.id)).scalar() or 0

        per_status_rows = (
            db.query(HostedSite.status, func.count(HostedSite.id))
            .group_by(HostedSite.status)
            .all()
        )
        sites_by_status = {}
        for site_status, n_sites in per_status_rows:
            sites_by_status[as_key(site_status)] = n_sites

        live_sites = sites_by_status.get("live", 0)
        # "POC" sites are the three statuses summed here.
        poc_sites = sum(
            sites_by_status.get(name, 0)
            for name in ("draft", "poc_ready", "proposal_sent")
        )

        # --- Client services ----------------------------------------------
        active_count_query = db.query(func.count(ClientService.id)).filter(is_active)
        active_services = active_count_query.scalar() or 0

        # Sum of price_cents over every ACTIVE service; an empty result
        # (NULL sum) is reported as 0.
        # NOTE(review): the result key says "monthly", but this query has no
        # billing-period condition -- confirm all active services bill monthly.
        revenue_query = db.query(func.sum(ClientService.price_cents)).filter(
            is_active,
        )
        monthly_revenue_cents = revenue_query.scalar() or 0

        # Active services whose expires_at is set and falls at or before
        # "now + 30 days" (timezone-aware UTC).
        # NOTE(review): there is no lower bound, so active services whose
        # expires_at is already in the past are counted as well.
        renewal_deadline = datetime.now(UTC) + timedelta(days=30)
        renewals_query = db.query(func.count(ClientService.id)).filter(
            ClientService.expires_at.isnot(None),
            ClientService.expires_at <= renewal_deadline,
            is_active,
        )
        upcoming_renewals = renewals_query.scalar() or 0

        # Count of ACTIVE services per service_type.
        per_type_rows = (
            db.query(ClientService.service_type, func.count(ClientService.id))
            .filter(is_active)
            .group_by(ClientService.service_type)
            .all()
        )
        services_by_type = {}
        for service_type, n_services in per_type_rows:
            services_by_type[as_key(service_type)] = n_services

        return {
            "total_sites": site_total,
            "live_sites": live_sites,
            "poc_sites": poc_sites,
            "sites_by_status": sites_by_status,
            "active_services": active_services,
            "monthly_revenue_cents": monthly_revenue_cents,
            "upcoming_renewals": upcoming_renewals,
            "services_by_type": services_by_type,
        }
# Module-level StatsService instance, created at import time.
stats_service = StatsService()