# app/api/v1/admin/platform_health.py
"""
Platform health and capacity monitoring endpoints.

Provides:
- Overall platform health status
- Capacity metrics and thresholds
- Scaling recommendations
"""

import logging
import os
import platform
from datetime import datetime, timezone

import psutil
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlalchemy import func, text
from sqlalchemy.orm import Session

from app.api.deps import get_current_admin_api
from app.core.database import get_db
from app.services.image_service import image_service
from models.database.inventory import Inventory
from models.database.order import Order
from models.database.product import Product
from models.database.user import User
from models.database.vendor import Vendor

router = APIRouter()
logger = logging.getLogger(__name__)

# Conversion factors used when reporting sizes.
_BYTES_PER_MB = 1024 * 1024
_BYTES_PER_GB = 1024**3


# ============================================================================
# Schemas
# ============================================================================


class SystemMetrics(BaseModel):
    """System resource metrics."""

    cpu_percent: float
    memory_percent: float
    memory_used_gb: float
    memory_total_gb: float
    disk_percent: float
    disk_used_gb: float
    disk_total_gb: float


class DatabaseMetrics(BaseModel):
    """Database metrics."""

    size_mb: float
    products_count: int
    orders_count: int
    vendors_count: int
    inventory_count: int


class ImageStorageMetrics(BaseModel):
    """Image storage metrics."""

    total_files: int
    total_size_mb: float
    total_size_gb: float
    max_files_per_dir: int
    products_estimated: int


class CapacityThreshold(BaseModel):
    """Capacity threshold status."""

    name: str
    current: float
    warning: float
    critical: float
    limit: float
    status: str  # ok, warning, critical
    percent_used: float


class ScalingRecommendation(BaseModel):
    """Scaling recommendation."""

    priority: str  # info, warning, critical
    title: str
    description: str
    action: str | None = None


class PlatformHealthResponse(BaseModel):
    """Complete platform health response."""

    timestamp: str
    overall_status: str  # healthy, degraded, critical
    system: SystemMetrics
    database: DatabaseMetrics
    image_storage: ImageStorageMetrics
    thresholds: list[CapacityThreshold]
    recommendations: list[ScalingRecommendation]
    infrastructure_tier: str
    next_tier_trigger: str | None = None


class CapacityMetricsResponse(BaseModel):
    """Capacity-focused metrics."""

    products_total: int
    products_by_vendor: dict[str, int]
    images_total: int
    storage_used_gb: float
    database_size_mb: float
    orders_this_month: int
    active_vendors: int


# ============================================================================
# Thresholds Configuration
# ============================================================================

# Per-metric levels, keyed by metric id. A reading at or above "warning" /
# "critical" gets that status; "limit" is the denominator for percent_used.
CAPACITY_THRESHOLDS = {
    "products_total": {
        "warning": 400_000,
        "critical": 475_000,
        "limit": 500_000,
    },
    "storage_gb": {
        "warning": 800,
        "critical": 950,
        "limit": 1000,
    },
    "db_size_mb": {
        "warning": 20_000,
        "critical": 24_000,
        "limit": 25_000,
    },
    "disk_percent": {
        "warning": 70,
        "critical": 85,
        "limit": 100,
    },
    "memory_percent": {
        "warning": 75,
        "critical": 90,
        "limit": 100,
    },
    "cpu_percent": {
        "warning": 70,
        "critical": 85,
        "limit": 100,
    },
}

# Ordered smallest to largest; _determine_tier picks the first tier that fits.
# A tier whose "max_clients" is None is treated as unbounded.
INFRASTRUCTURE_TIERS = [
    {"name": "Starter", "max_clients": 50, "max_products": 10_000},
    {"name": "Small", "max_clients": 100, "max_products": 30_000},
    {"name": "Medium", "max_clients": 300, "max_products": 100_000},
    {"name": "Large", "max_clients": 500, "max_products": 250_000},
    {"name": "Scale", "max_clients": 1000, "max_products": 500_000},
    {"name": "Enterprise", "max_clients": None, "max_products": None},
]

