# app/modules/prospecting/routes/api/admin_prospects.py
"""
Admin API routes for prospect management.

Exposes list / detail / create / update / delete endpoints plus a bulk
domain import under the ``/prospects`` prefix. Every route depends on
``get_current_admin_api`` and delegates persistence to ``prospect_service``;
the routes that write call ``db.commit()`` themselves after the service
call returns.
"""

import json
import logging
from math import ceil
from typing import Any

from fastapi import APIRouter, Depends, Path, Query, UploadFile
from sqlalchemy.orm import Session

from app.api.deps import get_current_admin_api
from app.core.database import get_db
from app.modules.prospecting.schemas.prospect import (
    ProspectCreate,
    ProspectDeleteResponse,
    ProspectDetailResponse,
    ProspectImportResponse,
    ProspectListResponse,
    ProspectResponse,
    ProspectUpdate,
)
from app.modules.prospecting.services.prospect_service import prospect_service
from app.modules.tenancy.schemas.auth import UserContext

router = APIRouter(prefix="/prospects")
logger = logging.getLogger(__name__)


@router.get("", response_model=ProspectListResponse)
def list_prospects(
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
    search: str | None = Query(None),
    channel: str | None = Query(None),
    status: str | None = Query(None),
    tier: str | None = Query(None),
    city: str | None = Query(None),
    db: Session = Depends(get_db),
    current_admin: UserContext = Depends(get_current_admin_api),
):
    """List prospects with filters and pagination.

    All filter arguments are forwarded unchanged to
    ``prospect_service.get_all``, which returns the page of prospects and
    the total match count.
    """
    prospects, total = prospect_service.get_all(
        db,
        page=page,
        per_page=per_page,
        search=search,
        channel=channel,
        status=status,
        tier=tier,
        city=city,
    )
    return ProspectListResponse(
        items=[_to_response(p) for p in prospects],
        total=total,
        page=page,
        per_page=per_page,
        # Query() enforces per_page >= 1 over HTTP; the guard only protects
        # direct Python callers from a ZeroDivisionError.
        pages=ceil(total / per_page) if per_page else 0,
    )


@router.get("/{prospect_id}", response_model=ProspectDetailResponse)
def get_prospect(
    prospect_id: int = Path(...),
    db: Session = Depends(get_db),
    current_admin: UserContext = Depends(get_current_admin_api),
):
    """Get full prospect detail with all related data.

    Returns whatever ``prospect_service.get_by_id`` yields; conversion to
    ``ProspectDetailResponse`` is left to FastAPI's ``response_model``.
    """
    # NOTE(review): not-found handling is presumably done inside the
    # service (nothing here checks for a missing prospect) - confirm.
    return prospect_service.get_by_id(db, prospect_id)


@router.post("", response_model=ProspectResponse)
def create_prospect(
    data: ProspectCreate,
    db: Session = Depends(get_db),
    current_admin: UserContext = Depends(get_current_admin_api),
):
    """Create a new prospect (digital or offline).

    Fields left as ``None`` in the request body are omitted from the dict
    handed to the service. The calling admin's id is recorded as
    ``captured_by_user_id``.
    """
    prospect = prospect_service.create(
        db,
        data.model_dump(exclude_none=True),
        captured_by_user_id=current_admin.id,
    )
    db.commit()
    return _to_response(prospect)


@router.put("/{prospect_id}", response_model=ProspectResponse)
def update_prospect(
    data: ProspectUpdate,
    prospect_id: int = Path(...),
    db: Session = Depends(get_db),
    current_admin: UserContext = Depends(get_current_admin_api),
):
    """Update a prospect.

    Only non-``None`` fields of the request body are passed to the service.
    """
    # NOTE(review): exclude_none=True also drops fields the client sent as
    # an explicit null, so this payload can never ask the service to clear
    # a value. Confirm that is intended.
    changes = data.model_dump(exclude_none=True)
    prospect = prospect_service.update(db, prospect_id, changes)
    db.commit()
    return _to_response(prospect)


@router.delete("/{prospect_id}", response_model=ProspectDeleteResponse)
def delete_prospect(
    prospect_id: int = Path(...),
    db: Session = Depends(get_db),
    current_admin: UserContext = Depends(get_current_admin_api),
):
    """Delete a prospect."""
    prospect_service.delete(db, prospect_id)
    db.commit()
    return ProspectDeleteResponse(message="Prospect deleted")


