Files
orion/app/tasks/dispatcher.py
Samir Boulahtit 2792414395 feat: add Celery/Redis task queue with feature flag support
Migrate background tasks from FastAPI BackgroundTasks to Celery with Redis
for persistent task queuing, retries, and scheduled jobs.

Key changes:
- Add Celery configuration with Redis broker/backend
- Create task dispatcher with USE_CELERY feature flag for gradual rollout
- Add Celery task wrappers for all background operations:
  - Marketplace imports
  - Letzshop historical imports
  - Product exports
  - Code quality scans
  - Test runs
  - Subscription scheduled tasks (via Celery Beat)
- Add celery_task_id column to job tables for Flower integration
- Add Flower dashboard link to admin background tasks page
- Update docker-compose.yml with worker, beat, and flower services
- Add Makefile targets: celery-worker, celery-beat, celery-dev, flower

When USE_CELERY=false (the default), the system falls back to FastAPI
BackgroundTasks, so development does not require a Redis dependency.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 17:35:16 +01:00

287 lines
8.9 KiB
Python

# app/tasks/dispatcher.py
"""
Task dispatcher with feature flag for gradual Celery migration.
This module provides a unified interface for dispatching background tasks.
Based on the USE_CELERY setting, tasks are either:
- Sent to Celery for persistent, reliable execution
- Run via FastAPI BackgroundTasks (fire-and-forget)
This allows for gradual rollout and instant rollback.
"""
import logging
from typing import Any
from fastapi import BackgroundTasks
from app.core.config import settings
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class TaskDispatcher:
    """
    Dispatches tasks to either Celery or FastAPI BackgroundTasks.

    The backend is selected on every call from ``settings.use_celery``.
    Task modules are imported inside the selected branch, so a call only
    loads the task module of the backend it actually uses.

    Usage:
        from app.tasks.dispatcher import task_dispatcher

        # In an API endpoint:
        task_id = task_dispatcher.dispatch_marketplace_import(
            background_tasks=background_tasks,
            job_id=job.id,
            url=url,
            marketplace=marketplace,
            vendor_id=vendor_id,
        )
    """

    @property
    def use_celery(self) -> bool:
        """Check if Celery is enabled (reads ``settings.use_celery``)."""
        return settings.use_celery

    def dispatch_marketplace_import(
        self,
        background_tasks: BackgroundTasks,
        job_id: int,
        url: str,
        marketplace: str,
        vendor_id: int,
        batch_size: int = 1000,
        language: str = "en",
    ) -> str | None:
        """
        Dispatch marketplace import task.

        Args:
            background_tasks: FastAPI BackgroundTasks instance (only used
                when Celery is disabled)
            job_id: ID of the MarketplaceImportJob record
            url: URL to the CSV file
            marketplace: Name of the marketplace
            vendor_id: ID of the vendor
            batch_size: Number of rows per batch
            language: Language code for translations

        Returns:
            str | None: Celery task ID if using Celery, None otherwise
        """
        if self.use_celery:
            from app.tasks.celery_tasks.marketplace import process_marketplace_import

            task = process_marketplace_import.delay(
                job_id=job_id,
                url=url,
                marketplace=marketplace,
                vendor_id=vendor_id,
                batch_size=batch_size,
                language=language,
            )
            logger.info("Dispatched marketplace import to Celery: task_id=%s", task.id)
            return task.id

        # Celery disabled: queue the in-process implementation instead.
        from app.tasks.background_tasks import process_marketplace_import

        background_tasks.add_task(
            process_marketplace_import,
            job_id=job_id,
            url=url,
            marketplace=marketplace,
            vendor_id=vendor_id,
            batch_size=batch_size,
            language=language,
        )
        logger.info("Dispatched marketplace import to BackgroundTasks")
        return None

    def dispatch_historical_import(
        self,
        background_tasks: BackgroundTasks,
        job_id: int,
        vendor_id: int,
    ) -> str | None:
        """
        Dispatch Letzshop historical import task.

        Args:
            background_tasks: FastAPI BackgroundTasks instance (only used
                when Celery is disabled)
            job_id: ID of the LetzshopHistoricalImportJob record
            vendor_id: ID of the vendor

        Returns:
            str | None: Celery task ID if using Celery, None otherwise
        """
        if self.use_celery:
            from app.tasks.celery_tasks.letzshop import process_historical_import

            task = process_historical_import.delay(job_id=job_id, vendor_id=vendor_id)
            logger.info("Dispatched historical import to Celery: task_id=%s", task.id)
            return task.id

        # Celery disabled: queue the in-process implementation instead.
        from app.tasks.letzshop_tasks import process_historical_import

        background_tasks.add_task(
            process_historical_import,
            job_id=job_id,
            vendor_id=vendor_id,
        )
        logger.info("Dispatched historical import to BackgroundTasks")
        return None

    def dispatch_code_quality_scan(
        self,
        background_tasks: BackgroundTasks,
        scan_id: int,
    ) -> str | None:
        """
        Dispatch code quality scan task.

        Args:
            background_tasks: FastAPI BackgroundTasks instance (only used
                when Celery is disabled)
            scan_id: ID of the ArchitectureScan record

        Returns:
            str | None: Celery task ID if using Celery, None otherwise
        """
        if self.use_celery:
            from app.tasks.celery_tasks.code_quality import execute_code_quality_scan

            task = execute_code_quality_scan.delay(scan_id=scan_id)
            logger.info("Dispatched code quality scan to Celery: task_id=%s", task.id)
            return task.id

        # Celery disabled: queue the in-process implementation instead.
        from app.tasks.code_quality_tasks import execute_code_quality_scan

        background_tasks.add_task(execute_code_quality_scan, scan_id=scan_id)
        logger.info("Dispatched code quality scan to BackgroundTasks")
        return None

    def dispatch_test_run(
        self,
        background_tasks: BackgroundTasks,
        run_id: int,
        test_path: str = "tests",
        extra_args: list[str] | None = None,
    ) -> str | None:
        """
        Dispatch test run task.

        Args:
            background_tasks: FastAPI BackgroundTasks instance (only used
                when Celery is disabled)
            run_id: ID of the TestRun record
            test_path: Path to tests
            extra_args: Additional pytest arguments

        Returns:
            str | None: Celery task ID if using Celery, None otherwise
        """
        if self.use_celery:
            from app.tasks.celery_tasks.test_runner import execute_test_run

            task = execute_test_run.delay(
                run_id=run_id,
                test_path=test_path,
                extra_args=extra_args,
            )
            logger.info("Dispatched test run to Celery: task_id=%s", task.id)
            return task.id

        # Celery disabled: queue the in-process implementation instead.
        from app.tasks.test_runner_tasks import execute_test_run

        background_tasks.add_task(
            execute_test_run,
            run_id=run_id,
            test_path=test_path,
            extra_args=extra_args,
        )
        logger.info("Dispatched test run to BackgroundTasks")
        return None

    def dispatch_product_export(
        self,
        vendor_id: int,
        triggered_by: str,
        include_inactive: bool = False,
    ) -> str | None:
        """
        Dispatch product export task (Celery only).

        This task is only available via Celery as it's designed for
        asynchronous batch exports. For synchronous exports, use
        the export service directly. When Celery is disabled nothing is
        dispatched: a warning is logged and None is returned.

        Args:
            vendor_id: ID of the vendor to export
            triggered_by: User identifier
            include_inactive: Whether to include inactive products

        Returns:
            str | None: Celery task ID if using Celery, None otherwise
        """
        if not self.use_celery:
            # No BackgroundTasks fallback exists for this task.
            logger.warning(
                "Product export task requires Celery. "
                "Use letzshop_export_service directly for synchronous export."
            )
            return None

        from app.tasks.celery_tasks.export import export_vendor_products_to_folder

        task = export_vendor_products_to_folder.delay(
            vendor_id=vendor_id,
            triggered_by=triggered_by,
            include_inactive=include_inactive,
        )
        logger.info("Dispatched product export to Celery: task_id=%s", task.id)
        return task.id

    def get_task_status(self, task_id: str) -> dict[str, Any]:
        """
        Get the status of a Celery task.

        Args:
            task_id: Celery task ID

        Returns:
            dict: ``{"error": "Celery not enabled"}`` when Celery is disabled;
            otherwise a dict with the keys ``task_id``, ``state``, ``ready``,
            ``successful`` and ``result``. ``successful`` and ``result`` are
            None until the task is ready.
        """
        if not self.use_celery:
            return {"error": "Celery not enabled"}

        from app.core.celery_config import celery_app

        result = celery_app.AsyncResult(task_id)
        state = result.state
        # Evaluate readiness once so that "ready", "successful" and "result"
        # all describe the same observation; each ready() call may re-query
        # the result backend and could otherwise flip between fields.
        is_ready = result.ready()
        # NOTE(review): per the Celery docs, result.result of a failed task is
        # the raised exception instance - confirm callers can serialize it.
        return {
            "task_id": task_id,
            "state": state,
            "ready": is_ready,
            "successful": result.successful() if is_ready else None,
            "result": result.result if is_ready else None,
        }

    def revoke_task(self, task_id: str, terminate: bool = False) -> bool:
        """
        Revoke (cancel) a Celery task.

        Args:
            task_id: Celery task ID to revoke
            terminate: If True, terminate running task; if False, just prevent execution

        Returns:
            bool: True if revocation was sent, False when Celery is disabled
        """
        if not self.use_celery:
            logger.warning("Cannot revoke task: Celery not enabled")
            return False

        from app.core.celery_config import celery_app

        celery_app.control.revoke(task_id, terminate=terminate)
        logger.info(
            "Revoked Celery task: task_id=%s, terminate=%s", task_id, terminate
        )
        return True
# Singleton instance: the module-level dispatcher that callers import,
# e.g. `from app.tasks.dispatcher import task_dispatcher`.
task_dispatcher = TaskDispatcher()