Some checks failed
Celery workers were failing every task that touched the DB with
InvalidRequestError: "expression 'ContentPage' failed to locate a name".
Root cause: the worker process only loads task modules during startup,
not routes or services. Models reached only via SQLAlchemy
string-based relationships (e.g. Platform.relationship("ContentPage"))
were never imported, so the mapper couldn't resolve the name when the
first task tried to open a DB session. The FastAPI process avoids this
because api_router transitively imports the world; the worker doesn't.
Add _preload_all_module_models() in celery_config.py, which walks the
module registry and calls importlib.import_module() on each
"app.modules.<code>.models" package. It is called at module import time,
before any task runs.
Surfaced while finally getting loyalty.send_notification_email registered
on the worker — the task ran, hit the DB to load email settings, and
exploded on the unresolved Platform -> ContentPage relationship.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
217 lines
7.6 KiB
Python
217 lines
7.6 KiB
Python
# app/core/celery_config.py
|
|
"""
|
|
Celery configuration for Orion background task processing.
|
|
|
|
This module configures Celery with Redis as the broker and result backend.
|
|
It includes:
|
|
- Task routing to separate queues (default, long_running, scheduled)
|
|
- Celery Beat schedule for periodic tasks
|
|
- Task retry policies
|
|
- Sentry integration for error tracking
|
|
- Module-based task discovery (discovers tasks from app/modules/*/tasks/)
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
|
|
import sentry_sdk
|
|
from celery import Celery
|
|
from celery.schedules import crontab
|
|
from sentry_sdk.integrations.celery import CeleryIntegration
|
|
|
|
logger = logging.getLogger(__name__)

# Broker/backend location: read from the environment, falling back to a local Redis.
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")

# =============================================================================
# SENTRY INITIALIZATION FOR CELERY WORKERS
# =============================================================================
# Workers run as their own processes, so Sentry has to be initialized in this
# module as well. Nothing is initialized when SENTRY_DSN is unset or empty.
SENTRY_DSN = os.environ.get("SENTRY_DSN")
if SENTRY_DSN:
    sentry_sdk.init(
        dsn=SENTRY_DSN,
        integrations=[CeleryIntegration()],
        environment=os.environ.get("SENTRY_ENVIRONMENT", "development"),
        traces_sample_rate=float(os.environ.get("SENTRY_TRACES_SAMPLE_RATE", "0.1")),
        send_default_pii=True,
    )
|
|
|
|
# =============================================================================
|
|
# TASK DISCOVERY
|
|
# =============================================================================
|
|
|
|
|
|
def get_all_task_modules() -> list[str]:
    """
    Collect the task packages Celery should import.

    Delegates to discover_module_tasks() from app.modules.tasks. An
    ImportError (from the import or from discovery itself) is logged as a
    warning; any other exception is logged as an error. In both failure
    cases an empty list is returned instead of raising.

    Returns:
        The discovered module task packages, or [] when discovery failed
    """
    discovered: list[str] = []

    try:
        from app.modules.tasks import discover_module_tasks

        packages = discover_module_tasks()
        logger.info(f"Discovered {len(packages)} module task packages")
        # Only commit the result once discovery and logging both succeeded.
        discovered = packages
    except ImportError as exc:
        logger.warning(f"Could not import module task discovery: {exc}")
    except Exception as exc:
        logger.error(f"Error discovering module tasks: {exc}")

    return discovered
|
|
|
|
|
|
def _preload_all_module_models() -> None:
    """
    Import every module's models package up front.

    The Celery worker process loads task modules but not routes/services,
    so models reached only via SQLAlchemy string-based relationships
    (e.g. Platform -> "ContentPage") never get imported. The first DB
    query then explodes with InvalidRequestError when the mapper tries
    to resolve the unknown name. The FastAPI process avoids this because
    api_router transitively imports everything; we replicate that effect
    here for the worker.

    For each code in the module registry, "app.modules.<code>.models" is
    imported. A module that simply has no models package is skipped
    silently. Any other import failure — including a ModuleNotFoundError
    raised from *inside* an existing models package because one of its own
    imports is missing — is logged as a warning, since that package's
    models were not registered. This function never raises for a failing
    models import; if the registry itself cannot be imported it logs a
    warning and returns.
    """
    import importlib

    try:
        from app.modules.registry import MODULES
    except ImportError as e:
        logger.warning(f"Could not load module registry for model preload: {e}")
        return

    loaded = 0
    for code in MODULES:
        module_path = f"app.modules.{code}.models"
        try:
            importlib.import_module(module_path)
        except ModuleNotFoundError as e:
            # ModuleNotFoundError.name is the module that could not be found.
            # Only treat the error as "no models package" when that name is
            # the models package itself or one of its parent packages; a
            # different name means a dependency of the models package is
            # missing, which must not be hidden.
            missing = e.name or ""
            if module_path == missing or module_path.startswith(f"{missing}."):
                # Module doesn't expose a models package — fine, skip.
                continue
            logger.warning(f"Failed preloading {module_path}: {e}")
        except Exception as e:
            logger.warning(f"Failed preloading {module_path}: {e}")
        else:
            loaded += 1

    logger.info(f"Preloaded models for {loaded} modules (SQLAlchemy mapper resolution)")
|
|
|
|
|
|
# Import all module models now, at import time, before the app is created.
_preload_all_module_models()


# The Celery application: Redis serves as both broker and result backend,
# and the task packages to import come from module-based discovery.
celery_app = Celery(
    "orion",
    backend=REDIS_URL,
    broker=REDIS_URL,
    include=get_all_task_modules(),
)

# Register this app as the default one so that @shared_task decorators in
# module task files bind to it. Without this they fall back to Celery's
# built-in default app, which uses amqp://localhost// and silently fails with
# "Connection refused" at .delay() time when no RabbitMQ is deployed.
celery_app.set_default()
|
|
|
|
# =============================================================================
|
|
# CELERY CONFIGURATION
|
|
# =============================================================================
|
|
celery_app.conf.update(
    # --- Serialization: JSON for tasks, accepted content and results ---
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    # --- Timezone ---
    timezone="Europe/Luxembourg",
    enable_utc=True,
    # --- Broker connection: retry the connection on startup ---
    broker_connection_retry_on_startup=True,
    # --- Task behavior ---
    task_track_started=True,
    task_time_limit=30 * 60,  # hard limit: 30 minutes
    task_soft_time_limit=25 * 60,  # soft limit: 25 minutes
    # --- Worker settings ---
    worker_prefetch_multiplier=1,  # prefetching disabled for long tasks
    worker_concurrency=2,  # kept low to avoid OOM on 4GB servers
    # --- Result backend ---
    result_expires=24 * 60 * 60,  # results expire after 24 hours
    # --- Retry policy ---
    task_default_retry_delay=60,  # one minute between retries
    task_max_retries=3,
    # --- Task events (required for Flower monitoring) ---
    worker_send_task_events=True,
    task_send_sent_event=True,
)
|
|
|
|
# =============================================================================
|
|
# TASK ROUTING - Route tasks to appropriate queues
|
|
# =============================================================================
|
|
# Maps task-name patterns to the queue that should run them.
celery_app.conf.task_routes = {
    # Imports: long-running
    "app.tasks.celery_tasks.marketplace.*": {"queue": "long_running"},
    "app.tasks.celery_tasks.letzshop.*": {"queue": "long_running"},
    # Exports: fast, stay on the default queue
    "app.tasks.celery_tasks.export.*": {"queue": "default"},
    # Subscription tasks: scheduled queue
    "app.tasks.celery_tasks.subscription.*": {"queue": "scheduled"},
    # Code quality and test runs: can take a while
    "app.tasks.celery_tasks.code_quality.*": {"queue": "long_running"},
    "app.tasks.celery_tasks.test_runner.*": {"queue": "long_running"},
}
|
|
|
|
# =============================================================================
|
|
# CELERY BEAT SCHEDULE - Periodic tasks
|
|
# =============================================================================
|
|
|
|
# Legacy scheduled tasks (will be migrated to module definitions)
|
|
# NOTE: Subscription tasks have been migrated to billing module (see definition.py)
|
|
# Periodic tasks still declared in this file rather than in a module definition.
LEGACY_BEAT_SCHEDULE = {
    # Daily capacity snapshot - to be migrated to the monitoring module
    "capture-capacity-snapshot-daily": {
        "task": "app.tasks.celery_tasks.subscription.capture_capacity_snapshot",
        "schedule": crontab(minute=0, hour=0),  # every day at 00:00
        "options": {"queue": "scheduled"},
    },
}
|
|
|
|
|
|
def get_full_beat_schedule() -> dict:
    """
    Assemble the Celery Beat schedule (legacy + module-defined).

    Starts from a shallow copy of LEGACY_BEAT_SCHEDULE and overlays the
    result of build_beat_schedule() from app.modules.tasks, so a module
    entry replaces a legacy entry that uses the same key. An ImportError
    is logged as a warning and any other exception as an error; in either
    case whatever has been assembled so far is still returned.

    Returns:
        Merged beat schedule from legacy tasks and module definitions
    """
    merged = {**LEGACY_BEAT_SCHEDULE}

    try:
        from app.modules.tasks import build_beat_schedule

        from_modules = build_beat_schedule()
        merged.update(from_modules)
        # Stay quiet when the modules contributed nothing.
        if from_modules:
            logger.info(f"Added {len(from_modules)} scheduled tasks from modules")
    except ImportError as exc:
        logger.warning(f"Could not import module beat schedule builder: {exc}")
    except Exception as exc:
        logger.error(f"Error building module beat schedule: {exc}")

    return merged
|
|
|
|
|
|
# Install the merged (legacy + module) periodic-task schedule.
celery_app.conf.beat_schedule = get_full_beat_schedule()

# =============================================================================
# QUEUE CONFIGURATION
# =============================================================================
# Every queue uses its own name as both exchange and routing key.
celery_app.conf.task_queues = {
    queue_name: {"exchange": queue_name, "routing_key": queue_name}
    for queue_name in ("default", "long_running", "scheduled")
}

# Tasks that match no route go to the "default" queue.
celery_app.conf.task_default_queue = "default"
|