feat: add background tasks monitoring dashboard
Adds a unified view of all background tasks (imports and test runs) under Platform Monitoring. Includes real-time status polling, statistics overview, and task history. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -27,6 +27,7 @@ from fastapi import APIRouter
|
||||
from . import (
|
||||
audit,
|
||||
auth,
|
||||
background_tasks,
|
||||
code_quality,
|
||||
companies,
|
||||
content_pages,
|
||||
@@ -118,8 +119,10 @@ router.include_router(marketplace.router, tags=["admin-marketplace"])
|
||||
# Platform Administration
|
||||
# ============================================================================
|
||||
|
||||
# Include monitoring endpoints (placeholder for future implementation)
|
||||
# router.include_router(monitoring.router, tags=["admin-monitoring"])
|
||||
# Include background tasks monitoring endpoints
|
||||
router.include_router(
|
||||
background_tasks.router, prefix="/background-tasks", tags=["admin-background-tasks"]
|
||||
)
|
||||
|
||||
# Include audit logging endpoints
|
||||
router.include_router(audit.router, tags=["admin-audit"])
|
||||
|
||||
189
app/api/v1/admin/background_tasks.py
Normal file
189
app/api/v1/admin/background_tasks.py
Normal file
@@ -0,0 +1,189 @@
|
||||
# app/api/v1/admin/background_tasks.py
|
||||
"""
|
||||
Background Tasks Monitoring API
|
||||
Provides unified view of all background tasks across the system
|
||||
"""
|
||||
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from fastapi import APIRouter, Depends, Query
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.api.deps import get_current_admin_api
|
||||
from app.core.database import get_db
|
||||
from app.services.background_tasks_service import background_tasks_service
|
||||
from models.database.user import User
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
class BackgroundTaskResponse(BaseModel):
|
||||
"""Unified background task response"""
|
||||
|
||||
id: int
|
||||
task_type: str # 'import' or 'test_run'
|
||||
status: str
|
||||
started_at: str | None
|
||||
completed_at: str | None
|
||||
duration_seconds: float | None
|
||||
description: str
|
||||
triggered_by: str | None
|
||||
error_message: str | None
|
||||
details: dict | None
|
||||
|
||||
|
||||
class BackgroundTasksStatsResponse(BaseModel):
|
||||
"""Statistics for background tasks"""
|
||||
|
||||
total_tasks: int
|
||||
running: int
|
||||
completed: int
|
||||
failed: int
|
||||
tasks_today: int
|
||||
avg_duration_seconds: float | None
|
||||
|
||||
# By type
|
||||
import_jobs: dict
|
||||
test_runs: dict
|
||||
|
||||
|
||||
def _convert_import_to_response(job) -> BackgroundTaskResponse:
|
||||
"""Convert MarketplaceImportJob to BackgroundTaskResponse"""
|
||||
duration = None
|
||||
if job.started_at and job.completed_at:
|
||||
duration = (job.completed_at - job.started_at).total_seconds()
|
||||
elif job.started_at and job.status == "processing":
|
||||
duration = (datetime.now(UTC) - job.started_at).total_seconds()
|
||||
|
||||
return BackgroundTaskResponse(
|
||||
id=job.id,
|
||||
task_type="import",
|
||||
status=job.status,
|
||||
started_at=job.started_at.isoformat() if job.started_at else None,
|
||||
completed_at=job.completed_at.isoformat() if job.completed_at else None,
|
||||
duration_seconds=duration,
|
||||
description=f"Import from {job.marketplace}: {job.source_url[:50]}..." if len(job.source_url) > 50 else f"Import from {job.marketplace}: {job.source_url}",
|
||||
triggered_by=job.user.username if job.user else None,
|
||||
error_message=job.error_message,
|
||||
details={
|
||||
"marketplace": job.marketplace,
|
||||
"vendor_id": job.vendor_id,
|
||||
"imported": job.imported_count,
|
||||
"updated": job.updated_count,
|
||||
"errors": job.error_count,
|
||||
"total_processed": job.total_processed,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def _convert_test_run_to_response(run) -> BackgroundTaskResponse:
|
||||
"""Convert TestRun to BackgroundTaskResponse"""
|
||||
duration = run.duration_seconds
|
||||
if run.status == "running" and run.timestamp:
|
||||
duration = (datetime.now(UTC) - run.timestamp).total_seconds()
|
||||
|
||||
return BackgroundTaskResponse(
|
||||
id=run.id,
|
||||
task_type="test_run",
|
||||
status=run.status,
|
||||
started_at=run.timestamp.isoformat() if run.timestamp else None,
|
||||
completed_at=None,
|
||||
duration_seconds=duration,
|
||||
description=f"Test run: {run.test_path}",
|
||||
triggered_by=run.triggered_by,
|
||||
error_message=None,
|
||||
details={
|
||||
"test_path": run.test_path,
|
||||
"total_tests": run.total_tests,
|
||||
"passed": run.passed,
|
||||
"failed": run.failed,
|
||||
"errors": run.errors,
|
||||
"pass_rate": run.pass_rate,
|
||||
"git_branch": run.git_branch,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@router.get("/tasks", response_model=list[BackgroundTaskResponse])
|
||||
async def list_background_tasks(
|
||||
status: str | None = Query(None, description="Filter by status"),
|
||||
task_type: str | None = Query(None, description="Filter by type (import, test_run)"),
|
||||
limit: int = Query(50, ge=1, le=200),
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_admin_api),
|
||||
):
|
||||
"""
|
||||
List all background tasks across the system
|
||||
|
||||
Returns a unified view of import jobs and test runs.
|
||||
"""
|
||||
tasks = []
|
||||
|
||||
# Get import jobs
|
||||
if task_type is None or task_type == "import":
|
||||
import_jobs = background_tasks_service.get_import_jobs(db, status=status, limit=limit)
|
||||
tasks.extend([_convert_import_to_response(job) for job in import_jobs])
|
||||
|
||||
# Get test runs
|
||||
if task_type is None or task_type == "test_run":
|
||||
test_runs = background_tasks_service.get_test_runs(db, status=status, limit=limit)
|
||||
tasks.extend([_convert_test_run_to_response(run) for run in test_runs])
|
||||
|
||||
# Sort by start time (most recent first)
|
||||
tasks.sort(
|
||||
key=lambda t: t.started_at or "1970-01-01T00:00:00",
|
||||
reverse=True,
|
||||
)
|
||||
|
||||
return tasks[:limit]
|
||||
|
||||
|
||||
@router.get("/tasks/stats", response_model=BackgroundTasksStatsResponse)
|
||||
async def get_background_tasks_stats(
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_admin_api),
|
||||
):
|
||||
"""
|
||||
Get statistics for background tasks
|
||||
"""
|
||||
import_stats = background_tasks_service.get_import_stats(db)
|
||||
test_stats = background_tasks_service.get_test_run_stats(db)
|
||||
|
||||
# Combined stats
|
||||
total_running = import_stats["running"] + test_stats["running"]
|
||||
total_completed = import_stats["completed"] + test_stats["completed"]
|
||||
total_failed = import_stats["failed"] + test_stats["failed"]
|
||||
total_tasks = import_stats["total"] + test_stats["total"]
|
||||
|
||||
return BackgroundTasksStatsResponse(
|
||||
total_tasks=total_tasks,
|
||||
running=total_running,
|
||||
completed=total_completed,
|
||||
failed=total_failed,
|
||||
tasks_today=import_stats["today"] + test_stats["today"],
|
||||
avg_duration_seconds=test_stats.get("avg_duration"),
|
||||
import_jobs=import_stats,
|
||||
test_runs=test_stats,
|
||||
)
|
||||
|
||||
|
||||
@router.get("/tasks/running", response_model=list[BackgroundTaskResponse])
|
||||
async def list_running_tasks(
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_admin_api),
|
||||
):
|
||||
"""
|
||||
List currently running background tasks
|
||||
"""
|
||||
tasks = []
|
||||
|
||||
# Running imports
|
||||
running_imports = background_tasks_service.get_running_imports(db)
|
||||
tasks.extend([_convert_import_to_response(job) for job in running_imports])
|
||||
|
||||
# Running test runs
|
||||
running_tests = background_tasks_service.get_running_test_runs(db)
|
||||
tasks.extend([_convert_test_run_to_response(run) for run in running_tests])
|
||||
|
||||
return tasks
|
||||
Reference in New Issue
Block a user