fix(celery): bind @shared_task to our app so async dispatch reaches redis
Some checks failed
CI / ruff (push) Successful in 19s
CI / validate (push) Has been cancelled
CI / dependency-scanning (push) Has been cancelled
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / pytest (push) Has been cancelled

When the FastAPI process called send_notification_email.delay() it
crashed with kombu OperationalError "[Errno 111] Connection refused"
because the published task was going to amqp://localhost// instead
of our configured redis broker.

Root cause: tasks decorated with @shared_task bind to whatever
Celery considers the "current default app" at decoration time. Our
celery_app (with redis broker) was never imported during FastAPI
startup, so the loyalty / billing / etc. task modules registered
their @shared_task functions against Celery's built-in default —
which has broker_url=amqp://localhost// and no RabbitMQ deployed.

Two-part fix:

  1. app/core/celery_config.py — call celery_app.set_default() so
     any @shared_task evaluated after this module loads binds to our
     app rather than the built-in default.

  2. main.py — import app.core.celery_config near the top, BEFORE
     api_router (which transitively imports the task modules). The
     side-effect of the import (creating celery_app + set_default())
     must run before any @shared_task decorator fires. An # isort:
     split below the import stops the import-sorter from alphabetising
     it back behind app.api.main and re-introducing the bug.

User-visible effect: loyalty welcome emails (and every other
@shared_task-based notification) now actually queue + send.
Surfaced this while debugging why enrollment on prod produced a
loyalty_card row but no email_logs row.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-17 22:26:33 +02:00
parent 120532e63f
commit 44c4290916
2 changed files with 16 additions and 0 deletions

View File

@@ -72,6 +72,12 @@ celery_app = Celery(
include=get_all_task_modules(),
)
# Mark this as the default app so @shared_task decorators in module task
# files bind to it. Otherwise they fall back to Celery's built-in default,
# which uses amqp://localhost// and silently fails with "Connection refused"
# at .delay() time when no RabbitMQ is deployed.
celery_app.set_default()
# =============================================================================
# CELERY CONFIGURATION
# =============================================================================

10
main.py
View File

@@ -32,6 +32,16 @@ from sqlalchemy import text
from sqlalchemy.orm import Session
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
# Bootstrap Celery default app BEFORE importing api_router, which
# transitively imports loyalty/billing/etc. task modules. Their @shared_task
# decorators bind at import time to whatever Celery considers the current
# default app — without our app.set_default() running first, they fall back
# to Celery's built-in default (amqp://localhost//) and .delay() raises
# ConnectionRefusedError at runtime. The isort: split below stops ruff from
# alphabetically reordering this back behind api_router.
from app.core import celery_config # noqa: F401 # side-effect: set_default()
# isort: split
from app.api.main import api_router
from app.core.config import settings