# app/modules/prospecting/tasks/scan_tasks.py
"""Celery tasks for batch prospect scanning and enrichment.

Each task claims a ProspectScanJob row, streams progress into it with
periodic flushes, and commits a terminal status ("completed"/"failed")
itself -- background tasks own their transaction (SVC-006).
"""

import logging
import time
from datetime import UTC, datetime

from app.core.celery_config import celery_app
from app.modules.prospecting.config import config as prospecting_config
from app.modules.prospecting.models import ProspectScanJob
from app.modules.task_base import ModuleTask

logger = logging.getLogger(__name__)


def _claim_job(db, job_id: int, task_id):
    """Load the scan job and mark it running.

    Returns the job, or None (after logging) when no row matches job_id.
    """
    job = db.query(ProspectScanJob).filter(ProspectScanJob.id == job_id).first()
    if not job:
        logger.error("Scan job %d not found", job_id)
        return None
    job.celery_task_id = task_id
    job.status = "running"
    job.started_at = datetime.now(UTC)
    db.flush()
    return job


def _complete_job(db, job) -> None:
    """Mark the job completed and commit (SVC-006 - task owns transaction)."""
    job.status = "completed"
    job.completed_at = datetime.now(UTC)
    db.commit()


def _fail_job(db, job, task_name: str, exc: Exception) -> None:
    """Log the failure, persist it on the job, and commit (SVC-006)."""
    logger.error("%s job %d failed: %s", task_name, job.id, exc, exc_info=True)
    job.status = "failed"
    job.error_log = str(exc)[:500]  # keep within the bounded error_log column
    job.completed_at = datetime.now(UTC)
    db.commit()


def _process_batch(db, job, prospects, process_one, flush_every: int = 10) -> int:
    """Run *process_one* over each prospect, tracking progress on *job*.

    Flushes progress every *flush_every* items and sleeps
    ``batch_delay_seconds`` between items (but not after the last one).
    Returns how many calls to *process_one* returned a truthy result.
    """
    total = len(prospects)
    job.total_items = total
    db.flush()
    successful = 0
    for processed, prospect in enumerate(prospects, 1):
        if process_one(prospect):
            successful += 1
        job.processed_items = processed
        if processed % flush_every == 0:
            db.flush()
        if processed < total:
            time.sleep(prospecting_config.batch_delay_seconds)
    return successful


@celery_app.task(
    bind=True,
    base=ModuleTask,
    name="app.modules.prospecting.tasks.scan_tasks.batch_http_check",
    max_retries=2,
    default_retry_delay=60,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_backoff_max=300,
)
def batch_http_check(self, job_id: int, limit: int = 100):
    """Run HTTP connectivity check for pending prospects."""
    with self.get_db() as db:
        job = _claim_job(db, job_id, self.request.id)
        if job is None:
            return
        try:
            # Imported lazily, presumably to avoid a circular import at
            # module load time -- keep function-local.
            from app.modules.prospecting.services.enrichment_service import (
                enrichment_service,
            )
            from app.modules.prospecting.services.prospect_service import (
                prospect_service,
            )

            prospects = prospect_service.get_pending_http_check(db, limit=limit)
            _process_batch(
                db, job, prospects, lambda p: enrichment_service.check_http(db, p)
            )
            _complete_job(db, job)
        except Exception as e:
            _fail_job(db, job, "batch_http_check", e)
            raise  # re-raise so Celery's autoretry/backoff can kick in


@celery_app.task(
    bind=True,
    base=ModuleTask,
    name="app.modules.prospecting.tasks.scan_tasks.batch_tech_scan",
    max_retries=2,
    default_retry_delay=60,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_backoff_max=300,
)
def batch_tech_scan(self, job_id: int, limit: int = 100):
    """Run technology scan for pending prospects."""
    with self.get_db() as db:
        job = _claim_job(db, job_id, self.request.id)
        if job is None:
            return
        try:
            from app.modules.prospecting.services.enrichment_service import (
                enrichment_service,
            )
            from app.modules.prospecting.services.prospect_service import (
                prospect_service,
            )

            prospects = prospect_service.get_pending_tech_scan(db, limit=limit)
            successful = _process_batch(
                db, job, prospects, lambda p: enrichment_service.scan_tech_stack(db, p)
            )
            # The original counted successes but never reported them; log it.
            logger.info(
                "batch_tech_scan job %d: %d/%d scans succeeded",
                job_id,
                successful,
                len(prospects),
            )
            _complete_job(db, job)
        except Exception as e:
            _fail_job(db, job, "batch_tech_scan", e)
            raise


