Files
orion/app/modules/prospecting/services/lead_service.py
Samir Boulahtit 6d6eba75bf
Some checks failed
CI / pytest (push) Failing after 48m31s
CI / docs (push) Has been skipped
CI / deploy (push) Has been skipped
CI / ruff (push) Successful in 11s
CI / validate (push) Successful in 23s
CI / dependency-scanning (push) Successful in 28s
feat(prospecting): add complete prospecting module for lead discovery and scoring
Migrates scanning pipeline from marketing-.lu-domains app into Orion module.
Supports digital (domain scan) and offline (manual capture) lead channels
with enrichment, scoring, campaign management, and interaction tracking.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 00:59:47 +01:00

154 lines
4.7 KiB
Python

# app/modules/prospecting/services/lead_service.py
"""
Lead filtering and export service.
Provides filtered views of scored prospects and CSV export capabilities.
"""
import csv
import io
import json
import logging
from sqlalchemy.orm import Session, joinedload
from app.modules.prospecting.models import (
Prospect,
ProspectScore,
)
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class LeadService:
    """Service for lead retrieval and export.

    The class holds no instance state; every method works from the
    SQLAlchemy session passed in by the caller.
    """

    def get_leads(
        self,
        db: Session,
        *,
        page: int = 1,
        per_page: int = 20,
        min_score: int = 0,
        max_score: int = 100,
        lead_tier: str | None = None,
        channel: str | None = None,
        has_email: bool | None = None,
        has_phone: bool | None = None,
        reason_flag: str | None = None,
    ) -> tuple[list[dict], int]:
        """Get filtered leads with scores.

        Args:
            db: Session used to run the query.
            page: 1-based page number; values below 1 are treated as 1.
            per_page: Maximum number of prospects fetched for the page.
            min_score: Inclusive lower bound on ``ProspectScore.score``.
            max_score: Inclusive upper bound on ``ProspectScore.score``.
            lead_tier: When set, keep only scores with this exact tier.
            channel: When set, keep only prospects on this channel.
            has_email: ``True`` keeps only leads with an email contact,
                ``False`` only leads without one, ``None`` disables the check.
            has_phone: Same as ``has_email`` but for phone contacts.
            reason_flag: When set, keep only scores whose serialized
                ``reason_flags`` text contains this literal substring.

        Returns:
            ``(leads, total)`` where ``leads`` is a list of plain dicts with
            the keys ``id``, ``business_name``, ``domain_name``, ``channel``,
            ``score``, ``lead_tier``, ``reason_flags``, ``primary_email`` and
            ``primary_phone``, and ``total`` is the number of rows matching
            the database-level filters.

        Note:
            ``has_email`` / ``has_phone`` are applied in Python *after* the
            page has been fetched. ``total`` therefore does not reflect them
            and a page may contain fewer than ``per_page`` leads.
        """
        # A zero or negative page would yield a negative OFFSET, which some
        # databases reject outright.
        page = max(page, 1)

        query = (
            db.query(Prospect)
            .join(ProspectScore)
            .options(
                joinedload(Prospect.score),
                joinedload(Prospect.contacts),
            )
            .filter(
                ProspectScore.score >= min_score,
                ProspectScore.score <= max_score,
            )
        )
        if lead_tier:
            query = query.filter(ProspectScore.lead_tier == lead_tier)
        if channel:
            query = query.filter(Prospect.channel == channel)
        if reason_flag:
            # autoescape keeps "_" and "%" in the flag from acting as LIKE
            # wildcards (e.g. "no_ssl" must not match "noXssl").
            query = query.filter(
                ProspectScore.reason_flags.contains(reason_flag, autoescape=True)
            )

        total = query.count()

        # Prospect.id breaks ties between equal scores so that consecutive
        # pages neither repeat nor skip rows.
        prospects = (
            query.order_by(ProspectScore.score.desc(), Prospect.id)
            .offset((page - 1) * per_page)
            .limit(per_page)
            .all()
        )

        leads: list[dict] = []
        for prospect in prospects:
            contacts = prospect.contacts or []
            primary_email = self._pick_contact(contacts, "email")
            primary_phone = self._pick_contact(contacts, "phone")

            # Filter by contact availability if requested
            if not self._matches_availability(has_email, primary_email):
                continue
            if not self._matches_availability(has_phone, primary_phone):
                continue

            score = prospect.score
            leads.append({
                "id": prospect.id,
                "business_name": prospect.business_name,
                "domain_name": prospect.domain_name,
                "channel": str(prospect.channel.value) if prospect.channel else None,
                "score": score.score if score else 0,
                "lead_tier": score.lead_tier if score else None,
                "reason_flags": self._parse_reason_flags(
                    score.reason_flags if score else None,
                    prospect_id=prospect.id,
                ),
                "primary_email": primary_email,
                "primary_phone": primary_phone,
            })
        return leads, total

    @staticmethod
    def _pick_contact(contacts, contact_type: str):
        """Return the preferred contact value of ``contact_type``, or ``None``.

        A contact flagged ``is_primary`` wins; if there is none (or its value
        is empty) the first contact of that type is used instead.
        """
        candidates = [c for c in contacts if c.contact_type == contact_type]
        preferred = next((c.value for c in candidates if c.is_primary), None)
        if preferred:
            return preferred
        return next((c.value for c in candidates), None)

    @staticmethod
    def _matches_availability(wanted: bool | None, value) -> bool:
        """Tell whether ``value`` satisfies a tri-state has/has-not filter."""
        if wanted is True:
            return bool(value)
        if wanted is False:
            return not value
        return True

    @staticmethod
    def _parse_reason_flags(raw, prospect_id=None) -> list:
        """Decode the stored ``reason_flags`` JSON text into a list.

        Empty input yields ``[]``. Text that is not valid JSON, or that does
        not decode to a list, is logged and also yields ``[]`` so that one bad
        row cannot abort a whole listing or export.
        """
        if not raw:
            return []
        try:
            flags = json.loads(raw)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError subclass.
            logger.warning(
                "Ignoring malformed reason_flags for prospect %s", prospect_id
            )
            return []
        if not isinstance(flags, list):
            logger.warning(
                "Ignoring non-list reason_flags for prospect %s", prospect_id
            )
            return []
        return flags

    def get_top_priority(self, db: Session, limit: int = 50) -> list[dict]:
        """Return up to ``limit`` leads with a score of 70 or more."""
        leads, _ = self.get_leads(db, min_score=70, per_page=limit)
        return leads

    def get_quick_wins(self, db: Session, limit: int = 50) -> list[dict]:
        """Return up to ``limit`` leads with a score between 50 and 69."""
        leads, _ = self.get_leads(db, min_score=50, max_score=69, per_page=limit)
        return leads

    def export_csv(
        self,
        db: Session,
        *,
        min_score: int = 0,
        lead_tier: str | None = None,
        channel: str | None = None,
        limit: int = 1000,
    ) -> str:
        """Export leads to CSV string.

        Args:
            db: Session forwarded to ``get_leads``.
            min_score: Inclusive lower bound on the lead score.
            lead_tier: Optional exact tier filter.
            channel: Optional channel filter.
            limit: Maximum number of leads written (first page only).

        Returns:
            The CSV document, header row included, as a single string.
        """
        leads, _ = self.get_leads(
            db,
            min_score=min_score,
            lead_tier=lead_tier,
            channel=channel,
            per_page=limit,
        )
        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow([
            "Domain", "Business Name", "Channel", "Score", "Tier",
            "Issues", "Email", "Phone",
        ])
        # NOTE(review): cells are written verbatim. If this file is opened in
        # a spreadsheet, values starting with "=", "+", "-" or "@" may be
        # interpreted as formulas; confirm whether the export needs escaping
        # (phone numbers legitimately start with "+").
        for lead in leads:
            writer.writerow([
                lead["domain_name"] or "",
                lead["business_name"] or "",
                lead["channel"] or "",
                lead["score"],
                lead["lead_tier"] or "",
                # str() so a non-string flag cannot make join() raise.
                "; ".join(str(flag) for flag in lead["reason_flags"]),
                lead["primary_email"] or "",
                lead["primary_phone"] or "",
            ])
        return output.getvalue()
# Shared module-level instance; LeadService defines no per-instance state.
lead_service = LeadService()