Files
orion/app/core/celery_config.py
Samir Boulahtit b9a998fb43
Some checks failed
CI / ruff (push) Has been cancelled
CI / pytest (push) Has been cancelled
CI / architecture (push) Has been cancelled
CI / dependency-scanning (push) Has been cancelled
CI / audit (push) Has been cancelled
CI / docs (push) Has been cancelled
fix(celery): remove stale legacy task module references
2026-02-11 22:58:10 +01:00

191 lines
6.8 KiB
Python

# app/core/celery_config.py
"""
Celery configuration for Wizamart background task processing.
This module configures Celery with Redis as the broker and result backend.
It includes:
- Task routing to separate queues (default, long_running, scheduled)
- Celery Beat schedule for periodic tasks
- Task retry policies
- Sentry integration for error tracking
- Module-based task discovery (discovers tasks from app/modules/*/tasks/)
Task Discovery:
- Legacy tasks: Explicitly listed in LEGACY_TASK_MODULES and passed to Celery
  via the 'include' parameter (the list is currently empty)
- Module tasks: Auto-discovered via discover_module_tasks()
As modules are migrated, their tasks move from the legacy include list
to automatic discovery from the module's tasks/ directory.
"""
import logging
import os
import sentry_sdk
from celery import Celery
from celery.schedules import crontab
from sentry_sdk.integrations.celery import CeleryIntegration
logger = logging.getLogger(__name__)
# Redis connection URL: read from the REDIS_URL environment variable, falling
# back to a local Redis instance (database 0) when the variable is not set.
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
# =============================================================================
# SENTRY INITIALIZATION FOR CELERY WORKERS
# =============================================================================
# Celery workers run in separate processes, so Sentry must be initialized here too.
# Sentry is only initialized when SENTRY_DSN is set in the environment.
SENTRY_DSN = os.getenv("SENTRY_DSN")
if SENTRY_DSN:
    # A malformed SENTRY_TRACES_SAMPLE_RATE must not stop workers from
    # starting: float() raises ValueError at import time, so fall back to the
    # default rate and report the bad value instead of crashing.
    _raw_traces_sample_rate = os.getenv("SENTRY_TRACES_SAMPLE_RATE", "0.1")
    try:
        _traces_sample_rate = float(_raw_traces_sample_rate)
    except ValueError:
        logger.warning(
            "Invalid SENTRY_TRACES_SAMPLE_RATE %r; falling back to 0.1",
            _raw_traces_sample_rate,
        )
        _traces_sample_rate = 0.1

    sentry_sdk.init(
        dsn=SENTRY_DSN,
        environment=os.getenv("SENTRY_ENVIRONMENT", "development"),
        traces_sample_rate=_traces_sample_rate,
        integrations=[CeleryIntegration()],
        # NOTE(review): this lets integrations attach personally identifiable
        # information to events - confirm this is intended for worker payloads.
        send_default_pii=True,
    )
# =============================================================================
# TASK DISCOVERY
# =============================================================================
# Explicitly listed (pre-module) task packages. The list is empty because all
# legacy tasks have been migrated to their respective modules; task discovery
# now happens via app.modules.tasks.discover_module_tasks().
#
# Migration notes carried over from earlier revisions:
# - subscription: MIGRATED to billing module
#   (was kept for capture_capacity_snapshot -> monitoring)
# - marketplace, letzshop, export: MIGRATED to marketplace module
# - code_quality, test_runner: planned for the dev-tools module
# NOTE(review): the notes above predate the list being emptied - confirm that
# code_quality, test_runner and capture_capacity_snapshot are now discovered
# through their modules.
LEGACY_TASK_MODULES: list[str] = []
def get_all_task_modules() -> list[str]:
    """
    Get all task modules (legacy + module-based).

    Discovery is best-effort: if the discovery helper cannot be imported or
    raises, the problem is logged and whatever has been collected so far is
    returned, so the caller can still build the Celery app.

    Returns:
        Combined list of legacy task modules and discovered module tasks
    """
    # Work on a copy so the module-level constant is never mutated.
    all_modules = list(LEGACY_TASK_MODULES)
    try:
        # Imported here rather than at module top so that a missing or broken
        # app.modules.tasks is handled by the except clauses below.
        from app.modules.tasks import discover_module_tasks

        module_tasks = discover_module_tasks()
        all_modules.extend(module_tasks)
        logger.info("Discovered %d module task packages", len(module_tasks))
    except ImportError as e:
        logger.warning("Could not import module task discovery: %s", e)
    except Exception as e:
        # logger.exception keeps the traceback; the message alone is rarely
        # enough to find out why discovery produced no tasks.
        logger.exception("Error discovering module tasks: %s", e)
    return all_modules
