feat: complete dev_tools module self-containment
Migrate dev_tools module to self-contained structure: - routes/api/ - API endpoints - models/architecture_scan.py - Architecture scan models - models/test_run.py - Test run models - schemas/ - Pydantic schemas - services/ - Business logic services - tasks/ - Celery background tasks - exceptions.py - Module exceptions Updated definition.py with self-contained paths. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
85
app/modules/dev_tools/tasks/test_runner.py
Normal file
85
app/modules/dev_tools/tasks/test_runner.py
Normal file
@@ -0,0 +1,85 @@
|
||||
# app/modules/dev_tools/tasks/test_runner.py
|
||||
"""
|
||||
Celery tasks for test execution.
|
||||
|
||||
Wraps the existing execute_test_run function for Celery execution.
|
||||
This is the canonical location - task is re-exported from the legacy location
|
||||
for backward compatibility.
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
from app.core.celery_config import celery_app
|
||||
from app.services.test_runner_service import test_runner_service
|
||||
from app.tasks.celery_tasks.base import DatabaseTask
|
||||
from app.modules.dev_tools.models import TestRun
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@celery_app.task(
|
||||
bind=True,
|
||||
base=DatabaseTask,
|
||||
name="app.modules.dev_tools.tasks.test_runner.execute_test_run",
|
||||
max_retries=1,
|
||||
time_limit=3600, # 1 hour hard limit
|
||||
soft_time_limit=3300, # 55 minutes soft limit
|
||||
)
|
||||
def execute_test_run(
|
||||
self,
|
||||
run_id: int,
|
||||
test_path: str = "tests",
|
||||
extra_args: list[str] | None = None,
|
||||
):
|
||||
"""
|
||||
Celery task to execute pytest tests.
|
||||
|
||||
Args:
|
||||
run_id: ID of the TestRun record
|
||||
test_path: Path to tests (relative to project root)
|
||||
extra_args: Additional pytest arguments
|
||||
|
||||
Returns:
|
||||
dict: Test run results summary
|
||||
"""
|
||||
with self.get_db() as db:
|
||||
# Get the test run record
|
||||
test_run = db.query(TestRun).filter(TestRun.id == run_id).first()
|
||||
if not test_run:
|
||||
logger.error(f"Test run {run_id} not found")
|
||||
return {"error": f"Test run {run_id} not found"}
|
||||
|
||||
# Store Celery task ID
|
||||
test_run.celery_task_id = self.request.id
|
||||
db.commit()
|
||||
|
||||
try:
|
||||
logger.info(f"Starting test execution: Run {run_id}, Path: {test_path}")
|
||||
|
||||
# Execute the tests
|
||||
test_runner_service._execute_tests(db, test_run, test_path, extra_args)
|
||||
db.commit()
|
||||
|
||||
logger.info(
|
||||
f"Test run {run_id} completed: "
|
||||
f"status={test_run.status}, passed={test_run.passed}, "
|
||||
f"failed={test_run.failed}, duration={test_run.duration_seconds:.1f}s"
|
||||
)
|
||||
|
||||
return {
|
||||
"run_id": run_id,
|
||||
"status": test_run.status,
|
||||
"total_tests": test_run.total_tests,
|
||||
"passed": test_run.passed,
|
||||
"failed": test_run.failed,
|
||||
"errors": test_run.errors,
|
||||
"skipped": test_run.skipped,
|
||||
"coverage_percent": test_run.coverage_percent,
|
||||
"duration_seconds": test_run.duration_seconds,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Test run {run_id} failed: {e}", exc_info=True)
|
||||
test_run.status = "error"
|
||||
db.commit()
|
||||
raise # Re-raise for Celery
|
||||
Reference in New Issue
Block a user