From 44c4290916aa5a626d1f39192c584a97f8080f53 Mon Sep 17 00:00:00 2001 From: Samir Boulahtit Date: Sun, 17 May 2026 22:26:33 +0200 Subject: [PATCH] fix(celery): bind @shared_task to our app so async dispatch reaches redis MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the FastAPI process called send_notification_email.delay() it crashed with kombu OperationalError "[Errno 111] Connection refused" because the published task was going to amqp://localhost// instead of our configured redis broker. Root cause: tasks decorated with @shared_task bind to whatever Celery considers the "current default app" at decoration time. Our celery_app (with redis broker) was never imported during FastAPI startup, so the loyalty / billing / etc. task modules registered their @shared_task functions against Celery's built-in default — which has broker_url=amqp://localhost// and no RabbitMQ deployed. Two-part fix: 1. app/core/celery_config.py — call celery_app.set_default() so any @shared_task evaluated after this module loads binds to our app rather than the built-in default. 2. main.py — import app.core.celery_config near the top, BEFORE api_router (which transitively imports the task modules). The side-effect of the import (creating celery_app + set_default()) must run before any @shared_task decorator fires. An # isort: split below the import stops the import-sorter from alphabetising it back behind app.api.main and re-introducing the bug. User-visible effect: loyalty welcome emails (and every other @shared_task-based notification) now actually queue + send. Surfaced this while debugging why enrollment on prod produced a loyalty_card row but no email_logs row. Co-Authored-By: Claude Opus 4.7 (1M context) --- app/core/celery_config.py | 6 ++++++ main.py | 10 ++++++++++ 2 files changed, 16 insertions(+) diff --git a/app/core/celery_config.py b/app/core/celery_config.py index 53adb750..58214124 100644 --- a/app/core/celery_config.py +++ b/app/core/celery_config.py @@ -72,6 +72,12 @@ celery_app = Celery( include=get_all_task_modules(), ) +# Mark this as the default app so @shared_task decorators in module task +# files bind to it. Otherwise they fall back to Celery's built-in default, +# which uses amqp://localhost// and silently fails with "Connection refused" +# at .delay() time when no RabbitMQ is deployed. +celery_app.set_default() + # ============================================================================= # CELERY CONFIGURATION # ============================================================================= diff --git a/main.py b/main.py index cad60d4a..51e5d76b 100644 --- a/main.py +++ b/main.py @@ -32,6 +32,16 @@ from sqlalchemy import text from sqlalchemy.orm import Session from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware +# Bootstrap Celery default app BEFORE importing api_router, which +# transitively imports loyalty/billing/etc. task modules. Their @shared_task +# decorators bind at import time to whatever Celery considers the current +# default app — without our app.set_default() running first, they fall back +# to Celery's built-in default (amqp://localhost//) and .delay() raises +# ConnectionRefusedError at runtime. The isort: split below stops ruff from +# alphabetically reordering this back behind api_router. +from app.core import celery_config # noqa: F401 # side-effect: set_default() + +# isort: split from app.api.main import api_router from app.core.config import settings