# app/tasks/dispatcher.py
"""
Task dispatcher for Celery background tasks.

This module provides a unified interface for dispatching background tasks
to Celery workers. All tasks are dispatched to their canonical locations
in the respective modules.

Module task locations:
- Marketplace: app.modules.marketplace.tasks
- Billing: app.modules.billing.tasks
- Dev-Tools: app.modules.dev_tools.tasks
- Monitoring: app.modules.monitoring.tasks
"""

import logging
from typing import Any

from app.core.config import settings

logger = logging.getLogger(__name__)


class TaskDispatcher:
    """
    Dispatches tasks to Celery workers.

    Every ``dispatch_*`` method first verifies that Celery is enabled, then
    imports the task lazily inside the method (so importing this module does
    not import the task modules) and enqueues it with ``.delay(...)``.

    Usage:
        from app.tasks.dispatcher import task_dispatcher

        # In an API endpoint:
        task_id = task_dispatcher.dispatch_marketplace_import(
            job_id=job.id,
            url=url,
            marketplace=marketplace,
            store_id=store_id,
        )
    """

    @property
    def use_celery(self) -> bool:
        """Check if Celery is enabled (reads ``settings.use_celery``)."""
        return settings.use_celery

    def _require_celery(self, task_name: str) -> None:
        """
        Raise an error if Celery is not enabled.

        Args:
            task_name: Human-readable task label used in the error message

        Raises:
            RuntimeError: If ``use_celery`` is false
        """
        if not self.use_celery:
            raise RuntimeError(
                f"Celery is required for {task_name}. "
                "Set USE_CELERY=true in environment."
            )

    def dispatch_marketplace_import(
        self,
        job_id: int,
        url: str,
        marketplace: str,
        store_id: int,
        batch_size: int = 1000,
        language: str = "en",
    ) -> str:
        """
        Dispatch marketplace import task.

        Args:
            job_id: ID of the MarketplaceImportJob record
            url: URL to the CSV file
            marketplace: Name of the marketplace
            store_id: ID of the store
            batch_size: Number of rows per batch
            language: Language code for translations

        Returns:
            str: Celery task ID

        Raises:
            RuntimeError: If Celery is not enabled
        """
        self._require_celery("marketplace import")

        from app.modules.marketplace.tasks import process_marketplace_import

        task = process_marketplace_import.delay(
            job_id=job_id,
            url=url,
            marketplace=marketplace,
            store_id=store_id,
            batch_size=batch_size,
            language=language,
        )
        logger.info(f"Dispatched marketplace import to Celery: task_id={task.id}")
        return task.id

    def dispatch_historical_import(
        self,
        job_id: int,
        store_id: int,
    ) -> str:
        """
        Dispatch Letzshop historical import task.

        Args:
            job_id: ID of the LetzshopHistoricalImportJob record
            store_id: ID of the store

        Returns:
            str: Celery task ID

        Raises:
            RuntimeError: If Celery is not enabled
        """
        self._require_celery("historical import")

        from app.modules.marketplace.tasks import process_historical_import

        task = process_historical_import.delay(job_id=job_id, store_id=store_id)
        logger.info(f"Dispatched historical import to Celery: task_id={task.id}")
        return task.id

    def dispatch_code_quality_scan(
        self,
        scan_id: int,
    ) -> str:
        """
        Dispatch code quality scan task.

        Args:
            scan_id: ID of the ArchitectureScan record

        Returns:
            str: Celery task ID

        Raises:
            RuntimeError: If Celery is not enabled
        """
        self._require_celery("code quality scan")

        from app.modules.dev_tools.tasks import execute_code_quality_scan

        task = execute_code_quality_scan.delay(scan_id=scan_id)
        logger.info(f"Dispatched code quality scan to Celery: task_id={task.id}")
        return task.id

    def dispatch_test_run(
        self,
        run_id: int,
        test_path: str = "tests",
        extra_args: list[str] | None = None,
    ) -> str:
        """
        Dispatch test run task.

        Args:
            run_id: ID of the TestRun record
            test_path: Path to tests
            extra_args: Additional pytest arguments

        Returns:
            str: Celery task ID

        Raises:
            RuntimeError: If Celery is not enabled
        """
        self._require_celery("test run")

        from app.modules.dev_tools.tasks import execute_test_run

        task = execute_test_run.delay(
            run_id=run_id,
            test_path=test_path,
            extra_args=extra_args,
        )
        logger.info(f"Dispatched test run to Celery: task_id={task.id}")
        return task.id

    def dispatch_product_export(
        self,
        store_id: int,
        triggered_by: str,
        include_inactive: bool = False,
    ) -> str:
        """
        Dispatch product export task.

        Args:
            store_id: ID of the store to export
            triggered_by: User identifier
            include_inactive: Whether to include inactive products

        Returns:
            str: Celery task ID

        Raises:
            RuntimeError: If Celery is not enabled
        """
        self._require_celery("product export")

        from app.modules.marketplace.tasks import export_store_products_to_folder

        task = export_store_products_to_folder.delay(
            store_id=store_id,
            triggered_by=triggered_by,
            include_inactive=include_inactive,
        )
        logger.info(f"Dispatched product export to Celery: task_id={task.id}")
        return task.id

    def dispatch_capacity_snapshot(self) -> str:
        """
        Dispatch capacity snapshot capture task.

        Returns:
            str: Celery task ID

        Raises:
            RuntimeError: If Celery is not enabled
        """
        self._require_celery("capacity snapshot")

        from app.modules.monitoring.tasks import capture_capacity_snapshot

        task = capture_capacity_snapshot.delay()
        logger.info(f"Dispatched capacity snapshot to Celery: task_id={task.id}")
        return task.id

    def get_task_status(self, task_id: str) -> dict[str, Any]:
        """
        Get the status of a Celery task.

        Args:
            task_id: Celery task ID

        Returns:
            dict: Task status info with the keys ``task_id``, ``state``,
            ``ready``, ``successful`` and ``result``. ``successful`` and
            ``result`` are ``None`` until the task is ready. When Celery is
            not enabled, ``{"error": "Celery not enabled"}`` is returned
            instead (this method does not raise in that case).
        """
        if not self.use_celery:
            return {"error": "Celery not enabled"}

        from app.core.celery_config import celery_app

        result = celery_app.AsyncResult(task_id)

        # Evaluate readiness exactly once. Calling ready() separately for each
        # field lets a task that finishes mid-way produce a self-contradictory
        # payload (e.g. "ready": False with a non-None "successful"), and costs
        # extra result-backend lookups.
        state = result.state
        is_ready = result.ready()

        return {
            "task_id": task_id,
            "state": state,
            "ready": is_ready,
            "successful": result.successful() if is_ready else None,
            # NOTE(review): per the Celery docs, a failed task exposes the
            # raised exception instance as ``result`` - confirm that callers
            # serializing this dict handle that case.
            "result": result.result if is_ready else None,
        }

    def revoke_task(self, task_id: str, terminate: bool = False) -> bool:
        """
        Revoke (cancel) a Celery task.

        Args:
            task_id: Celery task ID to revoke
            terminate: If True, terminate running task; if False, just prevent execution

        Returns:
            bool: True if revocation was sent; False (with a warning logged)
            when Celery is not enabled
        """
        if not self.use_celery:
            logger.warning("Cannot revoke task: Celery not enabled")
            return False

        from app.core.celery_config import celery_app

        celery_app.control.revoke(task_id, terminate=terminate)
        logger.info(f"Revoked Celery task: task_id={task_id}, terminate={terminate}")
        return True


# Singleton instance
task_dispatcher = TaskDispatcher()