# app/modules/dev_tools/tasks/test_runner.py """ Celery tasks for test execution. Wraps the existing execute_test_run function for Celery execution. This is the canonical location - task is re-exported from the legacy location for backward compatibility. """ import logging from app.core.celery_config import celery_app from app.services.test_runner_service import test_runner_service from app.tasks.celery_tasks.base import DatabaseTask from app.modules.dev_tools.models import TestRun logger = logging.getLogger(__name__) @celery_app.task( bind=True, base=DatabaseTask, name="app.modules.dev_tools.tasks.test_runner.execute_test_run", max_retries=1, time_limit=3600, # 1 hour hard limit soft_time_limit=3300, # 55 minutes soft limit ) def execute_test_run( self, run_id: int, test_path: str = "tests", extra_args: list[str] | None = None, ): """ Celery task to execute pytest tests. Args: run_id: ID of the TestRun record test_path: Path to tests (relative to project root) extra_args: Additional pytest arguments Returns: dict: Test run results summary """ with self.get_db() as db: # Get the test run record test_run = db.query(TestRun).filter(TestRun.id == run_id).first() if not test_run: logger.error(f"Test run {run_id} not found") return {"error": f"Test run {run_id} not found"} # Store Celery task ID test_run.celery_task_id = self.request.id db.commit() try: logger.info(f"Starting test execution: Run {run_id}, Path: {test_path}") # Execute the tests test_runner_service._execute_tests(db, test_run, test_path, extra_args) db.commit() logger.info( f"Test run {run_id} completed: " f"status={test_run.status}, passed={test_run.passed}, " f"failed={test_run.failed}, duration={test_run.duration_seconds:.1f}s" ) return { "run_id": run_id, "status": test_run.status, "total_tests": test_run.total_tests, "passed": test_run.passed, "failed": test_run.failed, "errors": test_run.errors, "skipped": test_run.skipped, "coverage_percent": test_run.coverage_percent, "duration_seconds": test_run.duration_seconds, } except Exception as e: logger.error(f"Test run {run_id} failed: {e}", exc_info=True) test_run.status = "error" db.commit() raise # Re-raise for Celery