@router.post("/import", response_model=ProspectImportResponse)
async def import_domains(
    file: UploadFile,
    db: Session = Depends(get_db),
    current_admin: UserContext = Depends(get_current_admin_api),
):
    """Import domains from a CSV file.

    The upload is read fully into memory, reduced to a list of candidate
    domains (one per line) and handed to ``prospect_service.create_bulk``
    with ``source="csv_import"``. ``total`` in the response is the number
    of candidate domains extracted from the file, not the number of lines.

    Raises:
        UnicodeDecodeError: if the upload is not valid UTF-8.
    """
    content = await file.read()
    domains = _parse_domains(content)
    created, skipped = prospect_service.create_bulk(db, domains, source="csv_import")
    db.commit()
    return ProspectImportResponse(created=created, skipped=skipped, total=len(domains))


def _parse_domains(content: bytes) -> list[str]:
    """Extract candidate domain names from raw uploaded file bytes.

    Each line is trimmed of surrounding whitespace, then commas, then
    double quotes. A line is kept only if what remains is non-empty,
    contains a dot and does not start with ``#``.
    """
    # "utf-8-sig" drops a leading byte-order mark. Plain "utf-8" keeps it,
    # and str.strip() does not remove U+FEFF, so the first domain of a
    # BOM-prefixed file would otherwise carry an invisible prefix.
    text = content.decode("utf-8-sig")

    # splitlines() handles \n, \r\n and lone \r terminators alike.
    cleaned = (line.strip().strip(",").strip('"') for line in text.splitlines())

    # There is no explicit header skip: a header row is dropped only
    # because a bare column title such as "domain" contains no dot.
    return [
        domain
        for domain in cleaned
        if domain and "." in domain and not domain.startswith("#")
    ]


def _load_json(raw: Any, default: Any, *, field: str, prospect_id: Any) -> Any:
    """Decode a JSON-encoded column value, returning *default* when unusable.

    Falsy values (``None``, empty string) yield *default* without attempting
    a decode. Undecodable values also yield *default*; they are logged so
    corrupt rows are visible instead of being silently ignored.
    """
    if not raw:
        return default
    try:
        return json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        logger.warning("Ignoring invalid JSON in %s for prospect %s", field, prospect_id)
        return default


def _to_response(prospect) -> ProspectResponse:
    """Convert a prospect model to response schema.

    ``primary_email`` / ``primary_phone`` are the values of the first
    contact whose ``contact_type`` is ``"email"`` / ``"phone"``, or ``None``
    when there is no such contact. ``tags`` and the score's ``reason_flags``
    and ``score_breakdown`` are JSON-decoded on a best-effort basis.
    """
    contacts = prospect.contacts or []
    primary_email = next((c.value for c in contacts if c.contact_type == "email"), None)
    primary_phone = next((c.value for c in contacts if c.contact_type == "phone"), None)

    tags = _load_json(prospect.tags, None, field="tags", prospect_id=prospect.id)

    score_resp = None
    score = prospect.score
    if score:
        # Kept as a function-local import, as in the original code.
        # NOTE(review): presumably this avoids an import cycle - confirm
        # before hoisting it to module level.
        from app.modules.prospecting.schemas.score import ProspectScoreResponse

        score_resp = ProspectScoreResponse(
            id=score.id,
            prospect_id=score.prospect_id,
            score=score.score,
            technical_health_score=score.technical_health_score,
            modernity_score=score.modernity_score,
            business_value_score=score.business_value_score,
            engagement_score=score.engagement_score,
            reason_flags=_load_json(
                score.reason_flags,
                [],
                field="score.reason_flags",
                prospect_id=prospect.id,
            ),
            score_breakdown=_load_json(
                score.score_breakdown,
                None,
                field="score.score_breakdown",
                prospect_id=prospect.id,
            ),
            lead_tier=score.lead_tier,
            notes=score.notes,
            created_at=score.created_at,
            updated_at=score.updated_at,
        )

    return ProspectResponse(
        id=prospect.id,
        # A missing channel / status falls back to "digital" / "pending".
        channel=prospect.channel.value if prospect.channel else "digital",
        business_name=prospect.business_name,
        domain_name=prospect.domain_name,
        status=prospect.status.value if prospect.status else "pending",
        source=prospect.source,
        has_website=prospect.has_website,
        uses_https=prospect.uses_https,
        http_status_code=prospect.http_status_code,
        address=prospect.address,
        city=prospect.city,
        postal_code=prospect.postal_code,
        country=prospect.country,
        notes=prospect.notes,
        tags=tags,
        location_lat=prospect.location_lat,
        location_lng=prospect.location_lng,
        created_at=prospect.created_at,
        updated_at=prospect.updated_at,
        score=score_resp,
        primary_email=primary_email,
        primary_phone=primary_phone,
    )