diff --git a/app/api/v1/admin/__init__.py b/app/api/v1/admin/__init__.py index cd085c1b..6cc3455f 100644 --- a/app/api/v1/admin/__init__.py +++ b/app/api/v1/admin/__init__.py @@ -27,6 +27,7 @@ from fastapi import APIRouter from . import ( audit, auth, + background_tasks, code_quality, companies, content_pages, @@ -118,8 +119,10 @@ router.include_router(marketplace.router, tags=["admin-marketplace"]) # Platform Administration # ============================================================================ -# Include monitoring endpoints (placeholder for future implementation) -# router.include_router(monitoring.router, tags=["admin-monitoring"]) +# Include background tasks monitoring endpoints +router.include_router( + background_tasks.router, prefix="/background-tasks", tags=["admin-background-tasks"] +) # Include audit logging endpoints router.include_router(audit.router, tags=["admin-audit"]) diff --git a/app/api/v1/admin/background_tasks.py b/app/api/v1/admin/background_tasks.py new file mode 100644 index 00000000..19efa712 --- /dev/null +++ b/app/api/v1/admin/background_tasks.py @@ -0,0 +1,189 @@ +# app/api/v1/admin/background_tasks.py +""" +Background Tasks Monitoring API +Provides unified view of all background tasks across the system +""" + +from datetime import UTC, datetime + +from fastapi import APIRouter, Depends, Query +from pydantic import BaseModel +from sqlalchemy.orm import Session + +from app.api.deps import get_current_admin_api +from app.core.database import get_db +from app.services.background_tasks_service import background_tasks_service +from models.database.user import User + +router = APIRouter() + + +class BackgroundTaskResponse(BaseModel): + """Unified background task response""" + + id: int + task_type: str # 'import' or 'test_run' + status: str + started_at: str | None + completed_at: str | None + duration_seconds: float | None + description: str + triggered_by: str | None + error_message: str | None + details: dict | None + + +class BackgroundTasksStatsResponse(BaseModel): + """Statistics for background tasks""" + + total_tasks: int + running: int + completed: int + failed: int + tasks_today: int + avg_duration_seconds: float | None + + # By type + import_jobs: dict + test_runs: dict + + +def _convert_import_to_response(job) -> BackgroundTaskResponse: + """Convert MarketplaceImportJob to BackgroundTaskResponse""" + duration = None + if job.started_at and job.completed_at: + duration = (job.completed_at - job.started_at).total_seconds() + elif job.started_at and job.status == "processing": + duration = (datetime.now(UTC) - job.started_at).total_seconds() + + return BackgroundTaskResponse( + id=job.id, + task_type="import", + status=job.status, + started_at=job.started_at.isoformat() if job.started_at else None, + completed_at=job.completed_at.isoformat() if job.completed_at else None, + duration_seconds=duration, + description=f"Import from {job.marketplace}: {job.source_url[:50]}..." if len(job.source_url) > 50 else f"Import from {job.marketplace}: {job.source_url}", + triggered_by=job.user.username if job.user else None, + error_message=job.error_message, + details={ + "marketplace": job.marketplace, + "vendor_id": job.vendor_id, + "imported": job.imported_count, + "updated": job.updated_count, + "errors": job.error_count, + "total_processed": job.total_processed, + }, + ) + + +def _convert_test_run_to_response(run) -> BackgroundTaskResponse: + """Convert TestRun to BackgroundTaskResponse""" + duration = run.duration_seconds + if run.status == "running" and run.timestamp: + duration = (datetime.now(UTC) - run.timestamp).total_seconds() + + return BackgroundTaskResponse( + id=run.id, + task_type="test_run", + status=run.status, + started_at=run.timestamp.isoformat() if run.timestamp else None, + completed_at=None, + duration_seconds=duration, + description=f"Test run: {run.test_path}", + triggered_by=run.triggered_by, + error_message=None, + details={ + "test_path": run.test_path, + "total_tests": run.total_tests, + "passed": run.passed, + "failed": run.failed, + "errors": run.errors, + "pass_rate": run.pass_rate, + "git_branch": run.git_branch, + }, + ) + + +@router.get("/tasks", response_model=list[BackgroundTaskResponse]) +async def list_background_tasks( + status: str | None = Query(None, description="Filter by status"), + task_type: str | None = Query(None, description="Filter by type (import, test_run)"), + limit: int = Query(50, ge=1, le=200), + db: Session = Depends(get_db), + current_user: User = Depends(get_current_admin_api), +): + """ + List all background tasks across the system + + Returns a unified view of import jobs and test runs. + """ + tasks = [] + + # Get import jobs + if task_type is None or task_type == "import": + import_jobs = background_tasks_service.get_import_jobs(db, status=status, limit=limit) + tasks.extend([_convert_import_to_response(job) for job in import_jobs]) + + # Get test runs + if task_type is None or task_type == "test_run": + test_runs = background_tasks_service.get_test_runs(db, status=status, limit=limit) + tasks.extend([_convert_test_run_to_response(run) for run in test_runs]) + + # Sort by start time (most recent first) + tasks.sort( + key=lambda t: t.started_at or "1970-01-01T00:00:00", + reverse=True, + ) + + return tasks[:limit] + + +@router.get("/tasks/stats", response_model=BackgroundTasksStatsResponse) +async def get_background_tasks_stats( + db: Session = Depends(get_db), + current_user: User = Depends(get_current_admin_api), +): + """ + Get statistics for background tasks + """ + import_stats = background_tasks_service.get_import_stats(db) + test_stats = background_tasks_service.get_test_run_stats(db) + + # Combined stats + total_running = import_stats["running"] + test_stats["running"] + total_completed = import_stats["completed"] + test_stats["completed"] + total_failed = import_stats["failed"] + test_stats["failed"] + total_tasks = import_stats["total"] + test_stats["total"] + + return BackgroundTasksStatsResponse( + total_tasks=total_tasks, + running=total_running, + completed=total_completed, + failed=total_failed, + tasks_today=import_stats["today"] + test_stats["today"], + avg_duration_seconds=test_stats.get("avg_duration"), + import_jobs=import_stats, + test_runs=test_stats, + ) + + +@router.get("/tasks/running", response_model=list[BackgroundTaskResponse]) +async def list_running_tasks( + db: Session = Depends(get_db), + current_user: User = Depends(get_current_admin_api), +): + """ + List currently running background tasks + """ + tasks = [] + + # Running imports + running_imports = background_tasks_service.get_running_imports(db) + tasks.extend([_convert_import_to_response(job) for job in running_imports]) + + # Running test runs + running_tests = background_tasks_service.get_running_test_runs(db) + tasks.extend([_convert_test_run_to_response(run) for run in running_tests]) + + return tasks diff --git a/app/routes/admin_pages.py b/app/routes/admin_pages.py index 88e19fcd..75f6a77a 100644 --- a/app/routes/admin_pages.py +++ b/app/routes/admin_pages.py @@ -500,6 +500,25 @@ async def admin_imports_page( ) +@router.get("/background-tasks", response_class=HTMLResponse, include_in_schema=False) +async def admin_background_tasks_page( + request: Request, + current_user: User = Depends(get_current_admin_from_cookie_or_header), + db: Session = Depends(get_db), +): + """ + Render background tasks monitoring page. + Shows running and completed background tasks across the system. + """ + return templates.TemplateResponse( + "admin/background-tasks.html", + { + "request": request, + "user": current_user, + }, + ) + + @router.get("/marketplace", response_class=HTMLResponse, include_in_schema=False) async def admin_marketplace_page( request: Request, diff --git a/app/services/background_tasks_service.py b/app/services/background_tasks_service.py new file mode 100644 index 00000000..c4f77f88 --- /dev/null +++ b/app/services/background_tasks_service.py @@ -0,0 +1,116 @@ +# app/services/background_tasks_service.py +""" +Background Tasks Service +Service for monitoring background tasks across the system +""" + +from datetime import UTC, datetime + +from sqlalchemy import desc, func +from sqlalchemy.orm import Session + +from models.database.marketplace_import_job import MarketplaceImportJob +from models.database.test_run import TestRun + + +class BackgroundTasksService: + """Service for monitoring background tasks""" + + def get_import_jobs( + self, db: Session, status: str | None = None, limit: int = 50 + ) -> list[MarketplaceImportJob]: + """Get import jobs with optional status filter""" + query = db.query(MarketplaceImportJob) + if status: + query = query.filter(MarketplaceImportJob.status == status) + return query.order_by(desc(MarketplaceImportJob.created_at)).limit(limit).all() + + def get_test_runs( + self, db: Session, status: str | None = None, limit: int = 50 + ) -> list[TestRun]: + """Get test runs with optional status filter""" + query = db.query(TestRun) + if status: + query = query.filter(TestRun.status == status) + return query.order_by(desc(TestRun.timestamp)).limit(limit).all() + + def get_running_imports(self, db: Session) -> list[MarketplaceImportJob]: + """Get currently running import jobs""" + return ( + db.query(MarketplaceImportJob) + .filter(MarketplaceImportJob.status == "processing") + .all() + ) + + def get_running_test_runs(self, db: Session) -> list[TestRun]: + """Get currently running test runs""" + return db.query(TestRun).filter(TestRun.status == "running").all() + + def get_import_stats(self, db: Session) -> dict: + """Get import job statistics""" + today_start = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0) + + stats = db.query( + func.count(MarketplaceImportJob.id).label("total"), + func.sum( + func.case((MarketplaceImportJob.status == "processing", 1), else_=0) + ).label("running"), + func.sum( + func.case( + (MarketplaceImportJob.status.in_(["completed", "completed_with_errors"]), 1), + else_=0, + ) + ).label("completed"), + func.sum( + func.case((MarketplaceImportJob.status == "failed", 1), else_=0) + ).label("failed"), + ).first() + + today_count = ( + db.query(func.count(MarketplaceImportJob.id)) + .filter(MarketplaceImportJob.created_at >= today_start) + .scalar() + or 0 + ) + + return { + "total": stats.total or 0, + "running": stats.running or 0, + "completed": stats.completed or 0, + "failed": stats.failed or 0, + "today": today_count, + } + + def get_test_run_stats(self, db: Session) -> dict: + """Get test run statistics""" + today_start = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0) + + stats = db.query( + func.count(TestRun.id).label("total"), + func.sum(func.case((TestRun.status == "running", 1), else_=0)).label("running"), + func.sum(func.case((TestRun.status == "passed", 1), else_=0)).label("completed"), + func.sum( + func.case((TestRun.status.in_(["failed", "error"]), 1), else_=0) + ).label("failed"), + func.avg(TestRun.duration_seconds).label("avg_duration"), + ).first() + + today_count = ( + db.query(func.count(TestRun.id)) + .filter(TestRun.timestamp >= today_start) + .scalar() + or 0 + ) + + return { + "total": stats.total or 0, + "running": stats.running or 0, + "completed": stats.completed or 0, + "failed": stats.failed or 0, + "today": today_count, + "avg_duration": round(stats.avg_duration or 0, 1), + } + + +# Singleton instance +background_tasks_service = BackgroundTasksService() diff --git a/app/templates/admin/background-tasks.html b/app/templates/admin/background-tasks.html new file mode 100644 index 00000000..2a21d958 --- /dev/null +++ b/app/templates/admin/background-tasks.html @@ -0,0 +1,257 @@ +{# app/templates/admin/background-tasks.html #} +{% extends "admin/base.html" %} +{% from 'shared/macros/alerts.html' import loading_state, error_state %} +{% from 'shared/macros/headers.html' import page_header_flex, refresh_button %} + +{% block title %}Background Tasks{% endblock %} + +{% block alpine_data %}backgroundTasks(){% endblock %} + +{% block extra_scripts %} + +{% endblock %} + +{% block content %} +{% call page_header_flex(title='Background Tasks', subtitle='Monitor running and completed background tasks') %} + {{ refresh_button(variant='secondary') }} +{% endcall %} + +{{ loading_state('Loading tasks...') }} + +{{ error_state('Error loading tasks') }} + + +
Running
+0
+Today
+0
+Failed
+0
+Total
+0
++ Started by + at +
+elapsed
+Total
+Running
+Completed
+Failed
+Total
+Running
+Passed
+Failed
+| Type | +Description | +Started | +Duration | +Triggered By | +Status | +
|---|---|---|---|---|---|
| + + + | ++ + + | ++ | + | + | + + + | +
No tasks found
+