@celery_app.task(
    bind=True,
    base=ModuleTask,
    name="app.modules.prospecting.tasks.scan_tasks.batch_performance_scan",
    max_retries=2,
    default_retry_delay=120,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_backoff_max=600,
)
def batch_performance_scan(self, job_id: int, limit: int = 50):
    """Run PageSpeed performance scan for pending prospects."""
    with self.get_db() as db:
        job = _claim_job(db, job_id, self.request.id)
        if job is None:
            return
        try:
            from app.modules.prospecting.services.enrichment_service import (
                enrichment_service,
            )
            from app.modules.prospecting.services.prospect_service import (
                prospect_service,
            )

            prospects = prospect_service.get_pending_performance_scan(db, limit=limit)
            # flush_every=5: performance scans are slow, so checkpoint
            # progress more often than the other batch tasks.
            successful = _process_batch(
                db,
                job,
                prospects,
                lambda p: enrichment_service.scan_performance(db, p),
                flush_every=5,
            )
            logger.info(
                "batch_performance_scan job %d: %d/%d scans succeeded",
                job_id,
                successful,
                len(prospects),
            )
            _complete_job(db, job)
        except Exception as e:
            _fail_job(db, job, "batch_performance_scan", e)
            raise


@celery_app.task(
    bind=True,
    base=ModuleTask,
    name="app.modules.prospecting.tasks.scan_tasks.batch_contact_scrape",
    max_retries=2,
    default_retry_delay=60,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_backoff_max=300,
)
def batch_contact_scrape(self, job_id: int, limit: int = 100):
    """Scrape contacts for pending prospects."""
    with self.get_db() as db:
        job = _claim_job(db, job_id, self.request.id)
        if job is None:
            return
        try:
            from app.modules.prospecting.services.enrichment_service import (
                enrichment_service,
            )
            from app.modules.prospecting.services.prospect_service import (
                prospect_service,
            )

            # NOTE(review): this reuses the HTTP-check pending query and then
            # filters to prospects with websites -- confirm there is no
            # dedicated "pending contact scrape" query that should be used.
            prospects = prospect_service.get_pending_http_check(db, limit=limit)
            prospects = [p for p in prospects if p.has_website]
            _process_batch(
                db, job, prospects, lambda p: enrichment_service.scrape_contacts(db, p)
            )
            _complete_job(db, job)
        except Exception as e:
            _fail_job(db, job, "batch_contact_scrape", e)
            raise


@celery_app.task(
    bind=True,
    base=ModuleTask,
    name="app.modules.prospecting.tasks.scan_tasks.batch_score_compute",
    max_retries=1,
    default_retry_delay=30,
)
def batch_score_compute(self, job_id: int, limit: int = 500):
    """Compute or recompute scores for all prospects."""
    with self.get_db() as db:
        job = _claim_job(db, job_id, self.request.id)
        if job is None:
            return
        try:
            from app.modules.prospecting.services.scoring_service import scoring_service

            count = scoring_service.compute_all(db, limit=limit)
            # Scoring is a single bulk call, so totals are only known after.
            job.processed_items = count
            job.total_items = count
            _complete_job(db, job)
        except Exception as e:
            _fail_job(db, job, "batch_score_compute", e)
            raise


@celery_app.task(
    bind=True,
    base=ModuleTask,
    name="app.modules.prospecting.tasks.scan_tasks.full_enrichment",
    max_retries=2,
    default_retry_delay=60,
    autoretry_for=(Exception,),
    retry_backoff=True,
)
def full_enrichment(self, job_id: int, prospect_id: int):
    """Run full enrichment pipeline for a single prospect."""
    with self.get_db() as db:
        job = _claim_job(db, job_id, self.request.id)
        if job is None:
            return
        job.total_items = 1
        db.flush()
        try:
            from app.modules.prospecting.services.enrichment_service import (
                enrichment_service,
            )
            from app.modules.prospecting.services.prospect_service import (
                prospect_service,
            )
            from app.modules.prospecting.services.scoring_service import scoring_service

            prospect = prospect_service.get_by_id(db, prospect_id)
            if prospect is None:
                # Fail fast with a clear message instead of an opaque
                # AttributeError on the first enrichment call; still flows
                # through the failure/retry path below.
                raise ValueError(f"Prospect {prospect_id} not found")

            # HTTP check
            enrichment_service.check_http(db, prospect)

            # Tech + Performance + Contacts (only if the prospect has a site)
            if prospect.has_website:
                enrichment_service.scan_tech_stack(db, prospect)
                enrichment_service.scan_performance(db, prospect)
                enrichment_service.scrape_contacts(db, prospect)

            # Refresh so scoring sees the enrichment writes, then score.
            db.refresh(prospect)
            scoring_service.compute_score(db, prospect)

            job.processed_items = 1
            _complete_job(db, job)
        except Exception as e:
            _fail_job(db, job, "full_enrichment", e)
            raise