diff --git a/alembic/versions/82ea1b4a3ccb_add_test_run_tables.py b/alembic/versions/82ea1b4a3ccb_add_test_run_tables.py new file mode 100644 index 00000000..2c9f6986 --- /dev/null +++ b/alembic/versions/82ea1b4a3ccb_add_test_run_tables.py @@ -0,0 +1,103 @@ +"""add_test_run_tables + +Revision ID: 82ea1b4a3ccb +Revises: b4c5d6e7f8a9 +Create Date: 2025-12-12 22:48:09.501172 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '82ea1b4a3ccb' +down_revision: Union[str, None] = 'b4c5d6e7f8a9' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # Create test_collections table + op.create_table('test_collections', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('total_tests', sa.Integer(), nullable=True), + sa.Column('total_files', sa.Integer(), nullable=True), + sa.Column('total_classes', sa.Integer(), nullable=True), + sa.Column('unit_tests', sa.Integer(), nullable=True), + sa.Column('integration_tests', sa.Integer(), nullable=True), + sa.Column('performance_tests', sa.Integer(), nullable=True), + sa.Column('system_tests', sa.Integer(), nullable=True), + sa.Column('test_files', sa.JSON(), nullable=True), + sa.Column('collected_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=False), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_test_collections_id'), 'test_collections', ['id'], unique=False) + + # Create test_runs table + op.create_table('test_runs', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('timestamp', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=False), + sa.Column('total_tests', sa.Integer(), nullable=True), + sa.Column('passed', sa.Integer(), nullable=True), + sa.Column('failed', sa.Integer(), nullable=True), + sa.Column('errors', sa.Integer(), nullable=True), + sa.Column('skipped', sa.Integer(), nullable=True), + sa.Column('xfailed', sa.Integer(), nullable=True), + sa.Column('xpassed', sa.Integer(), nullable=True), + sa.Column('coverage_percent', sa.Float(), nullable=True), + sa.Column('duration_seconds', sa.Float(), nullable=True), + sa.Column('triggered_by', sa.String(length=100), nullable=True), + sa.Column('git_commit_hash', sa.String(length=40), nullable=True), + sa.Column('git_branch', sa.String(length=100), nullable=True), + sa.Column('test_path', sa.String(length=500), nullable=True), + sa.Column('pytest_args', sa.String(length=500), nullable=True), + sa.Column('status', sa.String(length=20), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_test_runs_id'), 'test_runs', ['id'], unique=False) + op.create_index(op.f('ix_test_runs_status'), 'test_runs', ['status'], unique=False) + op.create_index(op.f('ix_test_runs_timestamp'), 'test_runs', ['timestamp'], unique=False) + + # Create test_results table + op.create_table('test_results', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('run_id', sa.Integer(), nullable=False), + sa.Column('node_id', sa.String(length=500), nullable=False), + sa.Column('test_name', sa.String(length=200), nullable=False), + sa.Column('test_file', sa.String(length=300), nullable=False), + sa.Column('test_class', sa.String(length=200), nullable=True), + sa.Column('outcome', sa.String(length=20), nullable=False), + sa.Column('duration_seconds', sa.Float(), nullable=True), + sa.Column('error_message', sa.Text(), nullable=True), + sa.Column('traceback', sa.Text(), nullable=True), + sa.Column('markers', sa.JSON(), nullable=True), + sa.Column('parameters', sa.JSON(), nullable=True), + sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=False), + sa.ForeignKeyConstraint(['run_id'], ['test_runs.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_test_results_id'), 'test_results', ['id'], unique=False) + op.create_index(op.f('ix_test_results_node_id'), 'test_results', ['node_id'], unique=False) + op.create_index(op.f('ix_test_results_outcome'), 'test_results', ['outcome'], unique=False) + op.create_index(op.f('ix_test_results_run_id'), 'test_results', ['run_id'], unique=False) + + +def downgrade() -> None: + # Drop test_results table first (has foreign key to test_runs) + op.drop_index(op.f('ix_test_results_run_id'), table_name='test_results') + op.drop_index(op.f('ix_test_results_outcome'), table_name='test_results') + op.drop_index(op.f('ix_test_results_node_id'), table_name='test_results') + op.drop_index(op.f('ix_test_results_id'), table_name='test_results') + op.drop_table('test_results') + + # Drop test_runs table + op.drop_index(op.f('ix_test_runs_timestamp'), table_name='test_runs') + op.drop_index(op.f('ix_test_runs_status'), table_name='test_runs') + op.drop_index(op.f('ix_test_runs_id'), table_name='test_runs') + op.drop_table('test_runs') + + # Drop test_collections table + op.drop_index(op.f('ix_test_collections_id'), table_name='test_collections') + op.drop_table('test_collections') diff --git a/app/api/v1/admin/__init__.py b/app/api/v1/admin/__init__.py index e66dac43..cd085c1b 100644 --- a/app/api/v1/admin/__init__.py +++ b/app/api/v1/admin/__init__.py @@ -37,6 +37,7 @@ from . import ( notifications, products, settings, + tests, users, vendor_domains, vendor_products, @@ -142,5 +143,10 @@ router.include_router( code_quality.router, prefix="/code-quality", tags=["admin-code-quality"] ) +# Include test runner endpoints +router.include_router( + tests.router, prefix="/tests", tags=["admin-tests"] +) + # Export the router __all__ = ["router"] diff --git a/app/api/v1/admin/tests.py b/app/api/v1/admin/tests.py new file mode 100644 index 00000000..de49c475 --- /dev/null +++ b/app/api/v1/admin/tests.py @@ -0,0 +1,309 @@ +""" +Test Runner API Endpoints +RESTful API for running pytest and viewing test results +""" + +from fastapi import APIRouter, Depends, Query +from pydantic import BaseModel, Field +from sqlalchemy.orm import Session + +from app.api.deps import get_current_admin_api +from app.core.database import get_db +from app.services.test_runner_service import test_runner_service +from models.database.user import User + +router = APIRouter() + + +# Pydantic Models for API + + +class TestRunResponse(BaseModel): + """Response model for a test run""" + + id: int + timestamp: str + total_tests: int + passed: int + failed: int + errors: int + skipped: int + xfailed: int + xpassed: int + pass_rate: float + duration_seconds: float + coverage_percent: float | None + triggered_by: str | None + git_commit_hash: str | None + git_branch: str | None + test_path: str | None + status: str + + class Config: + from_attributes = True + + +class TestResultResponse(BaseModel): + """Response model for a single test result""" + + id: int + node_id: str + test_name: str + test_file: str + test_class: str | None + outcome: str + duration_seconds: float + error_message: str | None + traceback: str | None + + class Config: + from_attributes = True + + +class RunTestsRequest(BaseModel): + """Request model for running tests""" + + test_path: str = Field("tests", description="Path to tests to run") + extra_args: list[str] | None = Field(None, description="Additional pytest arguments") + + +class TestDashboardStatsResponse(BaseModel): + """Response model for dashboard statistics""" + + # Current run stats + total_tests: int + passed: int + failed: int + errors: int + skipped: int + pass_rate: float + duration_seconds: float + coverage_percent: float | None + last_run: str | None + last_run_status: str | None + + # Collection stats + total_test_files: int + + # Trend and breakdown data + trend: list[dict] + by_category: dict + top_failing: list[dict] + + +# API Endpoints + + +@router.post("/run", response_model=TestRunResponse) +async def run_tests( + request: RunTestsRequest | None = None, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_admin_api), +): + """ + Run pytest and store results + + Requires admin authentication. Runs pytest on the specified path + and stores results in the database. + """ + test_path = request.test_path if request else "tests" + extra_args = request.extra_args if request else None + + run = test_runner_service.run_tests( + db, + test_path=test_path, + triggered_by=f"manual:{current_user.username}", + extra_args=extra_args, + ) + db.commit() + + return TestRunResponse( + id=run.id, + timestamp=run.timestamp.isoformat(), + total_tests=run.total_tests, + passed=run.passed, + failed=run.failed, + errors=run.errors, + skipped=run.skipped, + xfailed=run.xfailed, + xpassed=run.xpassed, + pass_rate=run.pass_rate, + duration_seconds=run.duration_seconds, + coverage_percent=run.coverage_percent, + triggered_by=run.triggered_by, + git_commit_hash=run.git_commit_hash, + git_branch=run.git_branch, + test_path=run.test_path, + status=run.status, + ) + + +@router.get("/runs", response_model=list[TestRunResponse]) +async def list_runs( + limit: int = Query(20, ge=1, le=100, description="Number of runs to return"), + db: Session = Depends(get_db), + current_user: User = Depends(get_current_admin_api), +): + """ + Get test run history + + Returns recent test runs for trend analysis. + """ + runs = test_runner_service.get_run_history(db, limit=limit) + + return [ + TestRunResponse( + id=run.id, + timestamp=run.timestamp.isoformat(), + total_tests=run.total_tests, + passed=run.passed, + failed=run.failed, + errors=run.errors, + skipped=run.skipped, + xfailed=run.xfailed, + xpassed=run.xpassed, + pass_rate=run.pass_rate, + duration_seconds=run.duration_seconds, + coverage_percent=run.coverage_percent, + triggered_by=run.triggered_by, + git_commit_hash=run.git_commit_hash, + git_branch=run.git_branch, + test_path=run.test_path, + status=run.status, + ) + for run in runs + ] + + +@router.get("/runs/{run_id}", response_model=TestRunResponse) +async def get_run( + run_id: int, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_admin_api), +): + """ + Get a specific test run + """ + run = test_runner_service.get_run_by_id(db, run_id) + + if not run: + from app.exceptions.base import ResourceNotFoundException + raise ResourceNotFoundException("TestRun", str(run_id)) + + return TestRunResponse( + id=run.id, + timestamp=run.timestamp.isoformat(), + total_tests=run.total_tests, + passed=run.passed, + failed=run.failed, + errors=run.errors, + skipped=run.skipped, + xfailed=run.xfailed, + xpassed=run.xpassed, + pass_rate=run.pass_rate, + duration_seconds=run.duration_seconds, + coverage_percent=run.coverage_percent, + triggered_by=run.triggered_by, + git_commit_hash=run.git_commit_hash, + git_branch=run.git_branch, + test_path=run.test_path, + status=run.status, + ) + + +@router.get("/runs/{run_id}/results", response_model=list[TestResultResponse]) +async def get_run_results( + run_id: int, + outcome: str | None = Query(None, description="Filter by outcome (passed, failed, error, skipped)"), + db: Session = Depends(get_db), + current_user: User = Depends(get_current_admin_api), +): + """ + Get test results for a specific run + """ + results = test_runner_service.get_run_results(db, run_id, outcome=outcome) + + return [ + TestResultResponse( + id=r.id, + node_id=r.node_id, + test_name=r.test_name, + test_file=r.test_file, + test_class=r.test_class, + outcome=r.outcome, + duration_seconds=r.duration_seconds, + error_message=r.error_message, + traceback=r.traceback, + ) + for r in results + ] + + +@router.get("/runs/{run_id}/failures", response_model=list[TestResultResponse]) +async def get_run_failures( + run_id: int, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_admin_api), +): + """ + Get failed tests from a specific run + """ + failures = test_runner_service.get_failed_tests(db, run_id) + + return [ + TestResultResponse( + id=r.id, + node_id=r.node_id, + test_name=r.test_name, + test_file=r.test_file, + test_class=r.test_class, + outcome=r.outcome, + duration_seconds=r.duration_seconds, + error_message=r.error_message, + traceback=r.traceback, + ) + for r in failures + ] + + +@router.get("/stats", response_model=TestDashboardStatsResponse) +async def get_dashboard_stats( + db: Session = Depends(get_db), + current_user: User = Depends(get_current_admin_api), +): + """ + Get dashboard statistics + + Returns comprehensive stats for the testing dashboard including: + - Total counts by outcome + - Pass rate + - Trend data + - Tests by category + - Top failing tests + """ + stats = test_runner_service.get_dashboard_stats(db) + return TestDashboardStatsResponse(**stats) + + +@router.post("/collect") +async def collect_tests( + db: Session = Depends(get_db), + current_user: User = Depends(get_current_admin_api), +): + """ + Collect test information without running tests + + Updates the test collection cache with current test counts. + """ + collection = test_runner_service.collect_tests(db) + db.commit() + + return { + "total_tests": collection.total_tests, + "total_files": collection.total_files, + "unit_tests": collection.unit_tests, + "integration_tests": collection.integration_tests, + "performance_tests": collection.performance_tests, + "system_tests": collection.system_tests, + "collected_at": collection.collected_at.isoformat(), + } diff --git a/app/routes/admin_pages.py b/app/routes/admin_pages.py index 7f0e49cd..88e19fcd 100644 --- a/app/routes/admin_pages.py +++ b/app/routes/admin_pages.py @@ -789,14 +789,33 @@ async def admin_icons_page( @router.get("/testing", response_class=HTMLResponse, include_in_schema=False) +async def admin_testing_dashboard( + request: Request, + current_user: User = Depends(get_current_admin_from_cookie_or_header), + db: Session = Depends(get_db), +): + """ + Render testing dashboard page. + pytest results and test coverage overview. + """ + return templates.TemplateResponse( + "admin/testing-dashboard.html", + { + "request": request, + "user": current_user, + }, + ) + + +@router.get("/testing-hub", response_class=HTMLResponse, include_in_schema=False) async def admin_testing_hub( request: Request, current_user: User = Depends(get_current_admin_from_cookie_or_header), db: Session = Depends(get_db), ): """ - Render testing hub page. - Central hub for all test suites and QA tools. + Render manual testing hub page. + Central hub for all manual test suites and QA tools. """ return templates.TemplateResponse( "admin/testing-hub.html", diff --git a/app/services/test_runner_service.py b/app/services/test_runner_service.py new file mode 100644 index 00000000..280f0d76 --- /dev/null +++ b/app/services/test_runner_service.py @@ -0,0 +1,444 @@ +""" +Test Runner Service +Service for running pytest and storing results +""" + +import json +import logging +import re +import subprocess +import tempfile +from datetime import UTC, datetime +from pathlib import Path + +from sqlalchemy import desc, func +from sqlalchemy.orm import Session + +from models.database.test_run import TestCollection, TestResult, TestRun + +logger = logging.getLogger(__name__) + + +class TestRunnerService: + """Service for managing pytest test runs""" + + def __init__(self): + self.project_root = Path(__file__).parent.parent.parent + + def run_tests( + self, + db: Session, + test_path: str = "tests", + triggered_by: str = "manual", + extra_args: list[str] | None = None, + ) -> TestRun: + """ + Run pytest and store results in database + + Args: + db: Database session + test_path: Path to tests (relative to project root) + triggered_by: Who triggered the run + extra_args: Additional pytest arguments + + Returns: + TestRun object with results + """ + # Create test run record + test_run = TestRun( + timestamp=datetime.now(UTC), + triggered_by=triggered_by, + test_path=test_path, + status="running", + ) + db.add(test_run) + db.flush() # Get the ID + + try: + # Get git info + test_run.git_commit_hash = self._get_git_commit() + test_run.git_branch = self._get_git_branch() + + # Build pytest command with JSON output + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: + json_report_path = f.name + + pytest_args = [ + "python", "-m", "pytest", + test_path, + f"--json-report", + f"--json-report-file={json_report_path}", + "-v", + "--tb=short", + ] + + if extra_args: + pytest_args.extend(extra_args) + + test_run.pytest_args = " ".join(pytest_args) + + # Run pytest + start_time = datetime.now(UTC) + result = subprocess.run( + pytest_args, + cwd=str(self.project_root), + capture_output=True, + text=True, + timeout=600, # 10 minute timeout + ) + end_time = datetime.now(UTC) + + test_run.duration_seconds = (end_time - start_time).total_seconds() + + # Parse JSON report + try: + with open(json_report_path, 'r') as f: + report = json.load(f) + + self._process_json_report(db, test_run, report) + except FileNotFoundError: + # Fallback to parsing stdout + self._parse_pytest_output(test_run, result.stdout, result.stderr) + finally: + # Clean up temp file + try: + Path(json_report_path).unlink() + except: + pass + + # Set final status + if test_run.failed > 0 or test_run.errors > 0: + test_run.status = "failed" + else: + test_run.status = "passed" + + except subprocess.TimeoutExpired: + test_run.status = "error" + logger.error("Pytest run timed out") + except Exception as e: + test_run.status = "error" + logger.error(f"Error running tests: {e}") + + return test_run + + def _process_json_report(self, db: Session, test_run: TestRun, report: dict): + """Process pytest-json-report output""" + summary = report.get("summary", {}) + + test_run.total_tests = summary.get("total", 0) + test_run.passed = summary.get("passed", 0) + test_run.failed = summary.get("failed", 0) + test_run.errors = summary.get("error", 0) + test_run.skipped = summary.get("skipped", 0) + test_run.xfailed = summary.get("xfailed", 0) + test_run.xpassed = summary.get("xpassed", 0) + + # Process individual test results + tests = report.get("tests", []) + for test in tests: + node_id = test.get("nodeid", "") + outcome = test.get("outcome", "unknown") + + # Parse node_id to get file, class, function + test_file, test_class, test_name = self._parse_node_id(node_id) + + # Get failure details + error_message = None + traceback = None + if outcome in ("failed", "error"): + call_info = test.get("call", {}) + if "longrepr" in call_info: + traceback = call_info["longrepr"] + # Extract error message from traceback + if isinstance(traceback, str): + lines = traceback.strip().split('\n') + if lines: + error_message = lines[-1][:500] # Last line, limited length + + test_result = TestResult( + run_id=test_run.id, + node_id=node_id, + test_name=test_name, + test_file=test_file, + test_class=test_class, + outcome=outcome, + duration_seconds=test.get("duration", 0.0), + error_message=error_message, + traceback=traceback, + markers=test.get("keywords", []), + ) + db.add(test_result) + + def _parse_node_id(self, node_id: str) -> tuple[str, str | None, str]: + """Parse pytest node_id into file, class, function""" + # Format: tests/unit/test_foo.py::TestClass::test_method + # or: tests/unit/test_foo.py::test_function + parts = node_id.split("::") + + test_file = parts[0] if parts else "" + test_class = None + test_name = parts[-1] if parts else "" + + if len(parts) == 3: + test_class = parts[1] + elif len(parts) == 2: + # Could be Class::method or file::function + if parts[1].startswith("Test"): + test_class = parts[1] + test_name = parts[1] + + # Handle parametrized tests + if "[" in test_name: + test_name = test_name.split("[")[0] + + return test_file, test_class, test_name + + def _parse_pytest_output(self, test_run: TestRun, stdout: str, stderr: str): + """Fallback parser for pytest text output""" + # Parse summary line like: "10 passed, 2 failed, 1 skipped" + summary_pattern = r"(\d+)\s+(passed|failed|error|skipped|xfailed|xpassed)" + + for match in re.finditer(summary_pattern, stdout): + count = int(match.group(1)) + status = match.group(2) + + if status == "passed": + test_run.passed = count + elif status == "failed": + test_run.failed = count + elif status == "error": + test_run.errors = count + elif status == "skipped": + test_run.skipped = count + elif status == "xfailed": + test_run.xfailed = count + elif status == "xpassed": + test_run.xpassed = count + + test_run.total_tests = ( + test_run.passed + test_run.failed + test_run.errors + + test_run.skipped + test_run.xfailed + test_run.xpassed + ) + + def _get_git_commit(self) -> str | None: + """Get current git commit hash""" + try: + result = subprocess.run( + ["git", "rev-parse", "HEAD"], + cwd=str(self.project_root), + capture_output=True, + text=True, + timeout=5, + ) + return result.stdout.strip()[:40] if result.returncode == 0 else None + except: + return None + + def _get_git_branch(self) -> str | None: + """Get current git branch""" + try: + result = subprocess.run( + ["git", "rev-parse", "--abbrev-ref", "HEAD"], + cwd=str(self.project_root), + capture_output=True, + text=True, + timeout=5, + ) + return result.stdout.strip() if result.returncode == 0 else None + except: + return None + + def get_run_history(self, db: Session, limit: int = 20) -> list[TestRun]: + """Get recent test run history""" + return ( + db.query(TestRun) + .order_by(desc(TestRun.timestamp)) + .limit(limit) + .all() + ) + + def get_run_by_id(self, db: Session, run_id: int) -> TestRun | None: + """Get a specific test run with results""" + return db.query(TestRun).filter(TestRun.id == run_id).first() + + def get_failed_tests(self, db: Session, run_id: int) -> list[TestResult]: + """Get failed tests from a run""" + return ( + db.query(TestResult) + .filter( + TestResult.run_id == run_id, + TestResult.outcome.in_(["failed", "error"]) + ) + .all() + ) + + def get_run_results( + self, db: Session, run_id: int, outcome: str | None = None + ) -> list[TestResult]: + """Get test results for a specific run, optionally filtered by outcome""" + query = db.query(TestResult).filter(TestResult.run_id == run_id) + + if outcome: + query = query.filter(TestResult.outcome == outcome) + + return query.all() + + def get_dashboard_stats(self, db: Session) -> dict: + """Get statistics for the testing dashboard""" + # Get latest run + latest_run = ( + db.query(TestRun) + .filter(TestRun.status != "running") + .order_by(desc(TestRun.timestamp)) + .first() + ) + + # Get test collection info (or calculate from latest run) + collection = db.query(TestCollection).order_by(desc(TestCollection.collected_at)).first() + + # Get trend data (last 10 runs) + trend_runs = ( + db.query(TestRun) + .filter(TestRun.status != "running") + .order_by(desc(TestRun.timestamp)) + .limit(10) + .all() + ) + + # Calculate stats by category from latest run + by_category = {} + if latest_run: + results = db.query(TestResult).filter(TestResult.run_id == latest_run.id).all() + for result in results: + # Categorize by test path + if "unit" in result.test_file: + category = "Unit Tests" + elif "integration" in result.test_file: + category = "Integration Tests" + elif "performance" in result.test_file: + category = "Performance Tests" + elif "system" in result.test_file: + category = "System Tests" + else: + category = "Other" + + if category not in by_category: + by_category[category] = {"total": 0, "passed": 0, "failed": 0} + by_category[category]["total"] += 1 + if result.outcome == "passed": + by_category[category]["passed"] += 1 + elif result.outcome in ("failed", "error"): + by_category[category]["failed"] += 1 + + # Get top failing tests (across recent runs) + top_failing = ( + db.query( + TestResult.test_name, + TestResult.test_file, + func.count(TestResult.id).label("failure_count") + ) + .filter(TestResult.outcome.in_(["failed", "error"])) + .group_by(TestResult.test_name, TestResult.test_file) + .order_by(desc("failure_count")) + .limit(10) + .all() + ) + + return { + # Current run stats + "total_tests": latest_run.total_tests if latest_run else 0, + "passed": latest_run.passed if latest_run else 0, + "failed": latest_run.failed if latest_run else 0, + "errors": latest_run.errors if latest_run else 0, + "skipped": latest_run.skipped if latest_run else 0, + "pass_rate": round(latest_run.pass_rate, 1) if latest_run else 0, + "duration_seconds": round(latest_run.duration_seconds, 2) if latest_run else 0, + "coverage_percent": latest_run.coverage_percent if latest_run else None, + "last_run": latest_run.timestamp.isoformat() if latest_run else None, + "last_run_status": latest_run.status if latest_run else None, + + # Collection stats + "total_test_files": collection.total_files if collection else 0, + + # Trend data + "trend": [ + { + "timestamp": run.timestamp.isoformat(), + "total": run.total_tests, + "passed": run.passed, + "failed": run.failed, + "pass_rate": round(run.pass_rate, 1), + "duration": round(run.duration_seconds, 1), + } + for run in reversed(trend_runs) + ], + + # By category + "by_category": by_category, + + # Top failing tests + "top_failing": [ + { + "test_name": t.test_name, + "test_file": t.test_file, + "failure_count": t.failure_count, + } + for t in top_failing + ], + } + + def collect_tests(self, db: Session) -> TestCollection: + """Collect test information without running tests""" + collection = TestCollection( + collected_at=datetime.now(UTC), + ) + + try: + # Run pytest --collect-only + result = subprocess.run( + ["python", "-m", "pytest", "--collect-only", "-q", "tests"], + cwd=str(self.project_root), + capture_output=True, + text=True, + timeout=60, + ) + + # Parse output + lines = result.stdout.strip().split('\n') + test_files = {} + + for line in lines: + if "::" in line: + file_path = line.split("::")[0] + if file_path not in test_files: + test_files[file_path] = 0 + test_files[file_path] += 1 + + # Count by category + for file_path, count in test_files.items(): + collection.total_tests += count + collection.total_files += 1 + + if "unit" in file_path: + collection.unit_tests += count + elif "integration" in file_path: + collection.integration_tests += count + elif "performance" in file_path: + collection.performance_tests += count + elif "system" in file_path: + collection.system_tests += count + + collection.test_files = [ + {"file": f, "count": c} + for f, c in sorted(test_files.items(), key=lambda x: -x[1]) + ] + + except Exception as e: + logger.error(f"Error collecting tests: {e}") + + db.add(collection) + return collection + + +# Singleton instance +test_runner_service = TestRunnerService() diff --git a/app/templates/admin/testing-dashboard.html b/app/templates/admin/testing-dashboard.html new file mode 100644 index 00000000..f943e797 --- /dev/null +++ b/app/templates/admin/testing-dashboard.html @@ -0,0 +1,348 @@ +{# app/templates/admin/testing-dashboard.html #} +{% extends "admin/base.html" %} +{% from 'shared/macros/alerts.html' import loading_state, error_state, alert_dynamic %} +{% from 'shared/macros/headers.html' import page_header_flex, refresh_button, action_button %} + +{% block title %}Testing Dashboard{% endblock %} + +{% block alpine_data %}testingDashboard(){% endblock %} + +{% block extra_scripts %} + +{% endblock %} + +{% block content %} +{% call page_header_flex(title='Testing Dashboard', subtitle='pytest results and test coverage') %} + {{ refresh_button(variant='secondary') }} + {{ action_button('Run Tests', 'Running...', 'running', 'runTests()', icon='play') }} +{% endcall %} + +{{ loading_state('Loading test results...') }} + +{{ error_state('Error loading test results') }} + +{{ alert_dynamic(type='success', message_var='successMessage', show_condition='successMessage') }} + + +
+ Total Tests +
++ 0 +
++ Passed +
++ 0 +
++ Failed +
++ 0 +
++ Pass Rate +
++ 0% +
+Skipped
+0
+Duration
+0s
+Coverage
+N/A
+Status
++ NO RUNS +
+No test runs yet
+Run tests to see trend data
+No category data available
+ +| Test Name | +File | +Failures | +
|---|---|---|
| + + | ++ | + + + | +
No failing tests!
+All tests are passing
+| Time | +Path | +Total | +Passed | +Failed | +Pass Rate | +Duration | +Status | +
|---|---|---|---|---|---|---|---|
| + | + | + | + | + | + + + | ++ | + + + | +