# app/modules/monitoring/services/background_tasks_service.py """ Background Tasks Service Service for monitoring background tasks across the system """ from __future__ import annotations from datetime import UTC, datetime from typing import TYPE_CHECKING from sqlalchemy import case, desc, func from sqlalchemy.orm import Session if TYPE_CHECKING: from app.modules.dev_tools.models import ArchitectureScan, TestRun class BackgroundTasksService: """Service for monitoring background tasks""" def get_import_jobs( self, db: Session, status: str | None = None, limit: int = 50 ) -> list: """Get import jobs with optional status filter""" from app.modules.marketplace.services.marketplace_import_job_service import ( marketplace_import_job_service, ) jobs, _ = marketplace_import_job_service.get_all_import_jobs_paginated( db, status=status, limit=limit, ) return jobs def get_test_runs( self, db: Session, status: str | None = None, limit: int = 50 ) -> list[TestRun]: """Get test runs with optional status filter""" from app.modules.dev_tools.models import TestRun as TestRunModel query = db.query(TestRunModel) if status: query = query.filter(TestRunModel.status == status) return query.order_by(desc(TestRunModel.timestamp)).limit(limit).all() def get_running_imports(self, db: Session) -> list: """Get currently running import jobs""" from app.modules.marketplace.services.marketplace_import_job_service import ( marketplace_import_job_service, ) jobs, _ = marketplace_import_job_service.get_all_import_jobs_paginated( db, status="processing", limit=100, ) return jobs def get_running_test_runs(self, db: Session) -> list[TestRun]: """Get currently running test runs""" from app.modules.dev_tools.models import TestRun as TestRunModel # SVC-005 - Platform-level, TestRuns not store-scoped return db.query(TestRunModel).filter(TestRunModel.status == "running").all() # SVC-005 def get_import_stats(self, db: Session) -> dict: """Get import job statistics""" from app.modules.marketplace.services.marketplace_import_job_service import ( marketplace_import_job_service, ) stats = marketplace_import_job_service.get_import_job_stats(db) return { "total": stats.get("total", 0), "running": stats.get("processing", 0), "completed": stats.get("completed", 0), "failed": stats.get("failed", 0), "today": stats.get("today", 0), } def get_test_run_stats(self, db: Session) -> dict: """Get test run statistics""" from app.modules.dev_tools.models import TestRun as TestRunModel today_start = datetime.now(UTC).replace( hour=0, minute=0, second=0, microsecond=0 ) stats = db.query( func.count(TestRunModel.id).label("total"), func.sum(case((TestRunModel.status == "running", 1), else_=0)).label( "running" ), func.sum(case((TestRunModel.status == "passed", 1), else_=0)).label( "completed" ), func.sum( case((TestRunModel.status.in_(["failed", "error"]), 1), else_=0) ).label("failed"), func.avg(TestRunModel.duration_seconds).label("avg_duration"), ).first() today_count = ( db.query(func.count(TestRunModel.id)) .filter(TestRunModel.timestamp >= today_start) .scalar() or 0 ) return { "total": stats.total or 0, "running": stats.running or 0, "completed": stats.completed or 0, "failed": stats.failed or 0, "today": today_count, "avg_duration": round(stats.avg_duration or 0, 1), } def get_code_quality_scans( self, db: Session, status: str | None = None, limit: int = 50 ) -> list[ArchitectureScan]: """Get code quality scans with optional status filter""" from app.modules.dev_tools.models import ArchitectureScan as ScanModel query = db.query(ScanModel) if status: query = query.filter(ScanModel.status == status) return query.order_by(desc(ScanModel.timestamp)).limit(limit).all() def get_running_scans(self, db: Session) -> list[ArchitectureScan]: """Get currently running code quality scans""" from app.modules.dev_tools.models import ArchitectureScan as ScanModel return ( db.query(ScanModel) .filter(ScanModel.status.in_(["pending", "running"])) .all() ) def get_scan_stats(self, db: Session) -> dict: """Get code quality scan statistics""" from app.modules.dev_tools.models import ArchitectureScan as ScanModel today_start = datetime.now(UTC).replace( hour=0, minute=0, second=0, microsecond=0 ) stats = db.query( func.count(ScanModel.id).label("total"), func.sum( case( (ScanModel.status.in_(["pending", "running"]), 1), else_=0 ) ).label("running"), func.sum( case( ( ScanModel.status.in_( ["completed", "completed_with_warnings"] ), 1, ), else_=0, ) ).label("completed"), func.sum( case((ScanModel.status == "failed", 1), else_=0) ).label("failed"), func.avg(ScanModel.duration_seconds).label("avg_duration"), ).first() today_count = ( db.query(func.count(ScanModel.id)) .filter(ScanModel.timestamp >= today_start) .scalar() or 0 ) return { "total": stats.total or 0, "running": stats.running or 0, "completed": stats.completed or 0, "failed": stats.failed or 0, "today": today_count, "avg_duration": round(stats.avg_duration or 0, 1), } # Singleton instance background_tasks_service = BackgroundTasksService()