Adds a unified view of all background tasks (imports and test runs) under Platform Monitoring. Includes real-time status polling, statistics overview, and task history. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
190 lines
6.1 KiB
Python
190 lines
6.1 KiB
Python
# app/api/v1/admin/background_tasks.py
|
|
"""
|
|
Background Tasks Monitoring API
|
|
Provides unified view of all background tasks across the system
|
|
"""
|
|
|
|
from datetime import UTC, datetime
|
|
|
|
from fastapi import APIRouter, Depends, Query
|
|
from pydantic import BaseModel
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.api.deps import get_current_admin_api
|
|
from app.core.database import get_db
|
|
from app.services.background_tasks_service import background_tasks_service
|
|
from models.database.user import User
|
|
|
|
# Router on which the background-task monitoring endpoints in this module are registered.
router = APIRouter()
|
|
|
|
|
|
class BackgroundTaskResponse(BaseModel):
    """Common response shape for any background task.

    Import jobs and test runs are both mapped onto this model so that
    clients can render them in a single list.
    """

    # Identifier of the underlying job / run record.
    id: int
    # Kind of task: 'import' or 'test_run'.
    task_type: str
    # Status value copied from the underlying record.
    status: str
    # Start time as an ISO-8601 string, or None when not started.
    started_at: str | None
    # Completion time as an ISO-8601 string, or None when unavailable.
    completed_at: str | None
    # Elapsed time in seconds, or None when it cannot be determined.
    duration_seconds: float | None
    # Short human-readable summary of the task.
    description: str
    # Who triggered the task, when known.
    triggered_by: str | None
    # Error text reported by the task, if any.
    error_message: str | None
    # Task-type specific extra information (counters, paths, ...).
    details: dict | None
|
|
|
|
|
|
class BackgroundTasksStatsResponse(BaseModel):
    """Aggregated statistics over all background tasks."""

    # --- Combined counters across every task type ---
    total_tasks: int
    running: int
    completed: int
    failed: int
    tasks_today: int
    # Average task duration in seconds; None when no value is available.
    avg_duration_seconds: float | None

    # --- Raw per-type statistics ---
    # Statistics dictionary for import jobs.
    import_jobs: dict
    # Statistics dictionary for test runs.
    test_runs: dict
|
|
|
|
|
|
def _convert_import_to_response(job) -> BackgroundTaskResponse:
    """Convert a MarketplaceImportJob to a BackgroundTaskResponse.

    Args:
        job: Import job record. The attributes read here are ``id``,
            ``status``, ``started_at``, ``completed_at``, ``marketplace``,
            ``source_url``, ``user``, ``error_message``, ``vendor_id`` and
            the ``imported_count`` / ``updated_count`` / ``error_count`` /
            ``total_processed`` counters.

    Returns:
        The unified task representation with ``task_type="import"``.
        ``duration_seconds`` is the start-to-completion time for finished
        jobs, the time elapsed so far for jobs whose status is
        ``"processing"``, and ``None`` otherwise.
    """
    started_at = job.started_at
    completed_at = job.completed_at

    duration = None
    if started_at and completed_at:
        duration = (completed_at - started_at).total_seconds()
    elif started_at and job.status == "processing":
        # Subtracting a naive datetime from an aware one raises TypeError.
        # A naive stored timestamp is treated as UTC so that a running job
        # cannot break the whole listing.
        reference = started_at
        if reference.utcoffset() is None:
            reference = reference.replace(tzinfo=UTC)
        duration = (datetime.now(UTC) - reference).total_seconds()

    # Guard against a missing source URL; len(None) would raise TypeError.
    source_url = job.source_url or ""
    if len(source_url) > 50:
        # Keep the description compact by truncating long URLs.
        description = f"Import from {job.marketplace}: {source_url[:50]}..."
    else:
        description = f"Import from {job.marketplace}: {source_url}"

    return BackgroundTaskResponse(
        id=job.id,
        task_type="import",
        status=job.status,
        started_at=started_at.isoformat() if started_at else None,
        completed_at=completed_at.isoformat() if completed_at else None,
        duration_seconds=duration,
        description=description,
        triggered_by=job.user.username if job.user else None,
        error_message=job.error_message,
        details={
            "marketplace": job.marketplace,
            "vendor_id": job.vendor_id,
            "imported": job.imported_count,
            "updated": job.updated_count,
            "errors": job.error_count,
            "total_processed": job.total_processed,
        },
    )
