# app/modules/monitoring/services/background_tasks_service.py
"""
Background Tasks Service

Read-only service for monitoring background tasks across the system:
marketplace import jobs, test runs and code quality (architecture) scans.
"""

from datetime import UTC, datetime

from sqlalchemy import case, desc, func
from sqlalchemy.orm import Session

from app.modules.dev_tools.models import ArchitectureScan, TestRun
from app.modules.marketplace.models import MarketplaceImportJob


def _start_of_today_utc() -> datetime:
    """Return a timezone-aware datetime for 00:00:00 of the current UTC day."""
    return datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0)


def _count_when(condition, label: str):
    """Build a labelled ``SUM(CASE WHEN condition THEN 1 ELSE 0 END)`` column."""
    return func.sum(case((condition, 1), else_=0)).label(label)


def _as_int(value) -> int:
    """Coerce an aggregate result to ``int``, mapping ``None`` to 0.

    SUM yields NULL when no rows exist, and depending on the database
    backend the value may arrive as ``Decimal`` rather than ``int``.
    """
    return int(value or 0)


def _rounded_avg(value) -> float:
    """Coerce an AVG result to ``float`` rounded to one decimal (``None`` -> 0.0)."""
    return round(float(value or 0), 1)


def _count_since(db: Session, id_column, time_column, since: datetime) -> int:
    """Count rows whose ``time_column`` is at or after ``since``."""
    return _as_int(
        db.query(func.count(id_column)).filter(time_column >= since).scalar()
    )


