fix(celery): bind @shared_task to our app so async dispatch reaches redis
Some checks failed
Some checks failed
When the FastAPI process called send_notification_email.delay() it
crashed with kombu OperationalError "[Errno 111] Connection refused"
because the published task was going to amqp://localhost// instead
of our configured redis broker.
Root cause: tasks decorated with @shared_task bind to whatever
Celery considers the "current default app" at decoration time. Our
celery_app (with redis broker) was never imported during FastAPI
startup, so the loyalty / billing / etc. task modules registered
their @shared_task functions against Celery's built-in default —
which has broker_url=amqp://localhost// and no RabbitMQ deployed.
Two-part fix:
1. app/core/celery_config.py — call celery_app.set_default() so
any @shared_task evaluated after this module loads binds to our
app rather than the built-in default.
2. main.py — import app.core.celery_config near the top, BEFORE
api_router (which transitively imports the task modules). The
side-effect of the import (creating celery_app + set_default())
must run before any @shared_task decorator fires. An # isort:
split below the import stops the import-sorter from alphabetising
it back behind app.api.main and re-introducing the bug.
User-visible effect: loyalty welcome emails (and every other
@shared_task-based notification) now actually queue + send.
Surfaced this while debugging why enrollment on prod produced a
loyalty_card row but no email_logs row.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -72,6 +72,12 @@ celery_app = Celery(
|
||||
include=get_all_task_modules(),
|
||||
)
|
||||
|
||||
# Mark this as the default app so @shared_task decorators in module task
|
||||
# files bind to it. Otherwise they fall back to Celery's built-in default,
|
||||
# which uses amqp://localhost// and silently fails with "Connection refused"
|
||||
# at .delay() time when no RabbitMQ is deployed.
|
||||
celery_app.set_default()
|
||||
|
||||
# =============================================================================
|
||||
# CELERY CONFIGURATION
|
||||
# =============================================================================
|
||||
|
||||
10
main.py
10
main.py
@@ -32,6 +32,16 @@ from sqlalchemy import text
|
||||
from sqlalchemy.orm import Session
|
||||
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
|
||||
|
||||
# Bootstrap Celery default app BEFORE importing api_router, which
|
||||
# transitively imports loyalty/billing/etc. task modules. Their @shared_task
|
||||
# decorators bind at import time to whatever Celery considers the current
|
||||
# default app — without our app.set_default() running first, they fall back
|
||||
# to Celery's built-in default (amqp://localhost//) and .delay() raises
|
||||
# ConnectionRefusedError at runtime. The isort: split below stops ruff from
|
||||
# alphabetically reordering this back behind api_router.
|
||||
from app.core import celery_config # noqa: F401 # side-effect: set_default()
|
||||
|
||||
# isort: split
|
||||
from app.api.main import api_router
|
||||
from app.core.config import settings
|
||||
|
||||
|
||||
Reference in New Issue
Block a user