Phase 4 of module migration plan: - Add ScheduledTask dataclass for declaring Celery Beat tasks - Add tasks_path and scheduled_tasks fields to ModuleDefinition - Create ModuleTask base class with database session management - Create task discovery utilities (discover_module_tasks, build_beat_schedule) - Update celery_config.py to discover and register module tasks - Maintain backward compatibility with legacy task modules Modules can now define tasks in their tasks/ directory and scheduled tasks in their definition. The infrastructure supports gradual migration of existing tasks from app/tasks/ to their respective modules. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
203 lines
5.9 KiB
Python
203 lines
5.9 KiB
Python
# app/modules/tasks.py
"""
Module task discovery for Celery.

Provides utilities to:
- Discover task modules from all registered modules
- Build Celery Beat schedules from module definitions
- Parse cron expressions into Celery crontab objects

This module bridges the gap between the module system and Celery,
allowing tasks to be defined within modules and automatically
discovered by Celery.

Usage:
    # In celery_config.py
    from app.modules.tasks import discover_module_tasks, build_beat_schedule

    # Auto-discover tasks from modules
    celery_app.autodiscover_tasks(discover_module_tasks())

    # Or add to include list
    celery_app.conf.include.extend(discover_module_tasks())

    # Build beat schedule from module definitions
    celery_app.conf.beat_schedule.update(build_beat_schedule())
"""
|
|
|
|
import logging
|
|
from typing import Any
|
|
|
|
from celery.schedules import crontab
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def discover_module_tasks() -> list[str]:
    """
    Collect the task-package import paths exposed by registered modules.

    Walks every entry in the module registry and keeps the value returned
    by ``get_tasks_module()`` whenever that value is truthy and the
    module's ``has_tasks()`` check also passes.

    Returns:
        Import paths of the discovered task packages, in registry
        iteration order (e.g., ["app.modules.billing.tasks"]).

    Example:
        >>> discover_module_tasks()
        ['app.modules.billing.tasks', 'app.modules.marketplace.tasks']
    """
    # Deferred to call time to avoid a circular import with the registry.
    from app.modules.registry import MODULES

    discovered: list[str] = []

    for definition in MODULES.values():
        import_path = definition.get_tasks_module()
        # Skip modules without a tasks path; has_tasks() is only consulted
        # once a path is known.
        if not import_path or not definition.has_tasks():
            continue
        discovered.append(import_path)
        logger.debug(f"Discovered tasks module: {import_path}")

    logger.info(f"Discovered {len(discovered)} module task packages")
    return discovered
|
|
|
|
|
|
def build_beat_schedule() -> dict[str, dict[str, Any]]:
    """
    Build Celery Beat schedule from module scheduled_tasks.

    Iterates through all modules in the registry and converts each entry
    of their ``scheduled_tasks`` into a Celery Beat schedule entry keyed
    by the task's ``name``.

    Entries are keyed by name only, so two scheduled tasks sharing a name
    cannot both be kept. The later definition (in registry iteration
    order) wins, and a warning is logged so the dropped entry does not
    disappear silently.

    Returns:
        Dict suitable for celery_app.conf.beat_schedule

    Raises:
        ValueError: Propagated from ``parse_schedule`` when a task's
            schedule has an invalid format.

    Example:
        >>> schedule = build_beat_schedule()
        >>> schedule
        {
            'billing.reset_period_counters': {
                'task': 'app.modules.billing.tasks.subscription.reset_period_counters',
                'schedule': crontab(hour=0, minute=5),
                'args': (),
                'kwargs': {},
                'options': {'queue': 'scheduled'},
            },
            ...
        }
    """
    # Import here to avoid circular imports
    from app.modules.registry import MODULES

    schedule: dict[str, dict[str, Any]] = {}

    for module in MODULES.values():
        for task in module.scheduled_tasks:
            previous = schedule.get(task.name)
            if previous is not None:
                # Same beat entry name declared twice: keep last-wins
                # behaviour but make the collision visible.
                logger.warning(
                    f"Duplicate scheduled task name '{task.name}': "
                    f"replacing {previous['task']} with {task.task}"
                )

            schedule[task.name] = {
                "task": task.task,
                "schedule": parse_schedule(task.schedule),
                "args": task.args,
                "kwargs": task.kwargs,
                "options": task.options,
            }
            logger.debug(f"Added scheduled task: {task.name} -> {task.task}")

    logger.info(f"Built beat schedule with {len(schedule)} tasks from modules")
    return schedule
