fix(prospecting): fix scan-jobs batch endpoints and add job tracking
- Reorder routes: batch endpoints before /{prospect_id} to fix FastAPI
route matching (was parsing "batch" as prospect_id → 422)
- Add scan job tracking via stats_service.create_job/complete_job so
the scan-jobs table gets populated after each batch run
- Add contact scrape batch endpoint (POST /contacts/batch) with
get_pending_contact_scrape query
- Fix scan-jobs.js: explicit route map instead of naive replace
- Normalize domain_name on create/update (strip protocol, www, slash)
- Add domain_name to ProspectUpdate schema
- Add proposal for contact scraper enum + regex fixes
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,6 +1,10 @@
|
||||
# app/modules/prospecting/routes/api/admin_enrichment.py
|
||||
"""
|
||||
Admin API routes for enrichment/scanning pipeline.
|
||||
|
||||
NOTE: Batch routes MUST be defined before /{prospect_id} routes.
|
||||
FastAPI matches routes in definition order, and {prospect_id} would
|
||||
catch "batch" as a string before trying to parse it as int → 422.
|
||||
"""
|
||||
|
||||
import logging
|
||||
@@ -10,6 +14,7 @@ from sqlalchemy.orm import Session
|
||||
|
||||
from app.api.deps import get_current_admin_api
|
||||
from app.core.database import get_db
|
||||
from app.modules.prospecting.models import JobType
|
||||
from app.modules.prospecting.schemas.enrichment import (
|
||||
ContactScrapeResponse,
|
||||
FullEnrichmentResponse,
|
||||
@@ -23,12 +28,108 @@ from app.modules.prospecting.schemas.enrichment import (
|
||||
from app.modules.prospecting.services.enrichment_service import enrichment_service
|
||||
from app.modules.prospecting.services.prospect_service import prospect_service
|
||||
from app.modules.prospecting.services.scoring_service import scoring_service
|
||||
from app.modules.prospecting.services.stats_service import stats_service
|
||||
from app.modules.tenancy.schemas.auth import UserContext
|
||||
|
||||
router = APIRouter(prefix="/enrichment")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ── Batch endpoints (must be before /{prospect_id} routes) ──────────────────
|
||||
|
||||
|
||||
@router.post("/http-check/batch", response_model=HttpCheckBatchResponse)
def http_check_batch(
    limit: int = Query(100, ge=1, le=500),
    db: Session = Depends(get_db),
    current_admin: UserContext = Depends(get_current_admin_api),
):
    """Run HTTP check for pending prospects.

    Opens an HTTP_CHECK scan job, checks up to ``limit`` pending prospects,
    marks the job completed with the number of checked prospects, commits,
    and returns one result item per prospect.
    """

    def _check(candidate):
        # Run the check first, then read the domain, one prospect at a time.
        outcome = enrichment_service.check_http(db, candidate)
        return HttpCheckBatchItem(domain=candidate.domain_name, **outcome)

    scan_job = stats_service.create_job(db, JobType.HTTP_CHECK)
    pending = prospect_service.get_pending_http_check(db, limit=limit)
    items = [_check(candidate) for candidate in pending]
    stats_service.complete_job(scan_job, processed=len(items))
    db.commit()
    return HttpCheckBatchResponse(processed=len(items), results=items)
|
||||
|
||||
|
||||
@router.post("/tech-scan/batch", response_model=ScanBatchResponse)
def tech_scan_batch(
    limit: int = Query(100, ge=1, le=500),
    db: Session = Depends(get_db),
    current_admin: UserContext = Depends(get_current_admin_api),
):
    """Run tech scan for pending prospects.

    Opens a TECH_SCAN scan job, scans up to ``limit`` pending prospects,
    counts the scans that returned a truthy result, marks the job completed,
    commits, and reports both the total and the successful count.
    """
    scan_job = stats_service.create_job(db, JobType.TECH_SCAN)
    pending = prospect_service.get_pending_tech_scan(db, limit=limit)
    successful = sum(
        1
        for candidate in pending
        if enrichment_service.scan_tech_stack(db, candidate)
    )
    total = len(pending)
    # NOTE(review): the job is completed with processed=total and no `failed`
    # count, so unsuccessful scans are not distinguished in the job record —
    # confirm this is intended.
    stats_service.complete_job(scan_job, processed=total)
    db.commit()
    return ScanBatchResponse(processed=total, successful=successful)
|
||||
|
||||
|
||||
@router.post("/performance/batch", response_model=ScanBatchResponse)
def performance_scan_batch(
    limit: int = Query(50, ge=1, le=200),
    db: Session = Depends(get_db),
    current_admin: UserContext = Depends(get_current_admin_api),
):
    """Run performance scan for pending prospects.

    Opens a PERFORMANCE_SCAN scan job, scans up to ``limit`` pending
    prospects, counts the scans that returned a truthy result, marks the job
    completed, commits, and reports both the total and the successful count.
    """
    scan_job = stats_service.create_job(db, JobType.PERFORMANCE_SCAN)
    pending = prospect_service.get_pending_performance_scan(db, limit=limit)
    successful = sum(
        1
        for candidate in pending
        if enrichment_service.scan_performance(db, candidate)
    )
    total = len(pending)
    # NOTE(review): the job is completed with processed=total and no `failed`
    # count, so unsuccessful scans are not distinguished in the job record —
    # confirm this is intended.
    stats_service.complete_job(scan_job, processed=total)
    db.commit()
    return ScanBatchResponse(processed=total, successful=successful)
|
||||
|
||||
|
||||
@router.post("/contacts/batch", response_model=ScanBatchResponse)
def contact_scrape_batch(
    limit: int = Query(50, ge=1, le=200),
    db: Session = Depends(get_db),
    current_admin: UserContext = Depends(get_current_admin_api),
):
    """Scrape contacts for pending prospects.

    Opens a CONTACT_SCRAPE scan job, scrapes up to ``limit`` pending
    prospects, counts the prospects for which the scrape returned a truthy
    value, marks the job completed, commits, and reports both the total and
    the successful count.
    """
    scan_job = stats_service.create_job(db, JobType.CONTACT_SCRAPE)
    pending = prospect_service.get_pending_contact_scrape(db, limit=limit)
    successful = sum(
        1
        for candidate in pending
        if enrichment_service.scrape_contacts(db, candidate)
    )
    total = len(pending)
    # NOTE(review): the job is completed with processed=total and no `failed`
    # count, so prospects without scraped contacts are not distinguished in
    # the job record — confirm this is intended.
    stats_service.complete_job(scan_job, processed=total)
    db.commit()
    return ScanBatchResponse(processed=total, successful=successful)
|
||||
|
||||
|
||||
@router.post("/score-compute/batch", response_model=ScoreComputeBatchResponse)
def compute_scores_batch(
    limit: int = Query(500, ge=1, le=5000),
    db: Session = Depends(get_db),
    current_admin: UserContext = Depends(get_current_admin_api),
):
    """Compute or recompute scores for all prospects.

    Opens a SCORE_COMPUTE scan job, delegates to the scoring service with the
    given ``limit``, marks the job completed with the returned count, commits,
    and returns that count.
    """
    scan_job = stats_service.create_job(db, JobType.SCORE_COMPUTE)
    scored = scoring_service.compute_all(db, limit=limit)
    stats_service.complete_job(scan_job, processed=scored)
    db.commit()
    return ScoreComputeBatchResponse(scored=scored)
|
||||
|
||||
|
||||
# ── Single-prospect endpoints ───────────────────────────────────────────────
|
||||
|
||||
|
||||
@router.post("/http-check/{prospect_id}", response_model=HttpCheckResult)
|
||||
def http_check_single(
|
||||
prospect_id: int = Path(...),
|
||||
@@ -42,22 +143,6 @@ def http_check_single(
|
||||
return HttpCheckResult(**result)
|
||||
|
||||
|
||||
@router.post("/http-check/batch", response_model=HttpCheckBatchResponse)
def http_check_batch(
    limit: int = Query(100, ge=1, le=500),
    db: Session = Depends(get_db),
    current_admin: UserContext = Depends(get_current_admin_api),
):
    """Run HTTP check for pending prospects.

    Checks up to ``limit`` pending prospects, commits, and returns one result
    item per prospect.
    """

    def _check(candidate):
        # Run the check first, then read the domain, one prospect at a time.
        outcome = enrichment_service.check_http(db, candidate)
        return HttpCheckBatchItem(domain=candidate.domain_name, **outcome)

    pending = prospect_service.get_pending_http_check(db, limit=limit)
    items = [_check(candidate) for candidate in pending]
    db.commit()
    return HttpCheckBatchResponse(processed=len(items), results=items)
|
||||
|
||||
|
||||
@router.post("/tech-scan/{prospect_id}", response_model=ScanSingleResponse)
|
||||
def tech_scan_single(
|
||||
prospect_id: int = Path(...),
|
||||
@@ -71,23 +156,6 @@ def tech_scan_single(
|
||||
return ScanSingleResponse(domain=prospect.domain_name, profile=profile is not None)
|
||||
|
||||
|
||||
@router.post("/tech-scan/batch", response_model=ScanBatchResponse)
def tech_scan_batch(
    limit: int = Query(100, ge=1, le=500),
    db: Session = Depends(get_db),
    current_admin: UserContext = Depends(get_current_admin_api),
):
    """Run tech scan for pending prospects.

    Scans up to ``limit`` pending prospects, counts the scans that returned a
    truthy result, commits, and reports both the total and the successful
    count.
    """
    pending = prospect_service.get_pending_tech_scan(db, limit=limit)
    successful = sum(
        1
        for candidate in pending
        if enrichment_service.scan_tech_stack(db, candidate)
    )
    db.commit()
    return ScanBatchResponse(processed=len(pending), successful=successful)
|
||||
|
||||
|
||||
@router.post("/performance/{prospect_id}", response_model=ScanSingleResponse)
|
||||
def performance_scan_single(
|
||||
prospect_id: int = Path(...),
|
||||
@@ -101,23 +169,6 @@ def performance_scan_single(
|
||||
return ScanSingleResponse(domain=prospect.domain_name, profile=profile is not None)
|
||||
|
||||
|
||||
@router.post("/performance/batch", response_model=ScanBatchResponse)
def performance_scan_batch(
    limit: int = Query(50, ge=1, le=200),
    db: Session = Depends(get_db),
    current_admin: UserContext = Depends(get_current_admin_api),
):
    """Run performance scan for pending prospects.

    Scans up to ``limit`` pending prospects, counts the scans that returned a
    truthy result, commits, and reports both the total and the successful
    count.
    """
    pending = prospect_service.get_pending_performance_scan(db, limit=limit)
    successful = sum(
        1
        for candidate in pending
        if enrichment_service.scan_performance(db, candidate)
    )
    db.commit()
    return ScanBatchResponse(processed=len(pending), successful=successful)
|
||||
|
||||
|
||||
@router.post("/contacts/{prospect_id}", response_model=ContactScrapeResponse)
|
||||
def scrape_contacts_single(
|
||||
prospect_id: int = Path(...),
|
||||
@@ -172,15 +223,3 @@ def full_enrichment(
|
||||
score=score.score,
|
||||
lead_tier=score.lead_tier,
|
||||
)
|
||||
|
||||
|
||||
@router.post("/score-compute/batch", response_model=ScoreComputeBatchResponse)
def compute_scores_batch(
    limit: int = Query(500, ge=1, le=5000),
    db: Session = Depends(get_db),
    current_admin: UserContext = Depends(get_current_admin_api),
):
    """Compute or recompute scores for all prospects.

    Delegates to the scoring service with the given ``limit``, commits, and
    returns the count it reported.
    """
    scored = scoring_service.compute_all(db, limit=limit)
    db.commit()
    return ScoreComputeBatchResponse(scored=scored)
|
||||
|
||||
@@ -28,6 +28,7 @@ class ProspectUpdate(BaseModel):
|
||||
"""Schema for updating a prospect."""
|
||||
|
||||
business_name: str | None = Field(None, max_length=255)
|
||||
domain_name: str | None = Field(None, max_length=255)
|
||||
status: str | None = None
|
||||
source: str | None = Field(None, max_length=100)
|
||||
address: str | None = Field(None, max_length=500)
|
||||
|
||||
@@ -94,10 +94,22 @@ class ProspectService:
|
||||
|
||||
return prospects, total
|
||||
|
||||
@staticmethod
|
||||
def _normalize_domain(domain: str) -> str:
|
||||
"""Strip protocol, www prefix, and trailing slash from a domain."""
|
||||
domain = domain.strip()
|
||||
for prefix in ["https://", "http://"]:
|
||||
if domain.lower().startswith(prefix):
|
||||
domain = domain[len(prefix):]
|
||||
if domain.lower().startswith("www."):
|
||||
domain = domain[4:]
|
||||
return domain.rstrip("/")
|
||||
|
||||
def create(self, db: Session, data: dict, captured_by_user_id: int | None = None) -> Prospect:
|
||||
channel = data.get("channel", "digital")
|
||||
|
||||
if channel == "digital" and data.get("domain_name"):
|
||||
data["domain_name"] = self._normalize_domain(data["domain_name"])
|
||||
existing = self.get_by_domain(db, data["domain_name"])
|
||||
if existing:
|
||||
raise DuplicateDomainException(data["domain_name"])
|
||||
@@ -148,7 +160,7 @@ class ProspectService:
|
||||
skipped = 0
|
||||
new_prospects = []
|
||||
for name in domain_names:
|
||||
name = name.strip().lower()
|
||||
name = self._normalize_domain(name).lower()
|
||||
if not name:
|
||||
continue
|
||||
existing = self.get_by_domain(db, name)
|
||||
@@ -171,6 +183,9 @@ class ProspectService:
|
||||
def update(self, db: Session, prospect_id: int, data: dict) -> Prospect:
|
||||
prospect = self.get_by_id(db, prospect_id)
|
||||
|
||||
if "domain_name" in data and data["domain_name"] is not None:
|
||||
prospect.domain_name = self._normalize_domain(data["domain_name"])
|
||||
|
||||
for field in ["business_name", "status", "source", "address", "city", "postal_code", "notes"]:
|
||||
if field in data and data[field] is not None:
|
||||
setattr(prospect, field, data[field])
|
||||
@@ -225,6 +240,17 @@ class ProspectService:
|
||||
.all()
|
||||
)
|
||||
|
||||
def get_pending_contact_scrape(self, db: Session, limit: int = 100) -> list[Prospect]:
    """Return up to ``limit`` prospects awaiting a contact scrape.

    A prospect qualifies when its ``has_website`` flag is true and its
    ``last_contact_scrape_at`` is NULL.
    """
    with_website = Prospect.has_website.is_(True)
    never_scraped = Prospect.last_contact_scrape_at.is_(None)
    query = db.query(Prospect).filter(with_website, never_scraped)
    return query.limit(limit).all()
|
||||
|
||||
def count_by_status(self, db: Session) -> dict[str, int]:
    """Return the number of prospects per status, keyed by the status as a string.

    Enum-like statuses (anything exposing ``.value``) are keyed by that
    value; anything else is keyed by ``str(status)``.
    """
    rows = db.query(Prospect.status, func.count(Prospect.id)).group_by(Prospect.status).all()  # noqa: SVC-005 - prospecting is platform-scoped, not store-scoped
    counts: dict[str, int] = {}
    for status, count in rows:
        key = status.value if hasattr(status, "value") else str(status)
        counts[key] = count
    return counts
|
||||
|
||||
@@ -4,11 +4,14 @@ Statistics service for the prospecting dashboard.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.modules.prospecting.models import (
|
||||
JobStatus,
|
||||
JobType,
|
||||
Prospect,
|
||||
ProspectChannel,
|
||||
ProspectScanJob,
|
||||
@@ -56,6 +59,25 @@ class StatsService:
|
||||
"common_issues": self._get_common_issues(db),
|
||||
}
|
||||
|
||||
def create_job(self, db: Session, job_type: JobType) -> ProspectScanJob:
    """Create a scan job record for tracking.

    The record starts in the RUNNING state with ``started_at`` set to the
    current UTC time. It is added to the session and flushed, but not
    committed; committing is left to the caller.
    """
    fields = {
        "job_type": job_type,
        "status": JobStatus.RUNNING,
        "started_at": datetime.now(UTC),
    }
    record = ProspectScanJob(**fields)
    db.add(record)
    db.flush()
    return record
|
||||
|
||||
def complete_job(self, job: ProspectScanJob, processed: int, failed: int = 0) -> None:
    """Mark a scan job as completed.

    Stores the processed and failed counts, sets the total to their sum,
    switches the status to COMPLETED and stamps ``completed_at`` with the
    current UTC time. Only mutates ``job``; it does not flush or commit.
    """
    total = processed + failed
    job.total_items = total
    job.failed_items = failed
    job.processed_items = processed
    job.status = JobStatus.COMPLETED
    job.completed_at = datetime.now(UTC)
|
||||
|
||||
def get_scan_jobs(
|
||||
self,
|
||||
db: Session,
|
||||
|
||||
@@ -52,6 +52,7 @@ function scanJobs() {
|
||||
'http_check': 'http-check',
|
||||
'tech_scan': 'tech-scan',
|
||||
'performance_scan': 'performance',
|
||||
'contact_scrape': 'contacts',
|
||||
'score_compute': 'score-compute',
|
||||
},
|
||||
|
||||
|
||||
@@ -29,6 +29,11 @@
|
||||
<span x-html="$icon('chart-bar', 'w-4 h-4 mr-2')"></span>
|
||||
Performance Scan
|
||||
</button>
|
||||
<button type="button" @click="startBatchJob('contact_scrape')"
|
||||
class="inline-flex items-center px-4 py-2 text-sm font-medium leading-5 text-white transition-colors duration-150 bg-purple-600 border border-transparent rounded-lg hover:bg-purple-700 focus:outline-none">
|
||||
<span x-html="$icon('mail', 'w-4 h-4 mr-2')"></span>
|
||||
Contact Scrape
|
||||
</button>
|
||||
<button type="button" @click="startBatchJob('score_compute')"
|
||||
class="inline-flex items-center px-4 py-2 text-sm font-medium leading-5 text-white transition-colors duration-150 bg-red-600 border border-transparent rounded-lg hover:bg-red-700 focus:outline-none">
|
||||
<span x-html="$icon('cursor-click', 'w-4 h-4 mr-2')"></span>
|
||||
|
||||
51
docs/proposals/prospecting-contact-scraper-fix.md
Normal file
51
docs/proposals/prospecting-contact-scraper-fix.md
Normal file
@@ -0,0 +1,51 @@
|
||||
# Prospecting Contact Scraper — Fix Enum + Improve Regex
|
||||
|
||||
## Problem 1: DB Enum type mismatch
|
||||
|
||||
`ProspectContact.contact_type` is defined as a Python Enum (`contacttype`) in the model, but the DB column was created as a plain `VARCHAR` in the migration. When SQLAlchemy inserts, it casts to `::contacttype` which doesn't exist in PostgreSQL.
|
||||
|
||||
**Error:** `type "contacttype" does not exist`
|
||||
|
||||
**File:** `app/modules/prospecting/models/prospect_contact.py`
|
||||
|
||||
**Fix options:**
|
||||
- A) Change the model column from `Enum(ContactType)` to `String` to match the migration
|
||||
- B) Create an Alembic migration to add the `contacttype` enum to PostgreSQL
|
||||
|
||||
Option A is simpler and consistent with how the scraper creates contacts (using plain strings like `"email"`, `"phone"`).
|
||||
|
||||
## Problem 2: Phone regex too loose and Luxembourg-specific
|
||||
|
||||
The phone pattern `(?:\+352|00352)?[\s.-]?\d{2,3}[\s.-]?\d{2,3}[\s.-]?\d{2,3}` has two issues:
|
||||
|
||||
1. **Too loose** — matches any 6-9 digit sequence (CSS values, timestamps, hex colors, zip codes). On batirenovation-strasbourg.fr it found 120+ false positives.
|
||||
2. **Luxembourg-only** — only recognizes the `+352`/`00352` prefix, whereas the affected site (batirenovation-strasbourg.fr) is French and lists `+33` numbers.
|
||||
|
||||
**File:** `app/modules/prospecting/services/enrichment_service.py:274`
|
||||
|
||||
**Fix:** Replace with a broader international phone regex:
|
||||
```python
|
||||
phone_pattern = re.compile(
|
||||
r'(?:\+\d{1,3}[\s.-]?)?' # optional international prefix (+33, +352, etc.)
|
||||
r'\(?\d{1,4}\)?[\s.-]?' # area code with optional parens
|
||||
r'\d{2,4}[\s.-]?' # first group
|
||||
r'\d{2,4}(?:[\s.-]?\d{2,4})?' # second group + optional third
|
||||
)
|
||||
```
|
||||
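Also add a minimum-length filter (e.g. 10+ digits for numbers with an international prefix) and exclude matches that look like dates, hex colors, or CSS values. The threshold needs care: on its own the regex above still accepts runs of as few as five digits, while valid Luxembourg numbers can have fewer than ten digits.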
|
||||
|
||||
## Problem 3: Email with URL-encoded space
|
||||
|
||||
The scraper finds `%20btirenovation@gmail.com` (from an `href="mailto:%20btirenovation@gmail.com"`) alongside the clean `btirenovation@gmail.com`. The `%20` prefix should be stripped.
|
||||
|
||||
**File:** `app/modules/prospecting/services/enrichment_service.py:293-303`
|
||||
|
||||
**Fix:** URL-decode email values and trim surrounding whitespace before storing (decoding `%20` alone leaves a leading space), or strip the `%20` prefix.
|
||||
|
||||
## Files to change
|
||||
|
||||
| File | Change |
|
||||
|---|---|
|
||||
| `prospecting/models/prospect_contact.py` | Change `contact_type` from `Enum` to `String` |
|
||||
| `prospecting/services/enrichment_service.py` | Improve phone regex, add min-length filter, URL-decode emails |
|
||||
| Alembic migration | If needed for the enum change |
|
||||
Reference in New Issue
Block a user