diff --git a/app/core/celery_config.py b/app/core/celery_config.py index 91109eea..3fb3d916 100644 --- a/app/core/celery_config.py +++ b/app/core/celery_config.py @@ -8,8 +8,17 @@ It includes: - Celery Beat schedule for periodic tasks - Task retry policies - Sentry integration for error tracking +- Module-based task discovery (discovers tasks from app/modules/*/tasks/) + +Task Discovery: +- Legacy tasks: Explicitly listed in the 'include' parameter +- Module tasks: Auto-discovered via discover_module_tasks() + +As modules are migrated, their tasks will move from the legacy include list +to automatic discovery from the module's tasks/ directory. """ +import logging import os import sentry_sdk @@ -17,6 +26,8 @@ from celery import Celery from celery.schedules import crontab from sentry_sdk.integrations.celery import CeleryIntegration +logger = logging.getLogger(__name__) + # Redis URL from environment or default REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0") @@ -34,19 +45,49 @@ if SENTRY_DSN: send_default_pii=True, ) +# ============================================================================= +# TASK DISCOVERY +# ============================================================================= +# Legacy tasks (will be migrated to modules over time) +LEGACY_TASK_MODULES = [ + "app.tasks.celery_tasks.marketplace", + "app.tasks.celery_tasks.letzshop", + "app.tasks.celery_tasks.subscription", + "app.tasks.celery_tasks.export", + "app.tasks.celery_tasks.code_quality", + "app.tasks.celery_tasks.test_runner", +] + + +def get_all_task_modules() -> list[str]: + """ + Get all task modules (legacy + module-based). + + Returns: + Combined list of legacy task modules and discovered module tasks + """ + all_modules = list(LEGACY_TASK_MODULES) + + try: + from app.modules.tasks import discover_module_tasks + + module_tasks = discover_module_tasks() + all_modules.extend(module_tasks) + logger.info(f"Discovered {len(module_tasks)} module task packages") + except ImportError as e: + logger.warning(f"Could not import module task discovery: {e}") + except Exception as e: + logger.error(f"Error discovering module tasks: {e}") + + return all_modules + + # Create Celery application celery_app = Celery( "wizamart", broker=REDIS_URL, backend=REDIS_URL, - include=[ - "app.tasks.celery_tasks.marketplace", - "app.tasks.celery_tasks.letzshop", - "app.tasks.celery_tasks.subscription", - "app.tasks.celery_tasks.export", - "app.tasks.celery_tasks.code_quality", - "app.tasks.celery_tasks.test_runner", - ], + include=get_all_task_modules(), ) # ============================================================================= @@ -98,7 +139,9 @@ celery_app.conf.task_routes = { # ============================================================================= # CELERY BEAT SCHEDULE - Periodic tasks # ============================================================================= -celery_app.conf.beat_schedule = { + +# Legacy scheduled tasks (will be migrated to module definitions) +LEGACY_BEAT_SCHEDULE = { # Reset usage counters at start of each period "reset-period-counters-daily": { "task": "app.tasks.celery_tasks.subscription.reset_period_counters", @@ -131,6 +174,33 @@ celery_app.conf.beat_schedule = { }, } + +def get_full_beat_schedule() -> dict: + """ + Get complete beat schedule (legacy + module-defined). + + Returns: + Merged beat schedule from legacy tasks and module definitions + """ + schedule = dict(LEGACY_BEAT_SCHEDULE) + + try: + from app.modules.tasks import build_beat_schedule + + module_schedule = build_beat_schedule() + schedule.update(module_schedule) + if module_schedule: + logger.info(f"Added {len(module_schedule)} scheduled tasks from modules") + except ImportError as e: + logger.warning(f"Could not import module beat schedule builder: {e}") + except Exception as e: + logger.error(f"Error building module beat schedule: {e}") + + return schedule + + +celery_app.conf.beat_schedule = get_full_beat_schedule() + # ============================================================================= # QUEUE CONFIGURATION # ============================================================================= diff --git a/app/modules/__init__.py b/app/modules/__init__.py index b7941263..6023549f 100644 --- a/app/modules/__init__.py +++ b/app/modules/__init__.py @@ -20,6 +20,7 @@ Module Hierarchy: └── Modules (Enabled features - Billing, Marketplace, Inventory, etc.) ├── Routes (API + Page routes) ├── Services (Business logic) + ├── Tasks (Celery background jobs) ├── Menu Items (Sidebar entries) ├── Templates (UI components) └── Migrations (Module-specific) @@ -52,7 +53,14 @@ Usage: print(f"Module {data.module_code} enabled") """ -from app.modules.base import ModuleDefinition +from app.modules.base import ModuleDefinition, ScheduledTask +from app.modules.task_base import ModuleTask, DatabaseTask +from app.modules.tasks import ( + discover_module_tasks, + build_beat_schedule, + parse_schedule, + get_module_task_routes, +) from app.modules.registry import ( MODULES, CORE_MODULES, @@ -76,6 +84,14 @@ from app.modules.events import ( __all__ = [ # Core types "ModuleDefinition", + "ScheduledTask", + # Task support + "ModuleTask", + "DatabaseTask", + "discover_module_tasks", + "build_beat_schedule", + "parse_schedule", + "get_module_task_routes", # Module dictionaries "MODULES", "CORE_MODULES", diff --git a/app/modules/base.py b/app/modules/base.py index 233f6ed4..1108315a 100644 --- a/app/modules/base.py +++ b/app/modules/base.py @@ -26,6 +26,8 @@ Self-Contained Module Structure: ├── exceptions.py # Module-specific exceptions (optional) ├── routes/ # FastAPI routers ├── services/ # Business logic + ├── tasks/ # Celery background tasks (optional) + │ └── __init__.py # Task module discovery marker ├── models/ # SQLAlchemy models ├── schemas/ # Pydantic schemas ├── migrations/ # Alembic migrations for this module @@ -45,6 +47,41 @@ if TYPE_CHECKING: from models.database.admin_menu_config import FrontendType +@dataclass +class ScheduledTask: + """ + Definition of a Celery Beat scheduled task. + + Used in ModuleDefinition to declare scheduled tasks that should be + registered with Celery Beat when the module is loaded. + + Attributes: + name: Unique name for the schedule entry (e.g., "billing.reset_counters") + task: Full Python path to the task (e.g., "app.modules.billing.tasks.subscription.reset_period_counters") + schedule: Cron expression string or crontab dict + - String format: "minute hour day_of_month month day_of_week" (e.g., "5 0 * * *") + - Dict format: {"minute": 5, "hour": 0} for crontab kwargs + args: Positional arguments to pass to the task + kwargs: Keyword arguments to pass to the task + options: Celery task options (e.g., {"queue": "scheduled"}) + + Example: + ScheduledTask( + name="billing.reset_period_counters", + task="app.modules.billing.tasks.subscription.reset_period_counters", + schedule="5 0 * * *", # Daily at 00:05 + options={"queue": "scheduled"}, + ) + """ + + name: str + task: str + schedule: str | dict[str, Any] + args: tuple = () + kwargs: dict[str, Any] = field(default_factory=dict) + options: dict[str, Any] = field(default_factory=dict) + + @dataclass class ModuleDefinition: """ @@ -191,6 +228,12 @@ class ModuleDefinition: locales_path: str | None = None # Relative to module directory migrations_path: str | None = None # Relative to module directory, e.g., "migrations" + # ========================================================================= + # Celery Tasks (optional) + # ========================================================================= + tasks_path: str | None = None # Python import path, e.g., "app.modules.billing.tasks" + scheduled_tasks: list[ScheduledTask] = field(default_factory=list) + # ========================================================================= # Menu Item Methods # ========================================================================= @@ -355,7 +398,7 @@ class ModuleDefinition: Get the Python import path for a module component. Args: - component: One of "services", "models", "schemas", "exceptions" + component: One of "services", "models", "schemas", "exceptions", "tasks" Returns: Import path string, or None if not configured @@ -365,9 +408,51 @@ class ModuleDefinition: "models": self.models_path, "schemas": self.schemas_path, "exceptions": self.exceptions_path, + "tasks": self.tasks_path, } return paths.get(component) + def get_tasks_module(self) -> str | None: + """ + Get the Python import path for this module's tasks. + + Returns the explicitly configured tasks_path, or infers it from + the module code if the module is self-contained. + + Returns: + Import path string (e.g., "app.modules.billing.tasks"), or None + """ + if self.tasks_path: + return self.tasks_path + if self.is_self_contained: + dir_name = self.code.replace("-", "_") + return f"app.modules.{dir_name}.tasks" + return None + + def get_tasks_dir(self) -> Path | None: + """ + Get the filesystem path to this module's tasks directory. + + Returns: + Path to tasks directory, or None if not configured + """ + tasks_module = self.get_tasks_module() + if not tasks_module: + return None + return self.get_module_dir() / "tasks" + + def has_tasks(self) -> bool: + """ + Check if this module has a tasks directory. + + Returns: + True if tasks directory exists and contains __init__.py + """ + tasks_dir = self.get_tasks_dir() + if not tasks_dir: + return False + return tasks_dir.exists() and (tasks_dir / "__init__.py").exists() + def validate_structure(self) -> list[str]: """ Validate that self-contained module has expected directory structure. diff --git a/app/modules/task_base.py b/app/modules/task_base.py new file mode 100644 index 00000000..92dcf5cd --- /dev/null +++ b/app/modules/task_base.py @@ -0,0 +1,172 @@ +# app/modules/task_base.py +""" +Base Celery task class for module tasks. + +Provides a ModuleTask base class that handles: +- Database session lifecycle (create/close) +- Context manager pattern for session usage +- Proper cleanup on task completion or failure +- Structured logging with task context + +This is the standard base class for all Celery tasks defined within modules. +It replaces the legacy app/tasks/celery_tasks/base.py for new module tasks. + +Usage: + from app.core.celery_config import celery_app + from app.modules.task_base import ModuleTask + + @celery_app.task(bind=True, base=ModuleTask) + def my_task(self, arg1, arg2): + with self.get_db() as db: + # Use db session + result = db.query(Model).filter(...).all() + return result +""" + +import logging +from contextlib import contextmanager +from typing import Any + +from celery import Task +from sqlalchemy.orm import Session + +from app.core.database import SessionLocal + +logger = logging.getLogger(__name__) + + +class ModuleTask(Task): + """ + Base Celery task with database session management. + + Provides: + - Database session context manager + - Automatic session cleanup on success/failure + - Structured logging for task lifecycle events + - Retry tracking + + All module tasks should use this as their base class for consistent + behavior and proper resource management. + + Example: + @celery_app.task(bind=True, base=ModuleTask) + def process_subscription(self, subscription_id: int): + with self.get_db() as db: + subscription = db.query(Subscription).get(subscription_id) + # Process subscription... + return {"status": "processed"} + """ + + # Mark as abstract so Celery doesn't register this as a task + abstract = True + + @contextmanager + def get_db(self) -> Session: + """ + Context manager for database session. + + Yields a database session and ensures proper cleanup on both + success and failure. Commits on success, rolls back on error. + + Yields: + Session: SQLAlchemy database session + + Example: + with self.get_db() as db: + vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first() + vendor.status = "active" + # Session commits automatically on exit + """ + db = SessionLocal() + try: + yield db + db.commit() + except Exception as e: + logger.error( + f"Database error in task {self.name}: {e}", + extra={ + "task_name": self.name, + "task_id": getattr(self.request, "id", None), + "error": str(e), + }, + ) + db.rollback() + raise + finally: + db.close() + + def on_failure( + self, + exc: Exception, + task_id: str, + args: tuple, + kwargs: dict, + einfo: Any, + ) -> None: + """ + Called when task fails. + + Logs the failure with structured context for debugging and monitoring. + """ + logger.error( + f"Task {self.name}[{task_id}] failed: {exc}", + extra={ + "task_name": self.name, + "task_id": task_id, + "args": args, + "kwargs": kwargs, + "exception": str(exc), + "traceback": str(einfo), + }, + exc_info=True, + ) + + def on_success( + self, + retval: Any, + task_id: str, + args: tuple, + kwargs: dict, + ) -> None: + """ + Called when task succeeds. + + Logs successful completion with task context. + """ + logger.info( + f"Task {self.name}[{task_id}] completed successfully", + extra={ + "task_name": self.name, + "task_id": task_id, + }, + ) + + def on_retry( + self, + exc: Exception, + task_id: str, + args: tuple, + kwargs: dict, + einfo: Any, + ) -> None: + """ + Called when task is being retried. + + Logs retry attempt with reason and retry count. + """ + retry_count = getattr(self.request, "retries", 0) + logger.warning( + f"Task {self.name}[{task_id}] retrying (attempt {retry_count + 1}): {exc}", + extra={ + "task_name": self.name, + "task_id": task_id, + "retry_count": retry_count, + "exception": str(exc), + }, + ) + + +# Alias for backward compatibility and clarity +DatabaseTask = ModuleTask + +__all__ = ["ModuleTask", "DatabaseTask"] diff --git a/app/modules/tasks.py b/app/modules/tasks.py new file mode 100644 index 00000000..e51f3d8b --- /dev/null +++ b/app/modules/tasks.py @@ -0,0 +1,202 @@ +# app/modules/tasks.py +""" +Module task discovery for Celery. + +Provides utilities to: +- Discover task modules from all registered modules +- Build Celery Beat schedules from module definitions +- Parse cron expressions into Celery crontab objects + +This module bridges the gap between the module system and Celery, +allowing tasks to be defined within modules and automatically +discovered by Celery. + +Usage: + # In celery_config.py + from app.modules.tasks import discover_module_tasks, build_beat_schedule + + # Auto-discover tasks from modules + celery_app.autodiscover_tasks(discover_module_tasks()) + + # Or add to include list + celery_app.conf.include.extend(discover_module_tasks()) + + # Build beat schedule from module definitions + celery_app.conf.beat_schedule.update(build_beat_schedule()) +""" + +import logging +from typing import Any + +from celery.schedules import crontab + +logger = logging.getLogger(__name__) + + +def discover_module_tasks() -> list[str]: + """ + Discover task modules from all registered modules. + + Scans all modules in the registry and returns import paths for + modules that have a tasks directory with an __init__.py file. + + Returns: + List of task module import paths (e.g., ["app.modules.billing.tasks"]) + + Example: + >>> discover_module_tasks() + ['app.modules.billing.tasks', 'app.modules.marketplace.tasks'] + """ + # Import here to avoid circular imports + from app.modules.registry import MODULES + + task_modules = [] + + for module in MODULES.values(): + tasks_module = module.get_tasks_module() + if tasks_module and module.has_tasks(): + task_modules.append(tasks_module) + logger.debug(f"Discovered tasks module: {tasks_module}") + + logger.info(f"Discovered {len(task_modules)} module task packages") + return task_modules + + +def build_beat_schedule() -> dict[str, dict[str, Any]]: + """ + Build Celery Beat schedule from module scheduled_tasks. + + Iterates through all modules and collects their scheduled_tasks, + converting them into Celery Beat schedule format. + + Returns: + Dict suitable for celery_app.conf.beat_schedule + + Example: + >>> schedule = build_beat_schedule() + >>> schedule + { + 'billing.reset_period_counters': { + 'task': 'app.modules.billing.tasks.subscription.reset_period_counters', + 'schedule': crontab(hour=0, minute=5), + 'args': (), + 'kwargs': {}, + 'options': {'queue': 'scheduled'}, + }, + ... + } + """ + # Import here to avoid circular imports + from app.modules.registry import MODULES + + schedule: dict[str, dict[str, Any]] = {} + + for module in MODULES.values(): + for task in module.scheduled_tasks: + schedule[task.name] = { + "task": task.task, + "schedule": parse_schedule(task.schedule), + "args": task.args, + "kwargs": task.kwargs, + "options": task.options, + } + logger.debug(f"Added scheduled task: {task.name} -> {task.task}") + + logger.info(f"Built beat schedule with {len(schedule)} tasks from modules") + return schedule + + +def parse_schedule(schedule: str | dict[str, Any]) -> crontab: + """ + Parse schedule string or dict into Celery crontab. + + Supports two formats: + 1. Cron string: "minute hour day_of_month month day_of_week" + Example: "5 0 * * *" (daily at 00:05) + 2. Dict with crontab kwargs: + Example: {"minute": 5, "hour": 0} + + Args: + schedule: Cron expression string or crontab kwargs dict + + Returns: + Celery crontab object + + Raises: + ValueError: If schedule format is invalid + + Examples: + >>> parse_schedule("5 0 * * *") + + + >>> parse_schedule({"minute": 30, "hour": "*/2"}) + + + >>> parse_schedule({"minute": 0, "hour": 3, "day_of_week": 0}) + + """ + if isinstance(schedule, dict): + return crontab(**schedule) + + if isinstance(schedule, str): + parts = schedule.split() + if len(parts) == 5: + return crontab( + minute=parts[0], + hour=parts[1], + day_of_month=parts[2], + month_of_year=parts[3], + day_of_week=parts[4], + ) + raise ValueError( + f"Invalid cron schedule format: '{schedule}'. " + "Expected 5 space-separated fields: minute hour day_of_month month day_of_week" + ) + + raise ValueError( + f"Invalid schedule type: {type(schedule).__name__}. " + "Expected str (cron expression) or dict (crontab kwargs)" + ) + + +def get_module_task_routes() -> dict[str, dict[str, str]]: + """ + Build task routing configuration from modules. + + Creates routing rules that direct module tasks to appropriate queues + based on module configuration. + + Returns: + Dict suitable for celery_app.conf.task_routes + + Note: + This provides default routing. Modules can specify queue preferences + in their scheduled_tasks options, which take precedence. + """ + # Import here to avoid circular imports + from app.modules.registry import MODULES + + routes: dict[str, dict[str, str]] = {} + + for module in MODULES.values(): + tasks_module = module.get_tasks_module() + if tasks_module and module.has_tasks(): + # Default routing based on module type + if module.is_internal: + # Internal modules (dev-tools, monitoring) use long_running queue + queue = "long_running" + else: + # Default queue for most modules + queue = "default" + + routes[f"{tasks_module}.*"] = {"queue": queue} + + return routes + + +__all__ = [ + "discover_module_tasks", + "build_beat_schedule", + "parse_schedule", + "get_module_task_routes", +]