Files
orion/app/modules/prospecting/services/stats_service.py
Samir Boulahtit 6d6eba75bf
Some checks failed
CI / pytest (push) Failing after 48m31s
CI / docs (push) Has been skipped
CI / deploy (push) Has been skipped
CI / ruff (push) Successful in 11s
CI / validate (push) Successful in 23s
CI / dependency-scanning (push) Successful in 28s
feat(prospecting): add complete prospecting module for lead discovery and scoring
Migrates scanning pipeline from marketing-.lu-domains app into Orion module.
Supports digital (domain scan) and offline (manual capture) lead channels
with enrichment, scoring, campaign management, and interaction tracking.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 00:59:47 +01:00

100 lines
3.4 KiB
Python

# app/modules/prospecting/services/stats_service.py
"""
Statistics service for the prospecting dashboard.
"""
import logging
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.modules.prospecting.models import (
Prospect,
ProspectChannel,
ProspectScanJob,
ProspectScore,
)
logger = logging.getLogger(__name__)
class StatsService:
"""Service for dashboard statistics and reporting."""
def get_overview(self, db: Session) -> dict:
"""Get overview statistics for the dashboard."""
total = db.query(func.count(Prospect.id)).scalar() or 0
digital = db.query(func.count(Prospect.id)).filter(Prospect.channel == ProspectChannel.DIGITAL).scalar() or 0
offline = db.query(func.count(Prospect.id)).filter(Prospect.channel == ProspectChannel.OFFLINE).scalar() or 0
with_website = db.query(func.count(Prospect.id)).filter(Prospect.has_website.is_(True)).scalar() or 0
with_https = db.query(func.count(Prospect.id)).filter(Prospect.uses_https.is_(True)).scalar() or 0
scored = db.query(func.count(ProspectScore.id)).scalar() or 0
avg_score = db.query(func.avg(ProspectScore.score)).scalar()
# Leads by tier
tier_results = (
db.query(ProspectScore.lead_tier, func.count(ProspectScore.id))
.group_by(ProspectScore.lead_tier)
.all()
)
leads_by_tier = {tier: count for tier, count in tier_results if tier}
# Common issues (from reason_flags JSON)
# Simplified: count scored prospects per tier
top_priority = leads_by_tier.get("top_priority", 0)
return {
"total_prospects": total,
"digital_count": digital,
"offline_count": offline,
"with_website": with_website,
"with_https": with_https,
"scored": scored,
"avg_score": round(avg_score, 1) if avg_score else None,
"top_priority": top_priority,
"leads_by_tier": leads_by_tier,
"common_issues": self._get_common_issues(db),
}
def get_scan_jobs(
self,
db: Session,
*,
page: int = 1,
per_page: int = 20,
status: str | None = None,
) -> tuple[list[ProspectScanJob], int]:
"""Get paginated scan jobs."""
query = db.query(ProspectScanJob)
if status:
query = query.filter(ProspectScanJob.status == status)
total = query.count()
jobs = (
query.order_by(ProspectScanJob.created_at.desc())
.offset((page - 1) * per_page)
.limit(per_page)
.all()
)
return jobs, total
def _get_common_issues(self, db: Session) -> list[dict]:
"""Extract common issue flags from scored prospects."""
scores = db.query(ProspectScore.reason_flags).filter(ProspectScore.reason_flags.isnot(None)).all()
import json
flag_counts: dict[str, int] = {}
for (flags_json,) in scores:
try:
flags = json.loads(flags_json)
for flag in flags:
flag_counts[flag] = flag_counts.get(flag, 0) + 1
except (json.JSONDecodeError, TypeError):
continue
sorted_flags = sorted(flag_counts.items(), key=lambda x: x[1], reverse=True)
return [{"flag": flag, "count": count} for flag, count in sorted_flags[:10]]
stats_service = StatsService()