# Size query per SQLAlchemy dialect name, in the order they are probed when
# the dialect cannot be identified. Each query yields one row whose first
# column is the database size in bytes.
_DB_SIZE_QUERIES = {
    "sqlite": (
        "SELECT page_count * page_size as size "
        "FROM pragma_page_count(), pragma_page_size()"
    ),
    "postgresql": "SELECT pg_database_size(current_database())",
}

# Recommendation text for each non-ok threshold status:
# status -> (title suffix, suggested action).
_STATUS_ADVICE = {
    "critical": ("at critical level", "Immediate scaling or cleanup required"),
    "warning": ("approaching limit", "Plan scaling in the next 2-4 weeks"),
}


# ============================================================================
# Endpoints
# ============================================================================

# NOTE(review): both endpoints are ``async def`` but only perform blocking
# work (psutil sampling, synchronous Session queries). Declaring them with
# plain ``def`` would let FastAPI run them in its threadpool; left unchanged
# here in case other code awaits them directly.


@router.get("/health", response_model=PlatformHealthResponse)
async def get_platform_health(
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_api),
):
    """Get comprehensive platform health status.

    Returns system metrics, database stats, storage info, and recommendations.
    """
    system = _get_system_metrics()
    database = _get_database_metrics(db)

    image_stats = image_service.get_storage_stats()
    image_storage = ImageStorageMetrics(
        total_files=image_stats["total_files"],
        total_size_mb=image_stats["total_size_mb"],
        total_size_gb=image_stats["total_size_gb"],
        max_files_per_dir=image_stats["max_files_per_dir"],
        products_estimated=image_stats["products_estimated"],
    )

    thresholds = _calculate_thresholds(system, database, image_storage)
    recommendations = _generate_recommendations(thresholds, database)
    tier, next_trigger = _determine_tier(
        database.vendors_count, database.products_count
    )
    overall_status = _determine_overall_status(thresholds)

    return PlatformHealthResponse(
        timestamp=_utcnow_naive().isoformat(),
        overall_status=overall_status,
        system=system,
        database=database,
        image_storage=image_storage,
        thresholds=thresholds,
        recommendations=recommendations,
        infrastructure_tier=tier,
        next_tier_trigger=next_trigger,
    )


@router.get("/capacity", response_model=CapacityMetricsResponse)
async def get_capacity_metrics(
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_api),
):
    """Get capacity-focused metrics for planning."""
    products_total = db.query(func.count(Product.id)).scalar() or 0

    # Inner join: vendors without any product do not appear in the result.
    vendor_counts = (
        db.query(Vendor.name, func.count(Product.id))
        .join(Product, Vendor.id == Product.vendor_id)
        .group_by(Vendor.name)
        .all()
    )
    # Several groups can share the "Unknown" label (e.g. a NULL name and an
    # empty name are distinct SQL groups), so counts are summed rather than
    # letting a later group overwrite an earlier one.
    products_by_vendor: dict[str, int] = {}
    for name, count in vendor_counts:
        label = name or "Unknown"
        products_by_vendor[label] = products_by_vendor.get(label, 0) + count

    image_stats = image_service.get_storage_stats()

    db_size = _get_database_size(db)

    # First instant of the current month. Microseconds are zeroed as well so
    # orders created within the first second of the month are not excluded.
    # NOTE(review): compares a naive UTC value against Order.created_at --
    # assumes that column stores naive UTC timestamps; confirm.
    start_of_month = _utcnow_naive().replace(
        day=1, hour=0, minute=0, second=0, microsecond=0
    )
    orders_this_month = (
        db.query(func.count(Order.id))
        .filter(Order.created_at >= start_of_month)
        .scalar()
        or 0
    )

    active_vendors = (
        db.query(func.count(Vendor.id))
        .filter(Vendor.is_active == True)  # noqa: E712
        .scalar()
        or 0
    )

    return CapacityMetricsResponse(
        products_total=products_total,
        products_by_vendor=products_by_vendor,
        images_total=image_stats["total_files"],
        storage_used_gb=image_stats["total_size_gb"],
        database_size_mb=db_size,
        orders_this_month=orders_this_month,
        active_vendors=active_vendors,
    )


# ============================================================================
# Helper Functions
# ============================================================================