|
|
|
|
|
|
def _convert_test_run_to_response(run) -> BackgroundTaskResponse:
    """Convert a TestRun to a BackgroundTaskResponse.

    Args:
        run: Test run record. The attributes read here are ``id``,
            ``status``, ``timestamp``, ``duration_seconds``, ``test_path``,
            ``triggered_by``, ``total_tests``, ``passed``, ``failed``,
            ``errors``, ``pass_rate`` and ``git_branch``.

    Returns:
        The unified task representation with ``task_type="test_run"``.
        ``duration_seconds`` is the run's recorded duration, except for
        runs whose status is ``"running"``, where it is the time elapsed
        since ``timestamp``. ``completed_at`` and ``error_message`` are
        always ``None`` for test runs.
    """
    started_at = run.timestamp

    duration = run.duration_seconds
    if run.status == "running" and started_at:
        # Subtracting a naive datetime from an aware one raises TypeError.
        # A naive stored timestamp is treated as UTC so that a running test
        # run cannot break the whole listing.
        reference = started_at
        if reference.utcoffset() is None:
            reference = reference.replace(tzinfo=UTC)
        duration = (datetime.now(UTC) - reference).total_seconds()

    return BackgroundTaskResponse(
        id=run.id,
        task_type="test_run",
        status=run.status,
        started_at=started_at.isoformat() if started_at else None,
        completed_at=None,
        duration_seconds=duration,
        description=f"Test run: {run.test_path}",
        triggered_by=run.triggered_by,
        error_message=None,
        details={
            "test_path": run.test_path,
            "total_tests": run.total_tests,
            "passed": run.passed,
            "failed": run.failed,
            "errors": run.errors,
            "pass_rate": run.pass_rate,
            "git_branch": run.git_branch,
        },
    )
|
|
|
|
|
|
@router.get("/tasks", response_model=list[BackgroundTaskResponse])
async def list_background_tasks(
    status: str | None = Query(None, description="Filter by status"),
    task_type: str | None = Query(None, description="Filter by type (import, test_run)"),
    limit: int = Query(50, ge=1, le=200),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_api),
):
    """
    List all background tasks across the system

    Returns a unified view of import jobs and test runs, most recently
    started first. Each source is queried with the same ``status`` filter
    and ``limit``; the merged list is then cut down to ``limit`` entries.
    """
    wants_imports = task_type in (None, "import")
    wants_test_runs = task_type in (None, "test_run")

    collected = []

    if wants_imports:
        for job in background_tasks_service.get_import_jobs(db, status=status, limit=limit):
            collected.append(_convert_import_to_response(job))

    if wants_test_runs:
        for run in background_tasks_service.get_test_runs(db, status=status, limit=limit):
            collected.append(_convert_test_run_to_response(run))

    # Tasks without a start time fall back to the epoch and therefore sort last.
    newest_first = sorted(
        collected,
        key=lambda task: task.started_at or "1970-01-01T00:00:00",
        reverse=True,
    )

    return newest_first[:limit]
|
|
|
|
|
|
@router.get("/tasks/stats", response_model=BackgroundTasksStatsResponse)
async def get_background_tasks_stats(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_api),
):
    """
    Get statistics for background tasks

    Adds up the import-job and test-run counters and returns them together
    with the raw per-type statistics.
    """
    import_stats = background_tasks_service.get_import_stats(db)
    test_stats = background_tasks_service.get_test_run_stats(db)

    # Sum the counters that both statistics dictionaries provide.
    combined = {
        key: import_stats[key] + test_stats[key]
        for key in ("running", "completed", "failed", "total", "today")
    }

    return BackgroundTasksStatsResponse(
        total_tasks=combined["total"],
        running=combined["running"],
        completed=combined["completed"],
        failed=combined["failed"],
        tasks_today=combined["today"],
        # The average duration is taken from the test-run statistics only.
        avg_duration_seconds=test_stats.get("avg_duration"),
        import_jobs=import_stats,
        test_runs=test_stats,
    )
|
|
|
|
|
|
@router.get("/tasks/running", response_model=list[BackgroundTaskResponse])
async def list_running_tasks(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_api),
):
    """
    List currently running background tasks

    Running imports come first in the result, followed by running test runs.
    """
    import_tasks = [
        _convert_import_to_response(job)
        for job in background_tasks_service.get_running_imports(db)
    ]
    test_run_tasks = [
        _convert_test_run_to_response(run)
        for run in background_tasks_service.get_running_test_runs(db)
    ]

    return import_tasks + test_run_tasks
|