orion/app/core/celery_config.py
Samir Boulahtit e9253fbd84 refactor: rename Wizamart to Orion across entire codebase
Replace all ~1,086 occurrences of Wizamart/wizamart/WIZAMART/WizaMart
with Orion/orion/ORION across 184 files. This includes database
identifiers, email addresses, domain references, R2 bucket names,
DNS prefixes, encryption salt, Celery app name, config defaults,
Docker configs, CI configs, documentation, seed data, and templates.

Renames homepage-wizamart.html template to homepage-orion.html.
Fixes duplicate file_pattern key in api.yaml architecture rule.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 16:46:56 +01:00


# app/core/celery_config.py
"""
Celery configuration for Orion background task processing.
This module configures Celery with Redis as the broker and result backend.
It includes:
- Task routing to separate queues (default, long_running, scheduled)
- Celery Beat schedule for periodic tasks
- Task retry policies
- Sentry integration for error tracking
- Module-based task discovery (discovers tasks from app/modules/*/tasks/)
"""
import logging
import os

import sentry_sdk
from celery import Celery
from celery.schedules import crontab
from sentry_sdk.integrations.celery import CeleryIntegration

logger = logging.getLogger(__name__)

# Redis URL from environment or default
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# =============================================================================
# SENTRY INITIALIZATION FOR CELERY WORKERS
# =============================================================================
# Celery workers run in separate processes, so Sentry must be initialized here too
SENTRY_DSN = os.getenv("SENTRY_DSN")
if SENTRY_DSN:
    sentry_sdk.init(
        dsn=SENTRY_DSN,
        environment=os.getenv("SENTRY_ENVIRONMENT", "development"),
        traces_sample_rate=float(os.getenv("SENTRY_TRACES_SAMPLE_RATE", "0.1")),
        integrations=[CeleryIntegration()],
        send_default_pii=True,
    )
# =============================================================================
# TASK DISCOVERY
# =============================================================================
def get_all_task_modules() -> list[str]:
    """
    Get all task modules via module-based discovery.

    Returns:
        List of discovered module task packages
    """
    try:
        from app.modules.tasks import discover_module_tasks

        module_tasks = discover_module_tasks()
        logger.info(f"Discovered {len(module_tasks)} module task packages")
        return module_tasks
    except ImportError as e:
        logger.warning(f"Could not import module task discovery: {e}")
    except Exception as e:
        logger.error(f"Error discovering module tasks: {e}")
    return []
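
# Illustrative sketch (an assumption about the discovery contract, not verified against
# discover_module_tasks): each string returned above should be an importable dotted path
# such as "app.modules.billing.tasks", whose package defines Celery tasks that bind to
# this app once the path is passed to `include=` below, e.g.:
#
#   from celery import shared_task
#
#   @shared_task(name="billing.example_task")  # hypothetical task name
#   def example_task() -> None:
#       ...
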
# Create Celery application
celery_app = Celery(
    "orion",
    broker=REDIS_URL,
    backend=REDIS_URL,
    include=get_all_task_modules(),
)
# =============================================================================
# CELERY CONFIGURATION
# =============================================================================
celery_app.conf.update(
    # Serialization
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    # Timezone
    timezone="Europe/Luxembourg",
    enable_utc=True,
    # Broker connection
    broker_connection_retry_on_startup=True,  # Keep retrying the broker at startup (explicit setting required from Celery 6.0, available since 5.3)
    # Task behavior
    task_track_started=True,
    task_time_limit=30 * 60,  # 30 minutes hard limit
    task_soft_time_limit=25 * 60,  # 25 minutes soft limit
    # Worker settings
    worker_prefetch_multiplier=1,  # Prefetch one task at a time (fairer scheduling for long tasks)
    worker_concurrency=4,  # Number of concurrent worker processes per worker instance
    # Result backend
    result_expires=86400,  # Results expire after 24 hours
    # Retry policy
    task_default_retry_delay=60,  # 1 minute between retries
    task_max_retries=3,
    # Task events (required for Flower monitoring)
    worker_send_task_events=True,
    task_send_sent_event=True,
)
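
# Illustrative sketch: with the soft/hard limits above, a long task can catch the soft
# limit to clean up before the hard kill (the task name here is hypothetical):
#
#   from celery.exceptions import SoftTimeLimitExceeded
#
#   @celery_app.task
#   def import_catalog():
#       try:
#           ...  # long-running work
#       except SoftTimeLimitExceeded:
#           ...  # flush partial progress, release locks, then exit cleanly
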
# =============================================================================
# TASK ROUTING - Route tasks to appropriate queues
# =============================================================================
celery_app.conf.task_routes = {
    # Long-running import tasks
    "app.tasks.celery_tasks.marketplace.*": {"queue": "long_running"},
    "app.tasks.celery_tasks.letzshop.*": {"queue": "long_running"},
    # Fast export tasks
    "app.tasks.celery_tasks.export.*": {"queue": "default"},
    # Scheduled subscription tasks
    "app.tasks.celery_tasks.subscription.*": {"queue": "scheduled"},
    # Code quality and test tasks (can be long)
    "app.tasks.celery_tasks.code_quality.*": {"queue": "long_running"},
    "app.tasks.celery_tasks.test_runner.*": {"queue": "long_running"},
}
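
# Example (illustrative; the concrete task name is hypothetical): a task registered as
# "app.tasks.celery_tasks.marketplace.import_products" matches the first glob above and
# is published to the "long_running" queue; tasks with no matching route fall back to
# task_default_queue ("default", set at the bottom of this file).
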
# =============================================================================
# CELERY BEAT SCHEDULE - Periodic tasks
# =============================================================================
# Legacy scheduled tasks (will be migrated to module definitions)
# NOTE: Subscription tasks have been migrated to billing module (see definition.py)
LEGACY_BEAT_SCHEDULE = {
    # Capacity snapshot - will be migrated to monitoring module
    "capture-capacity-snapshot-daily": {
        "task": "app.tasks.celery_tasks.subscription.capture_capacity_snapshot",
        "schedule": crontab(hour=0, minute=0),  # Midnight daily
        "options": {"queue": "scheduled"},
    },
}
def get_full_beat_schedule() -> dict:
    """
    Get complete beat schedule (legacy + module-defined).

    Returns:
        Merged beat schedule from legacy tasks and module definitions
    """
    schedule = dict(LEGACY_BEAT_SCHEDULE)
    try:
        from app.modules.tasks import build_beat_schedule

        module_schedule = build_beat_schedule()
        schedule.update(module_schedule)
        if module_schedule:
            logger.info(f"Added {len(module_schedule)} scheduled tasks from modules")
    except ImportError as e:
        logger.warning(f"Could not import module beat schedule builder: {e}")
    except Exception as e:
        logger.error(f"Error building module beat schedule: {e}")
    return schedule
celery_app.conf.beat_schedule = get_full_beat_schedule()
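
# Example (illustrative, hypothetical names): entries returned by build_beat_schedule()
# are assumed to merge into the same {name: {task, schedule, options}} shape as
# LEGACY_BEAT_SCHEDULE, e.g.:
#
#   "billing-renew-subscriptions-hourly": {
#       "task": "app.modules.billing.tasks.renew_subscriptions",
#       "schedule": crontab(minute=0),
#       "options": {"queue": "scheduled"},
#   }
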
# =============================================================================
# QUEUE CONFIGURATION
# =============================================================================
celery_app.conf.task_queues = {
    "default": {"exchange": "default", "routing_key": "default"},
    "long_running": {"exchange": "long_running", "routing_key": "long_running"},
    "scheduled": {"exchange": "scheduled", "routing_key": "scheduled"},
}
celery_app.conf.task_default_queue = "default"
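
# Usage sketch (assumes the project root is on PYTHONPATH so this module imports as
# app.core.celery_config):
#
#   # worker consuming all three queues defined above
#   celery -A app.core.celery_config:celery_app worker -Q default,long_running,scheduled --loglevel=info
#
#   # beat scheduler publishing the periodic tasks from beat_schedule
#   celery -A app.core.celery_config:celery_app beat --loglevel=info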