orion/app/modules/monitoring/routes/api/admin_tests.py
Samir Boulahtit 4aa6f76e46
refactor(arch): move auth schemas to tenancy module and add cross-module service methods
Move all auth schemas (UserContext, UserLogin, LoginResponse, etc.) from
legacy models/schema/auth.py to app/modules/tenancy/schemas/auth.py per
MOD-019. Update 84 import sites across 14 modules. Legacy file now
re-exports for backwards compatibility.

Add missing tenancy service methods for cross-module consumers (usage sketched below):
- merchant_service.get_merchant_by_owner_id()
- merchant_service.get_merchant_count_for_owner()
- admin_service.get_user_by_id() (public, was private-only)
- platform_service.get_active_store_count()
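
An illustrative consumer-side sketch of the new layout (the tenancy service
import path and the method signatures are assumptions, not taken from this
change):

    from app.modules.tenancy.schemas.auth import UserContext  # new canonical location
    # assumed service import path, for illustration only:
    from app.modules.tenancy.services import merchant_service, platform_service

    def merchant_overview(db, user: UserContext) -> dict:
        return {
            "merchant": merchant_service.get_merchant_by_owner_id(db, user.id),
            "merchant_count": merchant_service.get_merchant_count_for_owner(db, user.id),
            "active_stores": platform_service.get_active_store_count(db),
        }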

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 23:57:04 +01:00


# app/modules/monitoring/routes/api/admin_tests.py
"""
Test Runner API Endpoints
RESTful API for running pytest and viewing test results
"""

from fastapi import APIRouter, BackgroundTasks, Depends, Query
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session

from app.api.deps import get_current_admin_api
from app.core.database import get_db
from app.modules.dev_tools.services.test_runner_service import test_runner_service
from app.modules.tenancy.schemas.auth import UserContext

admin_tests_router = APIRouter(prefix="/tests")
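# Note: every route in this module is registered relative to the "/tests" prefix
# above, plus whatever prefix the application adds when it mounts
# admin_tests_router, so a docstring path like "GET /runs/{run_id}" is really
# "<mount prefix>/tests/runs/{run_id}".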


# Pydantic Models for API
class TestRunResponse(BaseModel):
    """Response model for a test run"""

    id: int
    timestamp: str
    total_tests: int
    passed: int
    failed: int
    errors: int
    skipped: int
    xfailed: int
    xpassed: int
    pass_rate: float
    duration_seconds: float
    coverage_percent: float | None
    triggered_by: str | None
    git_commit_hash: str | None
    git_branch: str | None
    test_path: str | None
    status: str

    class Config:
        from_attributes = True
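
# `from_attributes = True` (here and on TestResultResponse below) enables
# attribute-based validation from ORM objects in Pydantic v2; note that the
# endpoints in this module still build responses field by field and convert the
# ORM `timestamp` to a string via `.isoformat()` themselves.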


class TestResultResponse(BaseModel):
    """Response model for a single test result"""

    id: int
    node_id: str
    test_name: str
    test_file: str
    test_class: str | None
    outcome: str
    duration_seconds: float
    error_message: str | None
    traceback: str | None

    class Config:
        from_attributes = True


class RunTestsRequest(BaseModel):
    """Request model for running tests"""

    test_path: str = Field("tests", description="Path to tests to run")
    extra_args: list[str] | None = Field(
        None, description="Additional pytest arguments"
    )
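
# Illustrative request body (the test path shown is hypothetical; -x and
# --maxfail are standard pytest flags):
#   {"test_path": "tests/unit", "extra_args": ["-x", "--maxfail=5"]}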


class TestDashboardStatsResponse(BaseModel):
    """Response model for dashboard statistics"""

    # Current run stats
    total_tests: int
    passed: int
    failed: int
    errors: int
    skipped: int
    pass_rate: float
    duration_seconds: float
    coverage_percent: float | None
    last_run: str | None
    last_run_status: str | None

    # Collection stats
    total_test_files: int
    collected_tests: int
    unit_tests: int
    integration_tests: int
    performance_tests: int
    system_tests: int
    last_collected: str | None

    # Trend and breakdown data
    trend: list[dict]
    by_category: dict
    top_failing: list[dict]
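
# The inner shapes of `trend`, `by_category`, and `top_failing` are whatever
# test_runner_service.get_dashboard_stats() produces for those keys; they are
# passed through to the client without further validation of their contents.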


# API Endpoints
@admin_tests_router.post("/run", response_model=TestRunResponse)
async def run_tests(
    background_tasks: BackgroundTasks,
    request: RunTestsRequest | None = None,
    db: Session = Depends(get_db),
    current_user: UserContext = Depends(get_current_admin_api),
):
    """
    Start a pytest run in the background

    Requires admin authentication. Creates a test run record and starts
    pytest execution in the background. Returns immediately with the run ID.
    Poll GET /runs/{run_id} to check status.
    """
    test_path = request.test_path if request else "tests"
    extra_args = request.extra_args if request else None

    # Create the test run record
    run = test_runner_service.create_test_run(
        db,
        test_path=test_path,
        triggered_by=f"manual:{current_user.username}",
    )
    db.commit()

    # Dispatch via task dispatcher (supports Celery or BackgroundTasks)
    from app.tasks.dispatcher import task_dispatcher

    celery_task_id = task_dispatcher.dispatch_test_run(
        background_tasks=background_tasks,
        run_id=run.id,
        test_path=test_path,
        extra_args=extra_args,
    )

    # Store Celery task ID if using Celery
    if celery_task_id:
        run.celery_task_id = celery_task_id
        db.commit()

    return TestRunResponse(
        id=run.id,
        timestamp=run.timestamp.isoformat(),
        total_tests=run.total_tests,
        passed=run.passed,
        failed=run.failed,
        errors=run.errors,
        skipped=run.skipped,
        xfailed=run.xfailed,
        xpassed=run.xpassed,
        pass_rate=run.pass_rate,
        duration_seconds=run.duration_seconds,
        coverage_percent=run.coverage_percent,
        triggered_by=run.triggered_by,
        git_commit_hash=run.git_commit_hash,
        git_branch=run.git_branch,
        test_path=run.test_path,
        status=run.status,
    )
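
# Illustrative client-side flow for the run-then-poll pattern described in the
# docstring above (not part of this module; base_url, token, and the status
# values checked here are assumptions):
#
#     import time
#     import httpx
#
#     headers = {"Authorization": f"Bearer {token}"}
#     run = httpx.post(f"{base_url}/tests/run", json={"test_path": "tests"}, headers=headers).json()
#     while run["status"] in ("pending", "running"):
#         time.sleep(5)
#         run = httpx.get(f"{base_url}/tests/runs/{run['id']}", headers=headers).json()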
@admin_tests_router.get("/runs", response_model=list[TestRunResponse])
async def list_runs(
limit: int = Query(20, ge=1, le=100, description="Number of runs to return"),
db: Session = Depends(get_db),
current_user: UserContext = Depends(get_current_admin_api),
):
"""
Get test run history
Returns recent test runs for trend analysis.
"""
runs = test_runner_service.get_run_history(db, limit=limit)
return [
TestRunResponse(
id=run.id,
timestamp=run.timestamp.isoformat(),
total_tests=run.total_tests,
passed=run.passed,
failed=run.failed,
errors=run.errors,
skipped=run.skipped,
xfailed=run.xfailed,
xpassed=run.xpassed,
pass_rate=run.pass_rate,
duration_seconds=run.duration_seconds,
coverage_percent=run.coverage_percent,
triggered_by=run.triggered_by,
git_commit_hash=run.git_commit_hash,
git_branch=run.git_branch,
test_path=run.test_path,
status=run.status,
)
for run in runs
]
@admin_tests_router.get("/runs/{run_id}", response_model=TestRunResponse)
async def get_run(
run_id: int,
db: Session = Depends(get_db),
current_user: UserContext = Depends(get_current_admin_api),
):
"""
Get a specific test run
"""
run = test_runner_service.get_run_by_id(db, run_id)
if not run:
from app.exceptions.base import ResourceNotFoundException
raise ResourceNotFoundException("TestRun", str(run_id))
return TestRunResponse(
id=run.id,
timestamp=run.timestamp.isoformat(),
total_tests=run.total_tests,
passed=run.passed,
failed=run.failed,
errors=run.errors,
skipped=run.skipped,
xfailed=run.xfailed,
xpassed=run.xpassed,
pass_rate=run.pass_rate,
duration_seconds=run.duration_seconds,
coverage_percent=run.coverage_percent,
triggered_by=run.triggered_by,
git_commit_hash=run.git_commit_hash,
git_branch=run.git_branch,
test_path=run.test_path,
status=run.status,
)
@admin_tests_router.get("/runs/{run_id}/results", response_model=list[TestResultResponse])
async def get_run_results(
run_id: int,
outcome: str | None = Query(
None, description="Filter by outcome (passed, failed, error, skipped)"
),
db: Session = Depends(get_db),
current_user: UserContext = Depends(get_current_admin_api),
):
"""
Get test results for a specific run
"""
results = test_runner_service.get_run_results(db, run_id, outcome=outcome)
return [
TestResultResponse(
id=r.id,
node_id=r.node_id,
test_name=r.test_name,
test_file=r.test_file,
test_class=r.test_class,
outcome=r.outcome,
duration_seconds=r.duration_seconds,
error_message=r.error_message,
traceback=r.traceback,
)
for r in results
]
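
# Illustrative call (the run id is hypothetical):
#   GET /tests/runs/42/results?outcome=failed  ->  only the failing results for run 42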
@admin_tests_router.get("/runs/{run_id}/failures", response_model=list[TestResultResponse])
async def get_run_failures(
run_id: int,
db: Session = Depends(get_db),
current_user: UserContext = Depends(get_current_admin_api),
):
"""
Get failed tests from a specific run
"""
failures = test_runner_service.get_failed_tests(db, run_id)
return [
TestResultResponse(
id=r.id,
node_id=r.node_id,
test_name=r.test_name,
test_file=r.test_file,
test_class=r.test_class,
outcome=r.outcome,
duration_seconds=r.duration_seconds,
error_message=r.error_message,
traceback=r.traceback,
)
for r in failures
]
@admin_tests_router.get("/stats", response_model=TestDashboardStatsResponse)
async def get_dashboard_stats(
db: Session = Depends(get_db),
current_user: UserContext = Depends(get_current_admin_api),
):
"""
Get dashboard statistics
Returns comprehensive stats for the testing dashboard including:
- Total counts by outcome
- Pass rate
- Trend data
- Tests by category
- Top failing tests
"""
stats = test_runner_service.get_dashboard_stats(db)
return TestDashboardStatsResponse(**stats)
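
# The dict returned by get_dashboard_stats() is splatted straight into the
# response model, so it has to supply every field declared on
# TestDashboardStatsResponse or the expansion above raises a validation error.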
@admin_tests_router.post("/collect")
async def collect_tests(
db: Session = Depends(get_db),
current_user: UserContext = Depends(get_current_admin_api),
):
"""
Collect test information without running tests
Updates the test collection cache with current test counts.
"""
collection = test_runner_service.collect_tests(db)
db.commit()
return {
"total_tests": collection.total_tests,
"total_files": collection.total_files,
"unit_tests": collection.unit_tests,
"integration_tests": collection.integration_tests,
"performance_tests": collection.performance_tests,
"system_tests": collection.system_tests,
"collected_at": collection.collected_at.isoformat(),
}
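
# Unlike the endpoints above, /collect has no declared response_model; its
# response schema is defined only by the literal dict built in collect_tests().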