From 280e719a7a1e41e271f6e29b5468efd1f174c45d Mon Sep 17 00:00:00 2001 From: Samir Boulahtit Date: Fri, 12 Dec 2025 23:28:16 +0100 Subject: [PATCH] feat: add background tasks monitoring dashboard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a unified view of all background tasks (imports and test runs) under Platform Monitoring. Includes real-time status polling, statistics overview, and task history. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- app/api/v1/admin/__init__.py | 7 +- app/api/v1/admin/background_tasks.py | 189 ++++++++++++++++ app/routes/admin_pages.py | 19 ++ app/services/background_tasks_service.py | 116 ++++++++++ app/templates/admin/background-tasks.html | 257 ++++++++++++++++++++++ app/templates/admin/partials/sidebar.html | 1 + static/admin/js/background-tasks.js | 128 +++++++++++ 7 files changed, 715 insertions(+), 2 deletions(-) create mode 100644 app/api/v1/admin/background_tasks.py create mode 100644 app/services/background_tasks_service.py create mode 100644 app/templates/admin/background-tasks.html create mode 100644 static/admin/js/background-tasks.js diff --git a/app/api/v1/admin/__init__.py b/app/api/v1/admin/__init__.py index cd085c1b..6cc3455f 100644 --- a/app/api/v1/admin/__init__.py +++ b/app/api/v1/admin/__init__.py @@ -27,6 +27,7 @@ from fastapi import APIRouter from . import ( audit, auth, + background_tasks, code_quality, companies, content_pages, @@ -118,8 +119,10 @@ router.include_router(marketplace.router, tags=["admin-marketplace"]) # Platform Administration # ============================================================================ -# Include monitoring endpoints (placeholder for future implementation) -# router.include_router(monitoring.router, tags=["admin-monitoring"]) +# Include background tasks monitoring endpoints +router.include_router( + background_tasks.router, prefix="/background-tasks", tags=["admin-background-tasks"] +) # Include audit logging endpoints router.include_router(audit.router, tags=["admin-audit"]) diff --git a/app/api/v1/admin/background_tasks.py b/app/api/v1/admin/background_tasks.py new file mode 100644 index 00000000..19efa712 --- /dev/null +++ b/app/api/v1/admin/background_tasks.py @@ -0,0 +1,189 @@ +# app/api/v1/admin/background_tasks.py +""" +Background Tasks Monitoring API +Provides unified view of all background tasks across the system +""" + +from datetime import UTC, datetime + +from fastapi import APIRouter, Depends, Query +from pydantic import BaseModel +from sqlalchemy.orm import Session + +from app.api.deps import get_current_admin_api +from app.core.database import get_db +from app.services.background_tasks_service import background_tasks_service +from models.database.user import User + +router = APIRouter() + + +class BackgroundTaskResponse(BaseModel): + """Unified background task response""" + + id: int + task_type: str # 'import' or 'test_run' + status: str + started_at: str | None + completed_at: str | None + duration_seconds: float | None + description: str + triggered_by: str | None + error_message: str | None + details: dict | None + + +class BackgroundTasksStatsResponse(BaseModel): + """Statistics for background tasks""" + + total_tasks: int + running: int + completed: int + failed: int + tasks_today: int + avg_duration_seconds: float | None + + # By type + import_jobs: dict + test_runs: dict + + +def _convert_import_to_response(job) -> BackgroundTaskResponse: + """Convert MarketplaceImportJob to BackgroundTaskResponse""" + duration = None + if job.started_at and job.completed_at: + duration = (job.completed_at - job.started_at).total_seconds() + elif job.started_at and job.status == "processing": + duration = (datetime.now(UTC) - job.started_at).total_seconds() + + return BackgroundTaskResponse( + id=job.id, + task_type="import", + status=job.status, + started_at=job.started_at.isoformat() if job.started_at else None, + completed_at=job.completed_at.isoformat() if job.completed_at else None, + duration_seconds=duration, + description=f"Import from {job.marketplace}: {job.source_url[:50]}..." if len(job.source_url) > 50 else f"Import from {job.marketplace}: {job.source_url}", + triggered_by=job.user.username if job.user else None, + error_message=job.error_message, + details={ + "marketplace": job.marketplace, + "vendor_id": job.vendor_id, + "imported": job.imported_count, + "updated": job.updated_count, + "errors": job.error_count, + "total_processed": job.total_processed, + }, + ) + + +def _convert_test_run_to_response(run) -> BackgroundTaskResponse: + """Convert TestRun to BackgroundTaskResponse""" + duration = run.duration_seconds + if run.status == "running" and run.timestamp: + duration = (datetime.now(UTC) - run.timestamp).total_seconds() + + return BackgroundTaskResponse( + id=run.id, + task_type="test_run", + status=run.status, + started_at=run.timestamp.isoformat() if run.timestamp else None, + completed_at=None, + duration_seconds=duration, + description=f"Test run: {run.test_path}", + triggered_by=run.triggered_by, + error_message=None, + details={ + "test_path": run.test_path, + "total_tests": run.total_tests, + "passed": run.passed, + "failed": run.failed, + "errors": run.errors, + "pass_rate": run.pass_rate, + "git_branch": run.git_branch, + }, + ) + + +@router.get("/tasks", response_model=list[BackgroundTaskResponse]) +async def list_background_tasks( + status: str | None = Query(None, description="Filter by status"), + task_type: str | None = Query(None, description="Filter by type (import, test_run)"), + limit: int = Query(50, ge=1, le=200), + db: Session = Depends(get_db), + current_user: User = Depends(get_current_admin_api), +): + """ + List all background tasks across the system + + Returns a unified view of import jobs and test runs. + """ + tasks = [] + + # Get import jobs + if task_type is None or task_type == "import": + import_jobs = background_tasks_service.get_import_jobs(db, status=status, limit=limit) + tasks.extend([_convert_import_to_response(job) for job in import_jobs]) + + # Get test runs + if task_type is None or task_type == "test_run": + test_runs = background_tasks_service.get_test_runs(db, status=status, limit=limit) + tasks.extend([_convert_test_run_to_response(run) for run in test_runs]) + + # Sort by start time (most recent first) + tasks.sort( + key=lambda t: t.started_at or "1970-01-01T00:00:00", + reverse=True, + ) + + return tasks[:limit] + + +@router.get("/tasks/stats", response_model=BackgroundTasksStatsResponse) +async def get_background_tasks_stats( + db: Session = Depends(get_db), + current_user: User = Depends(get_current_admin_api), +): + """ + Get statistics for background tasks + """ + import_stats = background_tasks_service.get_import_stats(db) + test_stats = background_tasks_service.get_test_run_stats(db) + + # Combined stats + total_running = import_stats["running"] + test_stats["running"] + total_completed = import_stats["completed"] + test_stats["completed"] + total_failed = import_stats["failed"] + test_stats["failed"] + total_tasks = import_stats["total"] + test_stats["total"] + + return BackgroundTasksStatsResponse( + total_tasks=total_tasks, + running=total_running, + completed=total_completed, + failed=total_failed, + tasks_today=import_stats["today"] + test_stats["today"], + avg_duration_seconds=test_stats.get("avg_duration"), + import_jobs=import_stats, + test_runs=test_stats, + ) + + +@router.get("/tasks/running", response_model=list[BackgroundTaskResponse]) +async def list_running_tasks( + db: Session = Depends(get_db), + current_user: User = Depends(get_current_admin_api), +): + """ + List currently running background tasks + """ + tasks = [] + + # Running imports + running_imports = background_tasks_service.get_running_imports(db) + tasks.extend([_convert_import_to_response(job) for job in running_imports]) + + # Running test runs + running_tests = background_tasks_service.get_running_test_runs(db) + tasks.extend([_convert_test_run_to_response(run) for run in running_tests]) + + return tasks diff --git a/app/routes/admin_pages.py b/app/routes/admin_pages.py index 88e19fcd..75f6a77a 100644 --- a/app/routes/admin_pages.py +++ b/app/routes/admin_pages.py @@ -500,6 +500,25 @@ async def admin_imports_page( ) +@router.get("/background-tasks", response_class=HTMLResponse, include_in_schema=False) +async def admin_background_tasks_page( + request: Request, + current_user: User = Depends(get_current_admin_from_cookie_or_header), + db: Session = Depends(get_db), +): + """ + Render background tasks monitoring page. + Shows running and completed background tasks across the system. + """ + return templates.TemplateResponse( + "admin/background-tasks.html", + { + "request": request, + "user": current_user, + }, + ) + + @router.get("/marketplace", response_class=HTMLResponse, include_in_schema=False) async def admin_marketplace_page( request: Request, diff --git a/app/services/background_tasks_service.py b/app/services/background_tasks_service.py new file mode 100644 index 00000000..c4f77f88 --- /dev/null +++ b/app/services/background_tasks_service.py @@ -0,0 +1,116 @@ +# app/services/background_tasks_service.py +""" +Background Tasks Service +Service for monitoring background tasks across the system +""" + +from datetime import UTC, datetime + +from sqlalchemy import desc, func +from sqlalchemy.orm import Session + +from models.database.marketplace_import_job import MarketplaceImportJob +from models.database.test_run import TestRun + + +class BackgroundTasksService: + """Service for monitoring background tasks""" + + def get_import_jobs( + self, db: Session, status: str | None = None, limit: int = 50 + ) -> list[MarketplaceImportJob]: + """Get import jobs with optional status filter""" + query = db.query(MarketplaceImportJob) + if status: + query = query.filter(MarketplaceImportJob.status == status) + return query.order_by(desc(MarketplaceImportJob.created_at)).limit(limit).all() + + def get_test_runs( + self, db: Session, status: str | None = None, limit: int = 50 + ) -> list[TestRun]: + """Get test runs with optional status filter""" + query = db.query(TestRun) + if status: + query = query.filter(TestRun.status == status) + return query.order_by(desc(TestRun.timestamp)).limit(limit).all() + + def get_running_imports(self, db: Session) -> list[MarketplaceImportJob]: + """Get currently running import jobs""" + return ( + db.query(MarketplaceImportJob) + .filter(MarketplaceImportJob.status == "processing") + .all() + ) + + def get_running_test_runs(self, db: Session) -> list[TestRun]: + """Get currently running test runs""" + return db.query(TestRun).filter(TestRun.status == "running").all() + + def get_import_stats(self, db: Session) -> dict: + """Get import job statistics""" + today_start = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0) + + stats = db.query( + func.count(MarketplaceImportJob.id).label("total"), + func.sum( + func.case((MarketplaceImportJob.status == "processing", 1), else_=0) + ).label("running"), + func.sum( + func.case( + (MarketplaceImportJob.status.in_(["completed", "completed_with_errors"]), 1), + else_=0, + ) + ).label("completed"), + func.sum( + func.case((MarketplaceImportJob.status == "failed", 1), else_=0) + ).label("failed"), + ).first() + + today_count = ( + db.query(func.count(MarketplaceImportJob.id)) + .filter(MarketplaceImportJob.created_at >= today_start) + .scalar() + or 0 + ) + + return { + "total": stats.total or 0, + "running": stats.running or 0, + "completed": stats.completed or 0, + "failed": stats.failed or 0, + "today": today_count, + } + + def get_test_run_stats(self, db: Session) -> dict: + """Get test run statistics""" + today_start = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0) + + stats = db.query( + func.count(TestRun.id).label("total"), + func.sum(func.case((TestRun.status == "running", 1), else_=0)).label("running"), + func.sum(func.case((TestRun.status == "passed", 1), else_=0)).label("completed"), + func.sum( + func.case((TestRun.status.in_(["failed", "error"]), 1), else_=0) + ).label("failed"), + func.avg(TestRun.duration_seconds).label("avg_duration"), + ).first() + + today_count = ( + db.query(func.count(TestRun.id)) + .filter(TestRun.timestamp >= today_start) + .scalar() + or 0 + ) + + return { + "total": stats.total or 0, + "running": stats.running or 0, + "completed": stats.completed or 0, + "failed": stats.failed or 0, + "today": today_count, + "avg_duration": round(stats.avg_duration or 0, 1), + } + + +# Singleton instance +background_tasks_service = BackgroundTasksService() diff --git a/app/templates/admin/background-tasks.html b/app/templates/admin/background-tasks.html new file mode 100644 index 00000000..2a21d958 --- /dev/null +++ b/app/templates/admin/background-tasks.html @@ -0,0 +1,257 @@ +{# app/templates/admin/background-tasks.html #} +{% extends "admin/base.html" %} +{% from 'shared/macros/alerts.html' import loading_state, error_state %} +{% from 'shared/macros/headers.html' import page_header_flex, refresh_button %} + +{% block title %}Background Tasks{% endblock %} + +{% block alpine_data %}backgroundTasks(){% endblock %} + +{% block extra_scripts %} + +{% endblock %} + +{% block content %} +{% call page_header_flex(title='Background Tasks', subtitle='Monitor running and completed background tasks') %} + {{ refresh_button(variant='secondary') }} +{% endcall %} + +{{ loading_state('Loading tasks...') }} + +{{ error_state('Error loading tasks') }} + + +
+ +
+ +
+
+ +
+
+

Running

+

0

+
+
+ + +
+
+ +
+
+

Today

+

0

+
+
+ + +
+
+ +
+
+

Failed

+

0

+
+
+ + +
+
+ +
+
+

Total

+

0

+
+
+
+ + +
+
+

+ + Currently Running +

+
+ +
+
+
+ + +
+ +
+

+ + Import Jobs +

+
+
+

+

Total

+
+
+

+

Running

+
+
+

+

Completed

+
+
+

+

Failed

+
+
+ +
+ + +
+

+ + Test Runs +

+
+
+

+

Total

+
+
+

+

Running

+
+
+

+

Passed

+
+
+

+

Failed

+
+
+ +
+
+ + +
+
+ + + +
+
+ + +
+

+ Recent Tasks +

+
+ + + + + + + + + + + + + + +
TypeDescriptionStartedDurationTriggered ByStatus
+
+ +
+
+{% endblock %} diff --git a/app/templates/admin/partials/sidebar.html b/app/templates/admin/partials/sidebar.html index 561926cf..81ce0039 100644 --- a/app/templates/admin/partials/sidebar.html +++ b/app/templates/admin/partials/sidebar.html @@ -103,6 +103,7 @@ {{ section_header('Platform Monitoring', 'monitoring') }} {% call section_content('monitoring') %} + {{ menu_item('background-tasks', '/admin/background-tasks', 'collection', 'Background Tasks') }} {{ menu_item('imports', '/admin/imports', 'cube', 'Import Jobs') }} {{ menu_item('logs', '/admin/logs', 'document-text', 'Application Logs') }} {% endcall %} diff --git a/static/admin/js/background-tasks.js b/static/admin/js/background-tasks.js new file mode 100644 index 00000000..bc7fd04a --- /dev/null +++ b/static/admin/js/background-tasks.js @@ -0,0 +1,128 @@ +/** + * Background Tasks Monitoring Component + * Manages the background tasks monitoring page + */ + +// Use centralized logger +const backgroundTasksLog = window.LogConfig.createLogger('BACKGROUND-TASKS'); + +function backgroundTasks() { + return { + // Extend base data + ...data(), + + // Set current page for navigation + currentPage: 'background-tasks', + + // Page-specific data + loading: false, + error: null, + filterType: null, + pollInterval: null, + + // Statistics + stats: { + total_tasks: 0, + running: 0, + completed: 0, + failed: 0, + tasks_today: 0, + avg_duration_seconds: null, + import_jobs: {}, + test_runs: {} + }, + + // Tasks + tasks: [], + runningTasks: [], + + async init() { + backgroundTasksLog.info('Initializing background tasks monitor'); + await this.loadStats(); + await this.loadTasks(); + await this.loadRunningTasks(); + + // Poll for updates every 5 seconds + this.pollInterval = setInterval(() => { + this.loadRunningTasks(); + if (this.runningTasks.length > 0) { + this.loadStats(); + } + }, 5000); + }, + + destroy() { + if (this.pollInterval) { + clearInterval(this.pollInterval); + } + }, + + async loadStats() { + try { + const stats = await apiClient.get('/admin/background-tasks/tasks/stats'); + this.stats = stats; + backgroundTasksLog.info('Stats loaded:', stats); + } catch (err) { + backgroundTasksLog.error('Failed to load stats:', err); + } + }, + + async loadTasks() { + this.loading = true; + this.error = null; + + try { + let url = '/admin/background-tasks/tasks?limit=50'; + if (this.filterType) { + url += `&task_type=${this.filterType}`; + } + + const tasks = await apiClient.get(url); + this.tasks = tasks; + backgroundTasksLog.info('Tasks loaded:', tasks.length); + } catch (err) { + backgroundTasksLog.error('Failed to load tasks:', err); + this.error = err.message; + + if (err.message.includes('Unauthorized')) { + window.location.href = '/admin/login'; + } + } finally { + this.loading = false; + } + }, + + async loadRunningTasks() { + try { + const running = await apiClient.get('/admin/background-tasks/tasks/running'); + this.runningTasks = running; + + // Update elapsed time for running tasks + const now = new Date(); + this.runningTasks.forEach(task => { + if (task.started_at) { + const started = new Date(task.started_at); + task.duration_seconds = (now - started) / 1000; + } + }); + } catch (err) { + backgroundTasksLog.error('Failed to load running tasks:', err); + } + }, + + async refresh() { + await this.loadStats(); + await this.loadTasks(); + await this.loadRunningTasks(); + }, + + formatDuration(seconds) { + if (seconds === null || seconds === undefined) return 'N/A'; + if (seconds < 1) return `${Math.round(seconds * 1000)}ms`; + if (seconds < 60) return `${Math.round(seconds)}s`; + const minutes = Math.floor(seconds / 60); + const secs = Math.round(seconds % 60); + return `${minutes}m ${secs}s`; + } + }; +}