|
|
|
|
|
|
def parse_schedule(schedule: str | dict[str, Any]) -> crontab:
    """
    Parse schedule string or dict into Celery crontab.

    Supports two formats:
    1. Cron string: "minute hour day_of_month month day_of_week"
       Example: "5 0 * * *" (daily at 00:05)
    2. Dict with crontab kwargs:
       Example: {"minute": 5, "hour": 0}

    Args:
        schedule: Cron expression string or crontab kwargs dict

    Returns:
        Celery crontab object

    Raises:
        ValueError: If ``schedule`` is neither a str nor a dict, if a cron
            string does not split into exactly 5 whitespace-separated
            fields, or if a dict cannot be applied as crontab keyword
            arguments (``crontab`` raised ``TypeError``, e.g. for an
            unknown keyword). Any other exception raised by ``crontab``
            itself propagates unchanged.

    Examples:
        >>> parse_schedule("5 0 * * *")
        <crontab: 5 0 * * * (m/h/dM/MY/d)>

        >>> parse_schedule({"minute": 30, "hour": "*/2"})
        <crontab: 30 */2 * * * (m/h/dM/MY/d)>

        >>> parse_schedule({"minute": 0, "hour": 3, "day_of_week": 0})
        <crontab: 0 3 * * 0 (m/h/dM/MY/d)>
    """
    if isinstance(schedule, dict):
        try:
            return crontab(**schedule)
        except TypeError as exc:
            # Unknown or non-string keys surface as TypeError from the call;
            # translate to the ValueError this function documents.
            raise ValueError(
                f"Invalid crontab arguments: {schedule!r} ({exc})"
            ) from exc

    if isinstance(schedule, str):
        parts = schedule.split()
        if len(parts) != 5:
            raise ValueError(
                f"Invalid cron schedule format: '{schedule}'. "
                "Expected 5 space-separated fields: minute hour day_of_month month day_of_week"
            )
        minute, hour, day_of_month, month_of_year, day_of_week = parts
        return crontab(
            minute=minute,
            hour=hour,
            day_of_month=day_of_month,
            month_of_year=month_of_year,
            day_of_week=day_of_week,
        )

    raise ValueError(
        f"Invalid schedule type: {type(schedule).__name__}. "
        "Expected str (cron expression) or dict (crontab kwargs)"
    )
|
|
|
|
|
|
def get_module_task_routes() -> dict[str, dict[str, str]]:
    """
    Derive default Celery task routes from the registered modules.

    For every module that reports a tasks package (truthy
    ``get_tasks_module()`` and a passing ``has_tasks()``), a glob route
    ``"<tasks package>.*"`` is emitted. Modules flagged ``is_internal``
    are routed to the ``long_running`` queue; all others go to
    ``default``.

    Returns:
        Dict suitable for celery_app.conf.task_routes

    Note:
        This only provides default routing.
        NOTE(review): modules may also name a queue in the options of
        their scheduled_tasks, which is expected to take precedence over
        these routes. Nothing in this function enforces that; confirm
        against Celery's routing behaviour.
    """
    # Deferred to call time to avoid a circular import with the registry.
    from app.modules.registry import MODULES

    routing: dict[str, dict[str, str]] = {}

    for definition in MODULES.values():
        package = definition.get_tasks_module()
        if not (package and definition.has_tasks()):
            continue

        # Internal modules (dev-tools, monitoring) use the long_running
        # queue; everything else uses the default queue.
        target_queue = "long_running" if definition.is_internal else "default"
        routing[f"{package}.*"] = {"queue": target_queue}

    return routing
|
|
|
|
|
|
# Public API of this module (names exported by a star-import).
__all__ = [
    "discover_module_tasks",
    "build_beat_schedule",
    "parse_schedule",
    "get_module_task_routes",
]
|