Files
orion/app/modules/task_base.py
Samir Boulahtit 3e650ff863
Some checks failed
CI / ruff (push) Successful in 18s
CI / validate (push) Has been cancelled
CI / dependency-scanning (push) Has been cancelled
CI / pytest (push) Has been cancelled
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled
fix(task-base): on_failure logging crashes on reserved LogRecord keys
When a Celery task failed, on_failure passed extra={"args": ..., "kwargs": ...}
to logger.error. Python's logging.makeRecord rejects any extra key that
collides with a built-in LogRecord attribute, and "args" is one (used for
printf formatting). The KeyError raised inside the error handler then
cascaded through Celery's trace.handle_failure, masking the real task
exception entirely.

Rename the keys to task_args / task_kwargs.

Surfaced while debugging why the loyalty welcome email never got sent —
the underlying task was failing, but the on_failure handler crashed before
logging the real cause, leaving nothing in worker logs to investigate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 22:45:22 +02:00

173 lines
4.8 KiB
Python

# app/modules/task_base.py
"""
Base Celery task class for module tasks.

Provides a ModuleTask base class that handles:
- Database session lifecycle (create/close)
- Context manager pattern for session usage
- Proper cleanup on task completion or failure
- Structured logging with task context

This is the standard base class for all Celery tasks defined within modules.
It replaces the legacy app/tasks/celery_tasks/base.py for new module tasks.

Usage:
    from app.core.celery_config import celery_app
    from app.modules.task_base import ModuleTask

    @celery_app.task(bind=True, base=ModuleTask)
    def my_task(self, arg1, arg2):
        with self.get_db() as db:
            # Use db session
            result = db.query(Model).filter(...).all()
        return result
"""
import logging
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any

from celery import Task
from sqlalchemy.orm import Session

from app.core.database import SessionLocal
# Module-level logger used by ModuleTask for session errors and lifecycle hooks.
logger = logging.getLogger(__name__)
class ModuleTask(Task):
    """
    Base Celery task with database session management.

    Provides:
    - Database session context manager (``get_db``)
    - Automatic session cleanup on success/failure
    - Structured logging for task lifecycle events
    - Retry tracking

    All module tasks should use this as their base class for consistent
    behavior and proper resource management.

    Example:
        @celery_app.task(bind=True, base=ModuleTask)
        def process_subscription(self, subscription_id: int):
            with self.get_db() as db:
                subscription = db.query(Subscription).get(subscription_id)
                # Process subscription...
            return {"status": "processed"}
    """

    # Mark as abstract so Celery doesn't register this as a task
    abstract = True

    @contextmanager
    def get_db(self) -> Iterator[Session]:
        """
        Context manager for database session.

        Yields a database session and ensures proper cleanup on both
        success and failure. Commits on success, rolls back on error,
        and always closes the session on exit.

        Yields:
            Session: SQLAlchemy database session

        Raises:
            Exception: Any exception raised inside the ``with`` body or by
                the commit is logged, the session is rolled back, and the
                exception is re-raised unchanged.

        Example:
            with self.get_db() as db:
                store = db.query(Store).filter(Store.id == store_id).first()
                store.status = "active"
            # Session commits automatically on exit
        """
        db = SessionLocal()
        try:
            yield db
            # Only reached when the ``with`` body finished without raising.
            db.commit()
        except Exception as e:
            logger.error(
                f"Database error in task {self.name}: {e}",
                extra={
                    "task_name": self.name,
                    "task_id": getattr(self.request, "id", None),
                    "error": str(e),
                },
            )
            db.rollback()
            # Re-raise so the task itself is still reported as failed.
            raise
        finally:
            db.close()

    def on_failure(
        self,
        exc: Exception,
        task_id: str,
        args: tuple,
        kwargs: dict,
        einfo: Any,
    ) -> None:
        """
        Called when task fails.

        Logs the failure with structured context for debugging and monitoring.

        Args:
            exc: The exception the task failed with.
            task_id: Id of the failed task.
            args: Positional arguments the task was called with.
            kwargs: Keyword arguments the task was called with.
            einfo: Exception/traceback information; logged via ``str(einfo)``.
        """
        logger.error(
            f"Task {self.name}[{task_id}] failed: {exc}",
            extra={
                "task_name": self.name,
                "task_id": task_id,
                # NOT "args"/"kwargs" — both clash with reserved LogRecord
                # attribute names and cause logging.makeRecord to raise
                # KeyError, which then masks the real task failure.
                "task_args": args,
                "task_kwargs": kwargs,
                "exception": str(exc),
                "traceback": str(einfo),
            },
            # Attach the exception we were handed rather than relying on the
            # ambient sys.exc_info(): outside an ``except`` block,
            # ``exc_info=True`` would log a bogus "NoneType: None" traceback.
            exc_info=exc,
        )

    def on_success(
        self,
        retval: Any,
        task_id: str,
        args: tuple,
        kwargs: dict,
    ) -> None:
        """
        Called when task succeeds.

        Logs successful completion with task context. ``retval``, ``args``
        and ``kwargs`` are accepted to match the handler signature but are
        not logged.

        Args:
            retval: Value returned by the task.
            task_id: Id of the completed task.
            args: Positional arguments the task was called with.
            kwargs: Keyword arguments the task was called with.
        """
        logger.info(
            f"Task {self.name}[{task_id}] completed successfully",
            extra={
                "task_name": self.name,
                "task_id": task_id,
            },
        )

    def on_retry(
        self,
        exc: Exception,
        task_id: str,
        args: tuple,
        kwargs: dict,
        einfo: Any,
    ) -> None:
        """
        Called when task is being retried.

        Logs retry attempt with reason and retry count. ``args``, ``kwargs``
        and ``einfo`` are accepted to match the handler signature but are
        not logged.

        Args:
            exc: The exception that triggered the retry.
            task_id: Id of the task being retried.
            args: Positional arguments the task was called with.
            kwargs: Keyword arguments the task was called with.
            einfo: Exception/traceback information for the failed attempt.
        """
        # Falls back to 0 when the request carries no ``retries`` attribute.
        retry_count = getattr(self.request, "retries", 0)
        logger.warning(
            f"Task {self.name}[{task_id}] retrying (attempt {retry_count + 1}): {exc}",
            extra={
                "task_name": self.name,
                "task_id": task_id,
                "retry_count": retry_count,
                "exception": str(exc),
            },
        )
# Public API of this module: only the base task class is exported.
__all__ = ["ModuleTask"]