# app/modules/prospecting/services/lead_service.py """ Lead filtering and export service. Provides filtered views of scored prospects and CSV export capabilities. """ import csv import io import json import logging from sqlalchemy.orm import Session, joinedload from app.modules.prospecting.models import ( Prospect, ProspectScore, ) logger = logging.getLogger(__name__) class LeadService: """Service for lead retrieval and export.""" def get_leads( self, db: Session, *, page: int = 1, per_page: int = 20, min_score: int = 0, max_score: int = 100, lead_tier: str | None = None, channel: str | None = None, has_email: bool | None = None, has_phone: bool | None = None, reason_flag: str | None = None, ) -> tuple[list[dict], int]: """Get filtered leads with scores.""" query = ( db.query(Prospect) .join(ProspectScore) .options( joinedload(Prospect.score), joinedload(Prospect.contacts), ) .filter( ProspectScore.score >= min_score, ProspectScore.score <= max_score, ) ) if lead_tier: query = query.filter(ProspectScore.lead_tier == lead_tier) if channel: query = query.filter(Prospect.channel == channel) if reason_flag: query = query.filter(ProspectScore.reason_flags.contains(reason_flag)) total = query.count() prospects = ( query.order_by(ProspectScore.score.desc()) .offset((page - 1) * per_page) .limit(per_page) .all() ) leads = [] for p in prospects: contacts = p.contacts or [] primary_email = next((c.value for c in contacts if c.contact_type == "email" and c.is_primary), None) if not primary_email: primary_email = next((c.value for c in contacts if c.contact_type == "email"), None) primary_phone = next((c.value for c in contacts if c.contact_type == "phone" and c.is_primary), None) if not primary_phone: primary_phone = next((c.value for c in contacts if c.contact_type == "phone"), None) # Filter by contact availability if requested if has_email is True and not primary_email: continue if has_email is False and primary_email: continue if has_phone is True and not primary_phone: continue if has_phone is False and primary_phone: continue reason_flags = json.loads(p.score.reason_flags) if p.score and p.score.reason_flags else [] leads.append({ "id": p.id, "business_name": p.business_name, "domain_name": p.domain_name, "channel": str(p.channel.value) if p.channel else None, "score": p.score.score if p.score else 0, "lead_tier": p.score.lead_tier if p.score else None, "reason_flags": reason_flags, "primary_email": primary_email, "primary_phone": primary_phone, }) return leads, total def get_top_priority(self, db: Session, limit: int = 50) -> list[dict]: leads, _ = self.get_leads(db, min_score=70, per_page=limit) return leads def get_quick_wins(self, db: Session, limit: int = 50) -> list[dict]: leads, _ = self.get_leads(db, min_score=50, max_score=69, per_page=limit) return leads def export_csv( self, db: Session, *, min_score: int = 0, lead_tier: str | None = None, channel: str | None = None, limit: int = 1000, ) -> str: """Export leads to CSV string.""" leads, _ = self.get_leads( db, min_score=min_score, lead_tier=lead_tier, channel=channel, per_page=limit, ) output = io.StringIO() writer = csv.writer(output) writer.writerow([ "Domain", "Business Name", "Channel", "Score", "Tier", "Issues", "Email", "Phone", ]) for lead in leads: writer.writerow([ lead["domain_name"] or "", lead["business_name"] or "", lead["channel"] or "", lead["score"], lead["lead_tier"] or "", "; ".join(lead["reason_flags"]), lead["primary_email"] or "", lead["primary_phone"] or "", ]) return output.getvalue() lead_service = LeadService()