# Celery application instance.
# Redis (REDIS_URL) serves as both the message broker and the result backend;
# the task packages to import come from get_all_task_modules().
celery_app = Celery(
    main="wizamart",
    include=get_all_task_modules(),
    broker=REDIS_URL,
    backend=REDIS_URL,
)
# =============================================================================
# CELERY CONFIGURATION
# =============================================================================
# Core runtime settings, applied to the app in a single update() call.
celery_app.conf.update(
    # Serialization: JSON for task messages, accepted content and results
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    # Timezone
    timezone="Europe/Luxembourg",
    enable_utc=True,
    # Broker connection
    broker_connection_retry_on_startup=True,  # Retry connection on startup (Celery 6.0+)
    # Task behavior
    task_track_started=True,
    task_time_limit=30 * 60,  # 30 minutes hard limit (value is in seconds)
    task_soft_time_limit=25 * 60,  # 25 minutes soft limit (value is in seconds)
    # Worker settings
    worker_prefetch_multiplier=1,  # Disable prefetching for long tasks
    worker_concurrency=4,  # Number of concurrent workers
    # Result backend
    result_expires=86400,  # Results expire after 24 hours (value is in seconds)
    # Retry policy
    # NOTE(review): `task_default_retry_delay` and `task_max_retries` do not
    # appear among Celery's documented configuration settings; retry delay and
    # max retries are normally Task attributes (`default_retry_delay`,
    # `max_retries`). Confirm these two keys take effect - if not, set them on
    # the task definitions (or via `task_annotations`) instead.
    task_default_retry_delay=60,  # 1 minute between retries
    task_max_retries=3,
    # Task events (required for Flower monitoring)
    worker_send_task_events=True,
    task_send_sent_event=True,
)
# =============================================================================
# TASK ROUTING - Route tasks to appropriate queues
# =============================================================================
# Each (task-name pattern, queue) pair below becomes one routing entry of the
# form {pattern: {"queue": queue}}; the order of the pairs is preserved.
# NOTE(review): every pattern targets the legacy `app.tasks.celery_tasks.*`
# namespace, while the task discovery comments in this file say these tasks
# were migrated to modules - confirm the patterns still match real task names.
celery_app.conf.task_routes = {
    task_pattern: {"queue": queue_name}
    for task_pattern, queue_name in (
        # Long-running import tasks
        ("app.tasks.celery_tasks.marketplace.*", "long_running"),
        ("app.tasks.celery_tasks.letzshop.*", "long_running"),
        # Fast export tasks
        ("app.tasks.celery_tasks.export.*", "default"),
        # Scheduled subscription tasks
        ("app.tasks.celery_tasks.subscription.*", "scheduled"),
        # Code quality and test tasks (can be long)
        ("app.tasks.celery_tasks.code_quality.*", "long_running"),
        ("app.tasks.celery_tasks.test_runner.*", "long_running"),
    )
}
# =============================================================================
# CELERY BEAT SCHEDULE - Periodic tasks
# =============================================================================
# Periodic tasks still declared here rather than in a module definition.
# Subscription tasks have already moved to the billing module
# (see definition.py); the capacity snapshot below is slated for the
# monitoring module.
# NOTE(review): the task name still uses the legacy
# `app.tasks.celery_tasks.subscription` path while LEGACY_TASK_MODULES is
# empty - confirm a task is registered under exactly this name, otherwise
# beat will enqueue a task that no worker recognises.
LEGACY_BEAT_SCHEDULE = {
    "capture-capacity-snapshot-daily": dict(
        task="app.tasks.celery_tasks.subscription.capture_capacity_snapshot",
        schedule=crontab(minute=0, hour=0),  # Midnight daily
        options=dict(queue="scheduled"),
    ),
}
def get_full_beat_schedule() -> dict:
    """
    Get complete beat schedule (legacy + module-defined).

    Module-defined entries are merged on top of the legacy ones, so a module
    entry replaces a legacy entry that uses the same schedule key. Building
    the module schedule is best-effort: on failure the problem is logged and
    the schedule collected so far is returned.

    Returns:
        Merged beat schedule from legacy tasks and module definitions
    """
    # Work on a copy so the module-level constant is never mutated.
    schedule = dict(LEGACY_BEAT_SCHEDULE)
    try:
        # Imported here rather than at module top so that a missing or broken
        # app.modules.tasks is handled by the except clauses below.
        from app.modules.tasks import build_beat_schedule

        module_schedule = build_beat_schedule()
        schedule.update(module_schedule)
        if module_schedule:
            logger.info(
                "Added %d scheduled tasks from modules", len(module_schedule)
            )
    except ImportError as e:
        logger.warning("Could not import module beat schedule builder: %s", e)
    except Exception as e:
        # logger.exception keeps the traceback so a silently missing periodic
        # task can be traced back to its cause.
        logger.exception("Error building module beat schedule: %s", e)
    return schedule
# Install the merged (legacy + module-defined) periodic schedule on the app.
celery_app.conf.beat_schedule = get_full_beat_schedule()
# =============================================================================
# QUEUE CONFIGURATION
# =============================================================================
# Three queues; each one uses an exchange and a routing key named after the
# queue itself.
celery_app.conf.task_queues = {
    queue_name: {"exchange": queue_name, "routing_key": queue_name}
    for queue_name in ("default", "long_running", "scheduled")
}
# Queue used when a task has no explicit route or queue.
celery_app.conf.task_default_queue = "default"