class BackgroundTasksService:
    """Service for monitoring background tasks.

    Every method takes the SQLAlchemy session to query with; the methods
    only issue SELECT queries through it.
    """

    # ------------------------------------------------------------------
    # Marketplace import jobs
    # ------------------------------------------------------------------

    def get_import_jobs(
        self, db: Session, status: str | None = None, limit: int = 50
    ) -> list[MarketplaceImportJob]:
        """Get import jobs, newest first, with optional status filter.

        Args:
            db: Session used to run the query.
            status: When truthy, only jobs with exactly this status are returned.
            limit: Maximum number of rows to return.
        """
        query = db.query(MarketplaceImportJob)
        if status:
            query = query.filter(MarketplaceImportJob.status == status)
        return query.order_by(desc(MarketplaceImportJob.created_at)).limit(limit).all()

    def get_running_imports(self, db: Session) -> list[MarketplaceImportJob]:
        """Get currently running import jobs (status ``"processing"``)."""
        return (
            db.query(MarketplaceImportJob)
            .filter(MarketplaceImportJob.status == "processing")
            .all()
        )

    def get_import_stats(self, db: Session) -> dict[str, int]:
        """Get import job statistics.

        Returns:
            Dict with integer counts under ``total``, ``running``
            (processing), ``completed`` (completed or completed_with_errors),
            ``failed`` and ``today`` (created since midnight UTC).
        """
        stats = db.query(
            func.count(MarketplaceImportJob.id).label("total"),
            _count_when(MarketplaceImportJob.status == "processing", "running"),
            _count_when(
                MarketplaceImportJob.status.in_(
                    ["completed", "completed_with_errors"]
                ),
                "completed",
            ),
            _count_when(MarketplaceImportJob.status == "failed", "failed"),
        ).first()

        today_count = _count_since(
            db,
            MarketplaceImportJob.id,
            MarketplaceImportJob.created_at,
            _start_of_today_utc(),
        )

        return {
            "total": _as_int(stats.total),
            "running": _as_int(stats.running),
            "completed": _as_int(stats.completed),
            "failed": _as_int(stats.failed),
            "today": today_count,
        }

    # ------------------------------------------------------------------
    # Test runs
    # ------------------------------------------------------------------

    def get_test_runs(
        self, db: Session, status: str | None = None, limit: int = 50
    ) -> list[TestRun]:
        """Get test runs, newest first, with optional status filter.

        Args:
            db: Session used to run the query.
            status: When truthy, only runs with exactly this status are returned.
            limit: Maximum number of rows to return.
        """
        query = db.query(TestRun)
        if status:
            query = query.filter(TestRun.status == status)
        return query.order_by(desc(TestRun.timestamp)).limit(limit).all()

    def get_running_test_runs(self, db: Session) -> list[TestRun]:
        """Get currently running test runs (status ``"running"``)."""
        # noqa: SVC-005 - Platform-level, TestRuns not store-scoped
        return db.query(TestRun).filter(TestRun.status == "running").all()

    def get_test_run_stats(self, db: Session) -> dict[str, int | float]:
        """Get test run statistics.

        Returns:
            Dict with integer counts under ``total``, ``running``,
            ``completed`` (status passed), ``failed`` (failed or error) and
            ``today`` (timestamp since midnight UTC), plus ``avg_duration``:
            the mean of ``duration_seconds`` rounded to one decimal.
        """
        stats = db.query(
            func.count(TestRun.id).label("total"),
            _count_when(TestRun.status == "running", "running"),
            _count_when(TestRun.status == "passed", "completed"),
            _count_when(TestRun.status.in_(["failed", "error"]), "failed"),
            func.avg(TestRun.duration_seconds).label("avg_duration"),
        ).first()

        today_count = _count_since(
            db, TestRun.id, TestRun.timestamp, _start_of_today_utc()
        )

        return {
            "total": _as_int(stats.total),
            "running": _as_int(stats.running),
            "completed": _as_int(stats.completed),
            "failed": _as_int(stats.failed),
            "today": today_count,
            "avg_duration": _rounded_avg(stats.avg_duration),
        }

    # ------------------------------------------------------------------
    # Code quality scans
    # ------------------------------------------------------------------

    def get_code_quality_scans(
        self, db: Session, status: str | None = None, limit: int = 50
    ) -> list[ArchitectureScan]:
        """Get code quality scans, newest first, with optional status filter.

        Args:
            db: Session used to run the query.
            status: When truthy, only scans with exactly this status are returned.
            limit: Maximum number of rows to return.
        """
        query = db.query(ArchitectureScan)
        if status:
            query = query.filter(ArchitectureScan.status == status)
        return query.order_by(desc(ArchitectureScan.timestamp)).limit(limit).all()

    def get_running_scans(self, db: Session) -> list[ArchitectureScan]:
        """Get code quality scans that are pending or running."""
        return (
            db.query(ArchitectureScan)
            .filter(ArchitectureScan.status.in_(["pending", "running"]))
            .all()
        )

    def get_scan_stats(self, db: Session) -> dict[str, int | float]:
        """Get code quality scan statistics.

        Returns:
            Dict with integer counts under ``total``, ``running`` (pending or
            running), ``completed`` (completed or completed_with_warnings),
            ``failed`` and ``today`` (timestamp since midnight UTC), plus
            ``avg_duration``: the mean of ``duration_seconds`` rounded to one
            decimal.
        """
        stats = db.query(
            func.count(ArchitectureScan.id).label("total"),
            _count_when(
                ArchitectureScan.status.in_(["pending", "running"]), "running"
            ),
            _count_when(
                ArchitectureScan.status.in_(
                    ["completed", "completed_with_warnings"]
                ),
                "completed",
            ),
            _count_when(ArchitectureScan.status == "failed", "failed"),
            func.avg(ArchitectureScan.duration_seconds).label("avg_duration"),
        ).first()

        today_count = _count_since(
            db,
            ArchitectureScan.id,
            ArchitectureScan.timestamp,
            _start_of_today_utc(),
        )

        return {
            "total": _as_int(stats.total),
            "running": _as_int(stats.running),
            "completed": _as_int(stats.completed),
            "failed": _as_int(stats.failed),
            "today": today_count,
            "avg_duration": _rounded_avg(stats.avg_duration),
        }


# Singleton instance
background_tasks_service = BackgroundTasksService()