feat: add Celery task infrastructure for module system

Phase 4 of module migration plan:
- Add ScheduledTask dataclass for declaring Celery Beat tasks
- Add tasks_path and scheduled_tasks fields to ModuleDefinition
- Create ModuleTask base class with database session management
- Create task discovery utilities (discover_module_tasks, build_beat_schedule)
- Update celery_config.py to discover and register module tasks
- Maintain backward compatibility with legacy task modules

Modules can now define tasks in their tasks/ directory and scheduled
tasks in their definition. The infrastructure supports gradual migration
of existing tasks from app/tasks/ to their respective modules.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-27 22:52:01 +01:00
parent 7dbdbd4c7e
commit f1f91abe51
5 changed files with 556 additions and 11 deletions

View File

@@ -8,8 +8,17 @@ It includes:
- Celery Beat schedule for periodic tasks
- Task retry policies
- Sentry integration for error tracking
- Module-based task discovery (discovers tasks from app/modules/*/tasks/)
Task Discovery:
- Legacy tasks: Explicitly listed in the 'include' parameter
- Module tasks: Auto-discovered via discover_module_tasks()
As modules are migrated, their tasks will move from the legacy include list
to automatic discovery from the module's tasks/ directory.
"""
import logging
import os
import sentry_sdk
@@ -17,6 +26,8 @@ from celery import Celery
from celery.schedules import crontab
from sentry_sdk.integrations.celery import CeleryIntegration
logger = logging.getLogger(__name__)
# Redis URL from environment or default
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
@@ -34,19 +45,49 @@ if SENTRY_DSN:
send_default_pii=True,
)
# =============================================================================
# TASK DISCOVERY
# =============================================================================
# Legacy tasks (will be migrated to modules over time)
LEGACY_TASK_MODULES = [
"app.tasks.celery_tasks.marketplace",
"app.tasks.celery_tasks.letzshop",
"app.tasks.celery_tasks.subscription",
"app.tasks.celery_tasks.export",
"app.tasks.celery_tasks.code_quality",
"app.tasks.celery_tasks.test_runner",
]
def get_all_task_modules() -> list[str]:
"""
Get all task modules (legacy + module-based).
Returns:
Combined list of legacy task modules and discovered module tasks
"""
all_modules = list(LEGACY_TASK_MODULES)
try:
from app.modules.tasks import discover_module_tasks
module_tasks = discover_module_tasks()
all_modules.extend(module_tasks)
logger.info(f"Discovered {len(module_tasks)} module task packages")
except ImportError as e:
logger.warning(f"Could not import module task discovery: {e}")
except Exception as e:
logger.error(f"Error discovering module tasks: {e}")
return all_modules
# Create Celery application
celery_app = Celery(
"wizamart",
broker=REDIS_URL,
backend=REDIS_URL,
include=[
"app.tasks.celery_tasks.marketplace",
"app.tasks.celery_tasks.letzshop",
"app.tasks.celery_tasks.subscription",
"app.tasks.celery_tasks.export",
"app.tasks.celery_tasks.code_quality",
"app.tasks.celery_tasks.test_runner",
],
include=get_all_task_modules(),
)
# =============================================================================
@@ -98,7 +139,9 @@ celery_app.conf.task_routes = {
# =============================================================================
# CELERY BEAT SCHEDULE - Periodic tasks
# =============================================================================
celery_app.conf.beat_schedule = {
# Legacy scheduled tasks (will be migrated to module definitions)
LEGACY_BEAT_SCHEDULE = {
# Reset usage counters at start of each period
"reset-period-counters-daily": {
"task": "app.tasks.celery_tasks.subscription.reset_period_counters",
@@ -131,6 +174,33 @@ celery_app.conf.beat_schedule = {
},
}
def get_full_beat_schedule() -> dict:
"""
Get complete beat schedule (legacy + module-defined).
Returns:
Merged beat schedule from legacy tasks and module definitions
"""
schedule = dict(LEGACY_BEAT_SCHEDULE)
try:
from app.modules.tasks import build_beat_schedule
module_schedule = build_beat_schedule()
schedule.update(module_schedule)
if module_schedule:
logger.info(f"Added {len(module_schedule)} scheduled tasks from modules")
except ImportError as e:
logger.warning(f"Could not import module beat schedule builder: {e}")
except Exception as e:
logger.error(f"Error building module beat schedule: {e}")
return schedule
celery_app.conf.beat_schedule = get_full_beat_schedule()
# =============================================================================
# QUEUE CONFIGURATION
# =============================================================================

View File

@@ -20,6 +20,7 @@ Module Hierarchy:
└── Modules (Enabled features - Billing, Marketplace, Inventory, etc.)
├── Routes (API + Page routes)
├── Services (Business logic)
├── Tasks (Celery background jobs)
├── Menu Items (Sidebar entries)
├── Templates (UI components)
└── Migrations (Module-specific)
@@ -52,7 +53,14 @@ Usage:
print(f"Module {data.module_code} enabled")
"""
from app.modules.base import ModuleDefinition
from app.modules.base import ModuleDefinition, ScheduledTask
from app.modules.task_base import ModuleTask, DatabaseTask
from app.modules.tasks import (
discover_module_tasks,
build_beat_schedule,
parse_schedule,
get_module_task_routes,
)
from app.modules.registry import (
MODULES,
CORE_MODULES,
@@ -76,6 +84,14 @@ from app.modules.events import (
__all__ = [
# Core types
"ModuleDefinition",
"ScheduledTask",
# Task support
"ModuleTask",
"DatabaseTask",
"discover_module_tasks",
"build_beat_schedule",
"parse_schedule",
"get_module_task_routes",
# Module dictionaries
"MODULES",
"CORE_MODULES",

View File

@@ -26,6 +26,8 @@ Self-Contained Module Structure:
├── exceptions.py # Module-specific exceptions (optional)
├── routes/ # FastAPI routers
├── services/ # Business logic
├── tasks/ # Celery background tasks (optional)
│ └── __init__.py # Task module discovery marker
├── models/ # SQLAlchemy models
├── schemas/ # Pydantic schemas
├── migrations/ # Alembic migrations for this module
@@ -45,6 +47,41 @@ if TYPE_CHECKING:
from models.database.admin_menu_config import FrontendType
@dataclass
class ScheduledTask:
"""
Definition of a Celery Beat scheduled task.
Used in ModuleDefinition to declare scheduled tasks that should be
registered with Celery Beat when the module is loaded.
Attributes:
name: Unique name for the schedule entry (e.g., "billing.reset_counters")
task: Full Python path to the task (e.g., "app.modules.billing.tasks.subscription.reset_period_counters")
schedule: Cron expression string or crontab dict
- String format: "minute hour day_of_month month day_of_week" (e.g., "5 0 * * *")
- Dict format: {"minute": 5, "hour": 0} for crontab kwargs
args: Positional arguments to pass to the task
kwargs: Keyword arguments to pass to the task
options: Celery task options (e.g., {"queue": "scheduled"})
Example:
ScheduledTask(
name="billing.reset_period_counters",
task="app.modules.billing.tasks.subscription.reset_period_counters",
schedule="5 0 * * *", # Daily at 00:05
options={"queue": "scheduled"},
)
"""
name: str
task: str
schedule: str | dict[str, Any]
args: tuple = ()
kwargs: dict[str, Any] = field(default_factory=dict)
options: dict[str, Any] = field(default_factory=dict)
@dataclass
class ModuleDefinition:
"""
@@ -191,6 +228,12 @@ class ModuleDefinition:
locales_path: str | None = None # Relative to module directory
migrations_path: str | None = None # Relative to module directory, e.g., "migrations"
# =========================================================================
# Celery Tasks (optional)
# =========================================================================
tasks_path: str | None = None # Python import path, e.g., "app.modules.billing.tasks"
scheduled_tasks: list[ScheduledTask] = field(default_factory=list)
# =========================================================================
# Menu Item Methods
# =========================================================================
@@ -355,7 +398,7 @@ class ModuleDefinition:
Get the Python import path for a module component.
Args:
component: One of "services", "models", "schemas", "exceptions"
component: One of "services", "models", "schemas", "exceptions", "tasks"
Returns:
Import path string, or None if not configured
@@ -365,9 +408,51 @@ class ModuleDefinition:
"models": self.models_path,
"schemas": self.schemas_path,
"exceptions": self.exceptions_path,
"tasks": self.tasks_path,
}
return paths.get(component)
def get_tasks_module(self) -> str | None:
"""
Get the Python import path for this module's tasks.
Returns the explicitly configured tasks_path, or infers it from
the module code if the module is self-contained.
Returns:
Import path string (e.g., "app.modules.billing.tasks"), or None
"""
if self.tasks_path:
return self.tasks_path
if self.is_self_contained:
dir_name = self.code.replace("-", "_")
return f"app.modules.{dir_name}.tasks"
return None
def get_tasks_dir(self) -> Path | None:
"""
Get the filesystem path to this module's tasks directory.
Returns:
Path to tasks directory, or None if not configured
"""
tasks_module = self.get_tasks_module()
if not tasks_module:
return None
return self.get_module_dir() / "tasks"
def has_tasks(self) -> bool:
"""
Check if this module has a tasks directory.
Returns:
True if tasks directory exists and contains __init__.py
"""
tasks_dir = self.get_tasks_dir()
if not tasks_dir:
return False
return tasks_dir.exists() and (tasks_dir / "__init__.py").exists()
def validate_structure(self) -> list[str]:
"""
Validate that self-contained module has expected directory structure.

172
app/modules/task_base.py Normal file
View File

@@ -0,0 +1,172 @@
# app/modules/task_base.py
"""
Base Celery task class for module tasks.
Provides a ModuleTask base class that handles:
- Database session lifecycle (create/close)
- Context manager pattern for session usage
- Proper cleanup on task completion or failure
- Structured logging with task context
This is the standard base class for all Celery tasks defined within modules.
It replaces the legacy app/tasks/celery_tasks/base.py for new module tasks.
Usage:
from app.core.celery_config import celery_app
from app.modules.task_base import ModuleTask
@celery_app.task(bind=True, base=ModuleTask)
def my_task(self, arg1, arg2):
with self.get_db() as db:
# Use db session
result = db.query(Model).filter(...).all()
return result
"""
import logging
from contextlib import contextmanager
from typing import Any
from celery import Task
from sqlalchemy.orm import Session
from app.core.database import SessionLocal
logger = logging.getLogger(__name__)
class ModuleTask(Task):
"""
Base Celery task with database session management.
Provides:
- Database session context manager
- Automatic session cleanup on success/failure
- Structured logging for task lifecycle events
- Retry tracking
All module tasks should use this as their base class for consistent
behavior and proper resource management.
Example:
@celery_app.task(bind=True, base=ModuleTask)
def process_subscription(self, subscription_id: int):
with self.get_db() as db:
subscription = db.query(Subscription).get(subscription_id)
# Process subscription...
return {"status": "processed"}
"""
# Mark as abstract so Celery doesn't register this as a task
abstract = True
@contextmanager
def get_db(self) -> Session:
"""
Context manager for database session.
Yields a database session and ensures proper cleanup on both
success and failure. Commits on success, rolls back on error.
Yields:
Session: SQLAlchemy database session
Example:
with self.get_db() as db:
vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first()
vendor.status = "active"
# Session commits automatically on exit
"""
db = SessionLocal()
try:
yield db
db.commit()
except Exception as e:
logger.error(
f"Database error in task {self.name}: {e}",
extra={
"task_name": self.name,
"task_id": getattr(self.request, "id", None),
"error": str(e),
},
)
db.rollback()
raise
finally:
db.close()
def on_failure(
self,
exc: Exception,
task_id: str,
args: tuple,
kwargs: dict,
einfo: Any,
) -> None:
"""
Called when task fails.
Logs the failure with structured context for debugging and monitoring.
"""
logger.error(
f"Task {self.name}[{task_id}] failed: {exc}",
extra={
"task_name": self.name,
"task_id": task_id,
"args": args,
"kwargs": kwargs,
"exception": str(exc),
"traceback": str(einfo),
},
exc_info=True,
)
def on_success(
self,
retval: Any,
task_id: str,
args: tuple,
kwargs: dict,
) -> None:
"""
Called when task succeeds.
Logs successful completion with task context.
"""
logger.info(
f"Task {self.name}[{task_id}] completed successfully",
extra={
"task_name": self.name,
"task_id": task_id,
},
)
def on_retry(
self,
exc: Exception,
task_id: str,
args: tuple,
kwargs: dict,
einfo: Any,
) -> None:
"""
Called when task is being retried.
Logs retry attempt with reason and retry count.
"""
retry_count = getattr(self.request, "retries", 0)
logger.warning(
f"Task {self.name}[{task_id}] retrying (attempt {retry_count + 1}): {exc}",
extra={
"task_name": self.name,
"task_id": task_id,
"retry_count": retry_count,
"exception": str(exc),
},
)
# Alias for backward compatibility and clarity
DatabaseTask = ModuleTask
__all__ = ["ModuleTask", "DatabaseTask"]

202
app/modules/tasks.py Normal file
View File

@@ -0,0 +1,202 @@
# app/modules/tasks.py
"""
Module task discovery for Celery.
Provides utilities to:
- Discover task modules from all registered modules
- Build Celery Beat schedules from module definitions
- Parse cron expressions into Celery crontab objects
This module bridges the gap between the module system and Celery,
allowing tasks to be defined within modules and automatically
discovered by Celery.
Usage:
# In celery_config.py
from app.modules.tasks import discover_module_tasks, build_beat_schedule
# Auto-discover tasks from modules
celery_app.autodiscover_tasks(discover_module_tasks())
# Or add to include list
celery_app.conf.include.extend(discover_module_tasks())
# Build beat schedule from module definitions
celery_app.conf.beat_schedule.update(build_beat_schedule())
"""
import logging
from typing import Any
from celery.schedules import crontab
logger = logging.getLogger(__name__)
def discover_module_tasks() -> list[str]:
"""
Discover task modules from all registered modules.
Scans all modules in the registry and returns import paths for
modules that have a tasks directory with an __init__.py file.
Returns:
List of task module import paths (e.g., ["app.modules.billing.tasks"])
Example:
>>> discover_module_tasks()
['app.modules.billing.tasks', 'app.modules.marketplace.tasks']
"""
# Import here to avoid circular imports
from app.modules.registry import MODULES
task_modules = []
for module in MODULES.values():
tasks_module = module.get_tasks_module()
if tasks_module and module.has_tasks():
task_modules.append(tasks_module)
logger.debug(f"Discovered tasks module: {tasks_module}")
logger.info(f"Discovered {len(task_modules)} module task packages")
return task_modules
def build_beat_schedule() -> dict[str, dict[str, Any]]:
"""
Build Celery Beat schedule from module scheduled_tasks.
Iterates through all modules and collects their scheduled_tasks,
converting them into Celery Beat schedule format.
Returns:
Dict suitable for celery_app.conf.beat_schedule
Example:
>>> schedule = build_beat_schedule()
>>> schedule
{
'billing.reset_period_counters': {
'task': 'app.modules.billing.tasks.subscription.reset_period_counters',
'schedule': crontab(hour=0, minute=5),
'args': (),
'kwargs': {},
'options': {'queue': 'scheduled'},
},
...
}
"""
# Import here to avoid circular imports
from app.modules.registry import MODULES
schedule: dict[str, dict[str, Any]] = {}
for module in MODULES.values():
for task in module.scheduled_tasks:
schedule[task.name] = {
"task": task.task,
"schedule": parse_schedule(task.schedule),
"args": task.args,
"kwargs": task.kwargs,
"options": task.options,
}
logger.debug(f"Added scheduled task: {task.name} -> {task.task}")
logger.info(f"Built beat schedule with {len(schedule)} tasks from modules")
return schedule
def parse_schedule(schedule: str | dict[str, Any]) -> crontab:
"""
Parse schedule string or dict into Celery crontab.
Supports two formats:
1. Cron string: "minute hour day_of_month month day_of_week"
Example: "5 0 * * *" (daily at 00:05)
2. Dict with crontab kwargs:
Example: {"minute": 5, "hour": 0}
Args:
schedule: Cron expression string or crontab kwargs dict
Returns:
Celery crontab object
Raises:
ValueError: If schedule format is invalid
Examples:
>>> parse_schedule("5 0 * * *")
<crontab: 5 0 * * * (m/h/dM/MY/d)>
>>> parse_schedule({"minute": 30, "hour": "*/2"})
<crontab: 30 */2 * * * (m/h/dM/MY/d)>
>>> parse_schedule({"minute": 0, "hour": 3, "day_of_week": 0})
<crontab: 0 3 * * 0 (m/h/dM/MY/d)>
"""
if isinstance(schedule, dict):
return crontab(**schedule)
if isinstance(schedule, str):
parts = schedule.split()
if len(parts) == 5:
return crontab(
minute=parts[0],
hour=parts[1],
day_of_month=parts[2],
month_of_year=parts[3],
day_of_week=parts[4],
)
raise ValueError(
f"Invalid cron schedule format: '{schedule}'. "
"Expected 5 space-separated fields: minute hour day_of_month month day_of_week"
)
raise ValueError(
f"Invalid schedule type: {type(schedule).__name__}. "
"Expected str (cron expression) or dict (crontab kwargs)"
)
def get_module_task_routes() -> dict[str, dict[str, str]]:
"""
Build task routing configuration from modules.
Creates routing rules that direct module tasks to appropriate queues
based on module configuration.
Returns:
Dict suitable for celery_app.conf.task_routes
Note:
This provides default routing. Modules can specify queue preferences
in their scheduled_tasks options, which take precedence.
"""
# Import here to avoid circular imports
from app.modules.registry import MODULES
routes: dict[str, dict[str, str]] = {}
for module in MODULES.values():
tasks_module = module.get_tasks_module()
if tasks_module and module.has_tasks():
# Default routing based on module type
if module.is_internal:
# Internal modules (dev-tools, monitoring) use long_running queue
queue = "long_running"
else:
# Default queue for most modules
queue = "default"
routes[f"{tasks_module}.*"] = {"queue": queue}
return routes
__all__ = [
"discover_module_tasks",
"build_beat_schedule",
"parse_schedule",
"get_module_task_routes",
]