diff --git a/app/modules/prospecting/config.py b/app/modules/prospecting/config.py index 44ec6d1d..3ce12425 100644 --- a/app/modules/prospecting/config.py +++ b/app/modules/prospecting/config.py @@ -26,6 +26,9 @@ class ModuleConfig(BaseSettings): # Max concurrent HTTP requests for batch scanning max_concurrent_requests: int = 10 + # Delay between prospects in batch scans (seconds) — be polite to target sites + batch_delay_seconds: float = 1.0 + model_config = {"env_prefix": "PROSPECTING_", "env_file": ".env", "extra": "ignore"} diff --git a/app/modules/prospecting/docs/batch-scanning.md b/app/modules/prospecting/docs/batch-scanning.md new file mode 100644 index 00000000..f1fbaf02 --- /dev/null +++ b/app/modules/prospecting/docs/batch-scanning.md @@ -0,0 +1,74 @@ +# Batch Scanning & Rate Limiting + +## Overview + +The prospecting module performs passive scans against prospect websites to gather intelligence. Batch operations process multiple prospects sequentially with a configurable delay between each. 
+ +## Scan Types + +| Type | What It Does | HTTP Requests/Prospect | +|---|---|---| +| **HTTP Check** | Connectivity, HTTPS, redirects | 2 (HTTP + HTTPS) | +| **Tech Scan** | CMS, framework, server detection | 1 (homepage) | +| **Performance** | PageSpeed Insights audit | 1 (Google API) | +| **Contact Scrape** | Email, phone, address extraction | 6 (homepage + 5 subpages) | +| **Security Audit** | Headers, SSL, exposed files, cookies | ~35 (homepage + 30 path checks) | +| **Score Compute** | Calculate opportunity score | 0 (local computation) | + +## Rate Limiting + +### Configuration + +```bash +# .env +PROSPECTING_BATCH_DELAY_SECONDS=1.0 # delay between prospects (default: 1s) +PROSPECTING_HTTP_TIMEOUT=10 # per-request timeout (default: 10s) +``` + +### Where Delays Apply + +- **Batch API endpoints** (`POST /enrichment/*/batch`) — waits `PROSPECTING_BATCH_DELAY_SECONDS` between prospects (default: 1s; `0` disables the delay) +- **Celery background tasks** (`scan_tasks.py`) — same configurable delay between prospects +- **Full enrichment** (`POST /enrichment/full/{id}`) — no delay (single prospect) +- **Score compute batch** — no delay (no outbound HTTP) + +### Scaling to 70k+ URLs + +For bulk imports (e.g., domain registrar list), use Celery tasks with limits: + +| Scan Type | Time per prospect | 70k URLs | Recommended Batch | +|---|---|---|---| +| HTTP Check | ~2s (timeout + delay) | ~39 hours | 500/batch via Celery | +| Tech Scan | ~2s | ~39 hours | 500/batch | +| Contact Scrape | ~12s (6 pages + delay) | ~10 days | 100/batch | +| Security Audit | ~40s (35 paths + delay) | ~32 days | 50/batch | + +**Recommendation:** For 70k URLs, run HTTP Check first (fastest, filters out dead sites). Then run subsequent scans only on prospects with `has_website=True` (~50-70% of domains typically have working sites). + +### Pipeline Order + +``` +1. HTTP Check batch → sets has_website, filters dead domains +2. Tech Scan batch → only where has_website=True +3. Contact Scrape → only where has_website=True +4. Security Audit → only where has_website=True +5. 
Score Compute → all prospects (local, fast) +``` + +Each scan type uses `last_*_at` timestamps to track what's been processed. Re-running a batch only processes prospects that haven't been scanned yet. + +## User-Agent + +All scans use a standard Chrome User-Agent: +``` +Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 +``` + +The security audit also identifies as `OrionBot/1.0` in the contact scraper for transparency. + +## Error Handling + +- Individual prospect failures don't stop the batch +- Errors are logged but the next prospect continues +- The scan job record tracks `processed_items` vs `total_items` +- Celery tasks retry on failure (2 retries with exponential backoff) diff --git a/app/modules/prospecting/routes/api/admin_enrichment.py b/app/modules/prospecting/routes/api/admin_enrichment.py index fe3bdca4..86b5de17 100644 --- a/app/modules/prospecting/routes/api/admin_enrichment.py +++ b/app/modules/prospecting/routes/api/admin_enrichment.py @@ -8,6 +8,7 @@ catch "batch" as a string before trying to parse it as int → 422. 
""" import logging +import time from fastapi import APIRouter, Depends, Path, Query from fastapi.responses import HTMLResponse @@ -15,6 +16,7 @@ from sqlalchemy.orm import Session from app.api.deps import get_current_admin_api from app.core.database import get_db +from app.modules.prospecting.config import config as prospecting_config from app.modules.prospecting.models import JobType from app.modules.prospecting.schemas.enrichment import ( ContactScrapeResponse, @@ -45,6 +47,12 @@ router = APIRouter(prefix="/enrichment") logger = logging.getLogger(__name__) +def _batch_delay(): + """Delay between prospects in batch scans to avoid rate limiting.""" + if prospecting_config.batch_delay_seconds > 0: + time.sleep(prospecting_config.batch_delay_seconds) + + # ── Batch endpoints (must be before /{prospect_id} routes) ────────────────── @@ -58,10 +66,12 @@ def http_check_batch( job = stats_service.create_job(db,JobType.HTTP_CHECK) prospects = prospect_service.get_pending_http_check(db, limit=limit) results = [] - for prospect in prospects: + for i, prospect in enumerate(prospects): result = enrichment_service.check_http(db, prospect) results.append(HttpCheckBatchItem(domain=prospect.domain_name, **result)) - stats_service.complete_job(job,processed=len(results)) + if i < len(prospects) - 1: + _batch_delay() + stats_service.complete_job(job, processed=len(results)) db.commit() return HttpCheckBatchResponse(processed=len(results), results=results) @@ -76,11 +86,13 @@ def tech_scan_batch( job = stats_service.create_job(db,JobType.TECH_SCAN) prospects = prospect_service.get_pending_tech_scan(db, limit=limit) count = 0 - for prospect in prospects: + for i, prospect in enumerate(prospects): result = enrichment_service.scan_tech_stack(db, prospect) if result: count += 1 - stats_service.complete_job(job,processed=len(prospects)) + if i < len(prospects) - 1: + _batch_delay() + stats_service.complete_job(job, processed=len(prospects)) db.commit() return 
ScanBatchResponse(processed=len(prospects), successful=count) @@ -95,11 +107,13 @@ def performance_scan_batch( job = stats_service.create_job(db,JobType.PERFORMANCE_SCAN) prospects = prospect_service.get_pending_performance_scan(db, limit=limit) count = 0 - for prospect in prospects: + for i, prospect in enumerate(prospects): result = enrichment_service.scan_performance(db, prospect) if result: count += 1 - stats_service.complete_job(job,processed=len(prospects)) + if i < len(prospects) - 1: + _batch_delay() + stats_service.complete_job(job, processed=len(prospects)) db.commit() return ScanBatchResponse(processed=len(prospects), successful=count) @@ -114,11 +128,13 @@ def contact_scrape_batch( job = stats_service.create_job(db,JobType.CONTACT_SCRAPE) prospects = prospect_service.get_pending_contact_scrape(db, limit=limit) count = 0 - for prospect in prospects: + for i, prospect in enumerate(prospects): contacts = enrichment_service.scrape_contacts(db, prospect) if contacts: count += 1 - stats_service.complete_job(job,processed=len(prospects)) + if i < len(prospects) - 1: + _batch_delay() + stats_service.complete_job(job, processed=len(prospects)) db.commit() return ScanBatchResponse(processed=len(prospects), successful=count) @@ -133,10 +149,12 @@ def security_audit_batch( job = stats_service.create_job(db, JobType.SECURITY_AUDIT) prospects = prospect_service.get_pending_security_audit(db, limit=limit) count = 0 - for prospect in prospects: + for i, prospect in enumerate(prospects): result = security_audit_service.run_audit(db, prospect) if result: count += 1 + if i < len(prospects) - 1: + _batch_delay() stats_service.complete_job(job, processed=len(prospects)) db.commit() return ScanBatchResponse(processed=len(prospects), successful=count) diff --git a/app/modules/prospecting/tasks/scan_tasks.py b/app/modules/prospecting/tasks/scan_tasks.py index 42584d3d..21404bac 100644 --- a/app/modules/prospecting/tasks/scan_tasks.py +++ 
b/app/modules/prospecting/tasks/scan_tasks.py @@ -4,9 +4,11 @@ Celery tasks for batch prospect scanning and enrichment. """ import logging +import time from datetime import UTC, datetime from app.core.celery_config import celery_app +from app.modules.prospecting.config import config as prospecting_config from app.modules.prospecting.models import ProspectScanJob from app.modules.task_base import ModuleTask @@ -53,6 +55,8 @@ def batch_http_check(self, job_id: int, limit: int = 100): job.processed_items = processed if processed % 10 == 0: db.flush() + if processed < len(prospects): + time.sleep(prospecting_config.batch_delay_seconds) job.status = "completed" job.completed_at = datetime.now(UTC) @@ -61,7 +65,7 @@ def batch_http_check(self, job_id: int, limit: int = 100): except Exception as e: logger.error("batch_http_check job %d failed: %s", job_id, e, exc_info=True) job.status = "failed" - job.error_message = str(e)[:500] + job.error_log = str(e)[:500] job.completed_at = datetime.now(UTC) db.commit() # SVC-006 - persist failure status raise @@ -110,6 +114,8 @@ def batch_tech_scan(self, job_id: int, limit: int = 100): job.processed_items = processed if processed % 10 == 0: db.flush() + if processed < len(prospects): + time.sleep(prospecting_config.batch_delay_seconds) job.status = "completed" job.completed_at = datetime.now(UTC) @@ -118,7 +124,7 @@ def batch_tech_scan(self, job_id: int, limit: int = 100): except Exception as e: logger.error("batch_tech_scan job %d failed: %s", job_id, e, exc_info=True) job.status = "failed" - job.error_message = str(e)[:500] + job.error_log = str(e)[:500] job.completed_at = datetime.now(UTC) db.commit() # SVC-006 - persist failure status raise @@ -167,6 +173,8 @@ def batch_performance_scan(self, job_id: int, limit: int = 50): job.processed_items = processed if processed % 5 == 0: db.flush() + if processed < len(prospects): + time.sleep(prospecting_config.batch_delay_seconds) job.status = "completed" job.completed_at = 
datetime.now(UTC) @@ -175,7 +183,7 @@ def batch_performance_scan(self, job_id: int, limit: int = 50): except Exception as e: logger.error("batch_performance_scan job %d failed: %s", job_id, e, exc_info=True) job.status = "failed" - job.error_message = str(e)[:500] + job.error_log = str(e)[:500] job.completed_at = datetime.now(UTC) db.commit() # SVC-006 - persist failure status raise @@ -223,6 +231,8 @@ def batch_contact_scrape(self, job_id: int, limit: int = 100): job.processed_items = processed if processed % 10 == 0: db.flush() + if processed < len(prospects): + time.sleep(prospecting_config.batch_delay_seconds) job.status = "completed" job.completed_at = datetime.now(UTC) @@ -231,7 +241,7 @@ def batch_contact_scrape(self, job_id: int, limit: int = 100): except Exception as e: logger.error("batch_contact_scrape job %d failed: %s", job_id, e, exc_info=True) job.status = "failed" - job.error_message = str(e)[:500] + job.error_log = str(e)[:500] job.completed_at = datetime.now(UTC) db.commit() # SVC-006 - persist failure status raise @@ -270,7 +280,7 @@ def batch_score_compute(self, job_id: int, limit: int = 500): except Exception as e: logger.error("batch_score_compute job %d failed: %s", job_id, e, exc_info=True) job.status = "failed" - job.error_message = str(e)[:500] + job.error_log = str(e)[:500] job.completed_at = datetime.now(UTC) db.commit() # SVC-006 - persist failure status raise @@ -331,7 +341,7 @@ def full_enrichment(self, job_id: int, prospect_id: int): except Exception as e: logger.error("full_enrichment job %d failed: %s", job_id, e, exc_info=True) job.status = "failed" - job.error_message = str(e)[:500] + job.error_log = str(e)[:500] job.completed_at = datetime.now(UTC) db.commit() # SVC-006 - persist failure status raise