def _utcnow_naive() -> datetime:
    """Return the current UTC time as a naive datetime.

    Produces the same value as the deprecated ``datetime.utcnow()``.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _get_system_metrics() -> SystemMetrics:
    """Get current system resource metrics.

    CPU usage is sampled over a 0.1 second interval; disk usage is read for
    the ``/`` mount. Sizes are reported in GB rounded to two decimals.
    """
    cpu_percent = psutil.cpu_percent(interval=0.1)
    memory = psutil.virtual_memory()
    disk = psutil.disk_usage("/")

    return SystemMetrics(
        cpu_percent=cpu_percent,
        memory_percent=memory.percent,
        memory_used_gb=round(memory.used / _BYTES_PER_GB, 2),
        memory_total_gb=round(memory.total / _BYTES_PER_GB, 2),
        disk_percent=disk.percent,
        disk_used_gb=round(disk.used / _BYTES_PER_GB, 2),
        disk_total_gb=round(disk.total / _BYTES_PER_GB, 2),
    )


def _get_database_metrics(db: Session) -> DatabaseMetrics:
    """Get database statistics: row counts per table and database size."""
    products_count = db.query(func.count(Product.id)).scalar() or 0
    orders_count = db.query(func.count(Order.id)).scalar() or 0
    vendors_count = db.query(func.count(Vendor.id)).scalar() or 0
    inventory_count = db.query(func.count(Inventory.id)).scalar() or 0

    return DatabaseMetrics(
        size_mb=_get_database_size(db),
        products_count=products_count,
        orders_count=orders_count,
        vendors_count=vendors_count,
        inventory_count=inventory_count,
    )


def _get_database_size(db: Session) -> float:
    """Get database size in MB.

    Uses the size query matching the session's dialect when it is one of the
    supported ones (SQLite, PostgreSQL). If the dialect cannot be identified,
    every known query is probed in turn. This is best-effort: any failure is
    logged and the function falls back to ``0.0`` instead of raising.

    Args:
        db: Session used to run the size query.

    Returns:
        Size in MB rounded to two decimals, or ``0.0`` if it cannot be
        determined.
    """
    try:
        dialect_name = db.get_bind().dialect.name
    except Exception:
        logger.debug("Could not determine database dialect", exc_info=True)
        dialect_name = None

    if dialect_name in _DB_SIZE_QUERIES:
        candidates = [dialect_name]
    else:
        candidates = list(_DB_SIZE_QUERIES)

    for candidate in candidates:
        try:
            row = db.execute(text(_DB_SIZE_QUERIES[candidate])).fetchone()
        except Exception:
            # Best-effort metric: a monitoring endpoint should not fail just
            # because the size is unavailable.
            logger.warning(
                "Database size query for %s failed", candidate, exc_info=True
            )
            # A failed statement can leave the transaction unusable (on
            # PostgreSQL every later statement is rejected until rollback),
            # so reset it before the next probe / the caller's next query.
            # Only read queries have run on this session at this point.
            db.rollback()
            continue
        if row and row[0] is not None:
            return round(row[0] / _BYTES_PER_MB, 2)

    return 0.0


def _calculate_thresholds(
    system: SystemMetrics,
    database: DatabaseMetrics,
    image_storage: ImageStorageMetrics,
) -> list[CapacityThreshold]:
    """Calculate threshold status for each metric.

    Returns one CapacityThreshold per monitored metric, in a fixed order:
    products, image storage, database size, disk, memory, CPU.
    """
    # (display name, CAPACITY_THRESHOLDS key, current reading)
    readings = (
        ("Products", "products_total", database.products_count),
        ("Image Storage (GB)", "storage_gb", image_storage.total_size_gb),
        ("Database (MB)", "db_size_mb", database.size_mb),
        ("Disk Usage (%)", "disk_percent", system.disk_percent),
        ("Memory Usage (%)", "memory_percent", system.memory_percent),
        ("CPU Usage (%)", "cpu_percent", system.cpu_percent),
    )

    thresholds = []
    for label, config_key, current in readings:
        config = CAPACITY_THRESHOLDS[config_key]
        thresholds.append(
            _create_threshold(
                label,
                current,
                config["warning"],
                config["critical"],
                config["limit"],
            )
        )
    return thresholds


def _create_threshold(
    name: str, current: float, warning: float, critical: float, limit: float
) -> CapacityThreshold:
    """Create a threshold status object.

    Status is "critical" when ``current >= critical``, "warning" when
    ``current >= warning``, otherwise "ok". ``percent_used`` is ``current``
    as a percentage of ``limit`` (0 when ``limit`` is not positive), rounded
    to one decimal.
    """
    percent_used = (current / limit) * 100 if limit > 0 else 0

    if current >= critical:
        status = "critical"
    elif current >= warning:
        status = "warning"
    else:
        status = "ok"

    return CapacityThreshold(
        name=name,
        current=current,
        warning=warning,
        critical=critical,
        limit=limit,
        status=status,
        percent_used=round(percent_used, 1),
    )


def _generate_recommendations(
    thresholds: list[CapacityThreshold], database: DatabaseMetrics
) -> list[ScalingRecommendation]:
    """Generate scaling recommendations based on thresholds.

    Emits one recommendation per threshold in "critical" or "warning" state,
    then an informational tier notice when at least one vendor exists and the
    current tier is close to being outgrown. If nothing was emitted at all, a
    single "All systems healthy" entry is returned instead.
    """
    recommendations = []

    for threshold in thresholds:
        advice = _STATUS_ADVICE.get(threshold.status)
        if advice is None:
            continue  # "ok" thresholds need no recommendation
        title_suffix, action = advice
        recommendations.append(
            ScalingRecommendation(
                priority=threshold.status,
                title=f"{threshold.name} {title_suffix}",
                description=f"Currently at {threshold.percent_used:.0f}% of capacity ({threshold.current:.0f} of {threshold.limit:.0f})",
                action=action,
            )
        )

    # Add tier-based recommendations
    if database.vendors_count > 0:
        tier, next_trigger = _determine_tier(
            database.vendors_count, database.products_count
        )
        if next_trigger:
            recommendations.append(
                ScalingRecommendation(
                    priority="info",
                    title=f"Current tier: {tier}",
                    description=next_trigger,
                    action="Review capacity planning documentation",
                )
            )

    # If no issues, add positive status
    if not recommendations:
        recommendations.append(
            ScalingRecommendation(
                priority="info",
                title="All systems healthy",
                description="No capacity concerns at this time",
                action=None,
            )
        )

    return recommendations


def _determine_tier(vendors: int, products: int) -> tuple[str, str | None]:
    """Determine current infrastructure tier and next trigger.

    Walks INFRASTRUCTURE_TIERS in order and selects the first tier whose
    vendor and product limits both accommodate the given counts; a tier with
    ``max_clients`` of None matches anything.

    Returns:
        ``(tier_name, next_trigger)``. ``next_trigger`` is a message naming
        the following tier when vendor or product usage exceeds 70% of the
        selected tier's limit, otherwise None.
    """
    last_index = len(INFRASTRUCTURE_TIERS) - 1

    for index, tier in enumerate(INFRASTRUCTURE_TIERS):
        max_clients = tier["max_clients"]
        max_products = tier["max_products"]

        # Unbounded tier: always matches and has nothing to grow into.
        if max_clients is None:
            return tier["name"], None

        if vendors > max_clients or products > max_products:
            continue

        next_trigger = None
        # Check proximity to next tier
        if index < last_index:
            next_tier = INFRASTRUCTURE_TIERS[index + 1]
            vendor_percent = (vendors / max_clients) * 100
            product_percent = (products / max_products) * 100
            if vendor_percent > 70 or product_percent > 70:
                next_trigger = (
                    f"Approaching {next_tier['name']} tier "
                    f"(vendors: {vendor_percent:.0f}%, products: {product_percent:.0f}%)"
                )
        return tier["name"], next_trigger

    # Only reached if no tier matched (e.g. the list has no unbounded tier).
    return "Starter", None


def _determine_overall_status(thresholds: list[CapacityThreshold]) -> str:
    """Determine overall platform status.

    Returns "critical" if any threshold is critical, "degraded" if any is in
    warning state, otherwise "healthy".
    """
    statuses = {threshold.status for threshold in thresholds}
    if "critical" in statuses:
        return "critical"
    if "warning" in statuses:
        return "degraded"
    return "healthy"