Files
orion/app/modules/prospecting/services/stats_service.py
Samir Boulahtit f310363f7c fix(prospecting): fix scan-jobs batch endpoints and add job tracking
- Reorder routes: batch endpoints before /{prospect_id} to fix FastAPI
  route matching (was parsing "batch" as prospect_id → 422)
- Add scan job tracking via stats_service.create_job/complete_job so
  the scan-jobs table gets populated after each batch run
- Add contact scrape batch endpoint (POST /contacts/batch) with
  get_pending_contact_scrape query
- Fix scan-jobs.js: explicit route map instead of naive replace
- Normalize domain_name on create/update (strip protocol, www, slash)
- Add domain_name to ProspectUpdate schema
- Add proposal for contact scraper enum + regex fixes

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 23:31:33 +02:00

122 lines
4.2 KiB
Python

# app/modules/prospecting/services/stats_service.py
"""
Statistics service for the prospecting dashboard.
"""
import logging
from datetime import UTC, datetime
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.modules.prospecting.models import (
JobStatus,
JobType,
Prospect,
ProspectChannel,
ProspectScanJob,
ProspectScore,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class StatsService:
"""Service for dashboard statistics and reporting."""
def get_overview(self, db: Session) -> dict:
"""Get overview statistics for the dashboard."""
total = db.query(func.count(Prospect.id)).scalar() or 0
digital = db.query(func.count(Prospect.id)).filter(Prospect.channel == ProspectChannel.DIGITAL).scalar() or 0
offline = db.query(func.count(Prospect.id)).filter(Prospect.channel == ProspectChannel.OFFLINE).scalar() or 0
with_website = db.query(func.count(Prospect.id)).filter(Prospect.has_website.is_(True)).scalar() or 0
with_https = db.query(func.count(Prospect.id)).filter(Prospect.uses_https.is_(True)).scalar() or 0
scored = db.query(func.count(ProspectScore.id)).scalar() or 0
avg_score = db.query(func.avg(ProspectScore.score)).scalar()
# Leads by tier
tier_results = (
db.query(ProspectScore.lead_tier, func.count(ProspectScore.id))
.group_by(ProspectScore.lead_tier)
.all()
)
leads_by_tier = {tier: count for tier, count in tier_results if tier}
# Common issues (from reason_flags JSON)
# Simplified: count scored prospects per tier
top_priority = leads_by_tier.get("top_priority", 0)
return {
"total_prospects": total,
"digital_count": digital,
"offline_count": offline,
"with_website": with_website,
"with_https": with_https,
"scored": scored,
"avg_score": round(avg_score, 1) if avg_score else None,
"top_priority": top_priority,
"leads_by_tier": leads_by_tier,
"common_issues": self._get_common_issues(db),
}
def create_job(self, db: Session, job_type: JobType) -> ProspectScanJob:
"""Create a scan job record for tracking."""
job = ProspectScanJob(
job_type=job_type,
status=JobStatus.RUNNING,
started_at=datetime.now(UTC),
)
db.add(job)
db.flush()
return job
def complete_job(self, job: ProspectScanJob, processed: int, failed: int = 0) -> None:
"""Mark a scan job as completed."""
job.total_items = processed + failed
job.processed_items = processed
job.failed_items = failed
job.status = JobStatus.COMPLETED
job.completed_at = datetime.now(UTC)
def get_scan_jobs(
self,
db: Session,
*,
page: int = 1,
per_page: int = 20,
status: str | None = None,
) -> tuple[list[ProspectScanJob], int]:
"""Get paginated scan jobs."""
query = db.query(ProspectScanJob)
if status:
query = query.filter(ProspectScanJob.status == status)
total = query.count()
jobs = (
query.order_by(ProspectScanJob.created_at.desc())
.offset((page - 1) * per_page)
.limit(per_page)
.all()
)
return jobs, total
def _get_common_issues(self, db: Session) -> list[dict]:
"""Extract common issue flags from scored prospects."""
scores = db.query(ProspectScore.reason_flags).filter(ProspectScore.reason_flags.isnot(None)).all() # noqa: SVC-005 - prospecting is platform-scoped, not store-scoped
import json
flag_counts: dict[str, int] = {}
for (flags_json,) in scores:
try:
flags = json.loads(flags_json)
for flag in flags:
flag_counts[flag] = flag_counts.get(flag, 0) + 1
except (json.JSONDecodeError, TypeError):
continue
sorted_flags = sorted(flag_counts.items(), key=lambda x: x[1], reverse=True)
return [{"flag": flag, "count": count} for flag, count in sorted_flags[:10]]
# Shared module-level instance of StatsService.
stats_service = StatsService()