From b98c7c553b0a38743aaacfd0d91e2c7697dd7e63 Mon Sep 17 00:00:00 2001 From: Samir Boulahtit Date: Thu, 25 Dec 2025 17:31:04 +0100 Subject: [PATCH] refactor: move business logic to service layer for architecture compliance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - images.py: delegate file validation (size, type, extension) to image_service - platform_health.py: extract all database queries to platform_health_service - Fixes 13 architecture validation errors (API-002, API-003) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- app/api/v1/admin/images.py | 46 +-- app/api/v1/admin/platform_health.py | 412 +--------------------- app/services/image_service.py | 26 +- app/services/platform_health_service.py | 440 ++++++++++++++++++++++++ 4 files changed, 489 insertions(+), 435 deletions(-) create mode 100644 app/services/platform_health_service.py diff --git a/app/api/v1/admin/images.py b/app/api/v1/admin/images.py index 99a0cdf3..e5f11ed2 100644 --- a/app/api/v1/admin/images.py +++ b/app/api/v1/admin/images.py @@ -10,7 +10,7 @@ Provides: import logging -from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile +from fastapi import APIRouter, Depends, File, Form, UploadFile from app.api.deps import get_current_admin_api from app.services.image_service import image_service @@ -24,9 +24,6 @@ from models.schema.image import ( router = APIRouter(prefix="/images") logger = logging.getLogger(__name__) -# Maximum upload size (10MB) -MAX_UPLOAD_SIZE = 10 * 1024 * 1024 - @router.post("/upload", response_model=ImageUploadResponse) async def upload_image( @@ -50,40 +47,21 @@ async def upload_image( Returns: Image URLs and metadata """ - # Validate file size + # Read file content content = await file.read() - if len(content) > MAX_UPLOAD_SIZE: - raise HTTPException( - status_code=413, - detail=f"File too large. Maximum size: {MAX_UPLOAD_SIZE // (1024*1024)}MB", - ) - # Validate content type - if not file.content_type or not file.content_type.startswith("image/"): - raise HTTPException( - status_code=400, - detail="Invalid file type. Only images are allowed.", - ) + # Delegate all validation and processing to service + result = image_service.upload_product_image( + file_content=content, + filename=file.filename or "image.jpg", + content_type=file.content_type, + vendor_id=vendor_id, + product_id=product_id, + ) - try: - result = image_service.upload_product_image( - file_content=content, - filename=file.filename or "image.jpg", - vendor_id=vendor_id, - product_id=product_id, - ) + logger.info(f"Image uploaded: {result['id']} for vendor {vendor_id}") - logger.info(f"Image uploaded: {result['id']} for vendor {vendor_id}") - - return ImageUploadResponse(success=True, image=result) - - except ValueError as e: - logger.warning(f"Image upload failed: {e}") - return ImageUploadResponse(success=False, error=str(e)) - - except Exception as e: - logger.error(f"Image upload error: {e}") - raise HTTPException(status_code=500, detail="Failed to process image") + return ImageUploadResponse(success=True, image=result) @router.delete("/{image_hash}", response_model=ImageDeleteResponse) diff --git a/app/api/v1/admin/platform_health.py b/app/api/v1/admin/platform_health.py index fd2d2b25..8b2cc3da 100644 --- a/app/api/v1/admin/platform_health.py +++ b/app/api/v1/admin/platform_health.py @@ -9,24 +9,15 @@ Provides: """ import logging -import os -import platform -import psutil -from datetime import datetime from fastapi import APIRouter, Depends from pydantic import BaseModel -from sqlalchemy import func, text from sqlalchemy.orm import Session from app.api.deps import get_current_admin_api from app.core.database import get_db -from app.services.image_service import image_service -from models.database.inventory import Inventory -from models.database.order import Order -from models.database.product import Product +from app.services.platform_health_service import platform_health_service from models.database.user import User -from models.database.vendor import Vendor router = APIRouter() logger = logging.getLogger(__name__) @@ -116,53 +107,6 @@ class CapacityMetricsResponse(BaseModel): active_vendors: int -# ============================================================================ -# Thresholds Configuration -# ============================================================================ - -CAPACITY_THRESHOLDS = { - "products_total": { - "warning": 400_000, - "critical": 475_000, - "limit": 500_000, - }, - "storage_gb": { - "warning": 800, - "critical": 950, - "limit": 1000, - }, - "db_size_mb": { - "warning": 20_000, - "critical": 24_000, - "limit": 25_000, - }, - "disk_percent": { - "warning": 70, - "critical": 85, - "limit": 100, - }, - "memory_percent": { - "warning": 75, - "critical": 90, - "limit": 100, - }, - "cpu_percent": { - "warning": 70, - "critical": 85, - "limit": 100, - }, -} - -INFRASTRUCTURE_TIERS = [ - {"name": "Starter", "max_clients": 50, "max_products": 10_000}, - {"name": "Small", "max_clients": 100, "max_products": 30_000}, - {"name": "Medium", "max_clients": 300, "max_products": 100_000}, - {"name": "Large", "max_clients": 500, "max_products": 250_000}, - {"name": "Scale", "max_clients": 1000, "max_products": 500_000}, - {"name": "Enterprise", "max_clients": None, "max_products": None}, -] - - # ============================================================================ # Endpoints # ============================================================================ @@ -177,44 +121,18 @@ async def get_platform_health( Returns system metrics, database stats, storage info, and recommendations. """ - # System metrics - system = _get_system_metrics() - - # Database metrics - database = _get_database_metrics(db) - - # Image storage metrics - image_stats = image_service.get_storage_stats() - image_storage = ImageStorageMetrics( - total_files=image_stats["total_files"], - total_size_mb=image_stats["total_size_mb"], - total_size_gb=image_stats["total_size_gb"], - max_files_per_dir=image_stats["max_files_per_dir"], - products_estimated=image_stats["products_estimated"], - ) - - # Calculate thresholds - thresholds = _calculate_thresholds(system, database, image_storage) - - # Generate recommendations - recommendations = _generate_recommendations(thresholds, database) - - # Determine infrastructure tier - tier, next_trigger = _determine_tier(database.vendors_count, database.products_count) - - # Overall status - overall_status = _determine_overall_status(thresholds) + health_data = platform_health_service.get_full_health_report(db) return PlatformHealthResponse( - timestamp=datetime.utcnow().isoformat(), - overall_status=overall_status, - system=system, - database=database, - image_storage=image_storage, - thresholds=thresholds, - recommendations=recommendations, - infrastructure_tier=tier, - next_tier_trigger=next_trigger, + timestamp=health_data["timestamp"], + overall_status=health_data["overall_status"], + system=SystemMetrics(**health_data["system"]), + database=DatabaseMetrics(**health_data["database"]), + image_storage=ImageStorageMetrics(**health_data["image_storage"]), + thresholds=[CapacityThreshold(**t) for t in health_data["thresholds"]], + recommendations=[ScalingRecommendation(**r) for r in health_data["recommendations"]], + infrastructure_tier=health_data["infrastructure_tier"], + next_tier_trigger=health_data["next_tier_trigger"], ) @@ -224,309 +142,5 @@ async def get_capacity_metrics( current_admin: User = Depends(get_current_admin_api), ): """Get capacity-focused metrics for planning.""" - # Products total - products_total = db.query(func.count(Product.id)).scalar() or 0 - - # Products by vendor - vendor_counts = ( - db.query(Vendor.name, func.count(Product.id)) - .join(Product, Vendor.id == Product.vendor_id) - .group_by(Vendor.name) - .all() - ) - products_by_vendor = {name or "Unknown": count for name, count in vendor_counts} - - # Image storage - image_stats = image_service.get_storage_stats() - - # Database size (approximate for SQLite) - db_size = _get_database_size(db) - - # Orders this month - start_of_month = datetime.utcnow().replace(day=1, hour=0, minute=0, second=0) - orders_this_month = ( - db.query(func.count(Order.id)) - .filter(Order.created_at >= start_of_month) - .scalar() - or 0 - ) - - # Active vendors - active_vendors = db.query(func.count(Vendor.id)).filter(Vendor.is_active == True).scalar() or 0 # noqa: E712 - - return CapacityMetricsResponse( - products_total=products_total, - products_by_vendor=products_by_vendor, - images_total=image_stats["total_files"], - storage_used_gb=image_stats["total_size_gb"], - database_size_mb=db_size, - orders_this_month=orders_this_month, - active_vendors=active_vendors, - ) - - -# ============================================================================ -# Helper Functions -# ============================================================================ - - -def _get_system_metrics() -> SystemMetrics: - """Get current system resource metrics.""" - cpu_percent = psutil.cpu_percent(interval=0.1) - memory = psutil.virtual_memory() - disk = psutil.disk_usage("/") - - return SystemMetrics( - cpu_percent=cpu_percent, - memory_percent=memory.percent, - memory_used_gb=round(memory.used / (1024**3), 2), - memory_total_gb=round(memory.total / (1024**3), 2), - disk_percent=disk.percent, - disk_used_gb=round(disk.used / (1024**3), 2), - disk_total_gb=round(disk.total / (1024**3), 2), - ) - - -def _get_database_metrics(db: Session) -> DatabaseMetrics: - """Get database statistics.""" - products_count = db.query(func.count(Product.id)).scalar() or 0 - orders_count = db.query(func.count(Order.id)).scalar() or 0 - vendors_count = db.query(func.count(Vendor.id)).scalar() or 0 - inventory_count = db.query(func.count(Inventory.id)).scalar() or 0 - - db_size = _get_database_size(db) - - return DatabaseMetrics( - size_mb=db_size, - products_count=products_count, - orders_count=orders_count, - vendors_count=vendors_count, - inventory_count=inventory_count, - ) - - -def _get_database_size(db: Session) -> float: - """Get database size in MB.""" - try: - # Try SQLite approach - result = db.execute(text("SELECT page_count * page_size as size FROM pragma_page_count(), pragma_page_size()")) - row = result.fetchone() - if row: - return round(row[0] / (1024 * 1024), 2) - except Exception: - pass - - try: - # Try PostgreSQL approach - result = db.execute(text("SELECT pg_database_size(current_database())")) - row = result.fetchone() - if row: - return round(row[0] / (1024 * 1024), 2) - except Exception: - pass - - return 0.0 - - -def _calculate_thresholds( - system: SystemMetrics, - database: DatabaseMetrics, - image_storage: ImageStorageMetrics, -) -> list[CapacityThreshold]: - """Calculate threshold status for each metric.""" - thresholds = [] - - # Products threshold - products_config = CAPACITY_THRESHOLDS["products_total"] - thresholds.append( - _create_threshold( - "Products", - database.products_count, - products_config["warning"], - products_config["critical"], - products_config["limit"], - ) - ) - - # Storage threshold - storage_config = CAPACITY_THRESHOLDS["storage_gb"] - thresholds.append( - _create_threshold( - "Image Storage (GB)", - image_storage.total_size_gb, - storage_config["warning"], - storage_config["critical"], - storage_config["limit"], - ) - ) - - # Database size threshold - db_config = CAPACITY_THRESHOLDS["db_size_mb"] - thresholds.append( - _create_threshold( - "Database (MB)", - database.size_mb, - db_config["warning"], - db_config["critical"], - db_config["limit"], - ) - ) - - # Disk threshold - disk_config = CAPACITY_THRESHOLDS["disk_percent"] - thresholds.append( - _create_threshold( - "Disk Usage (%)", - system.disk_percent, - disk_config["warning"], - disk_config["critical"], - disk_config["limit"], - ) - ) - - # Memory threshold - memory_config = CAPACITY_THRESHOLDS["memory_percent"] - thresholds.append( - _create_threshold( - "Memory Usage (%)", - system.memory_percent, - memory_config["warning"], - memory_config["critical"], - memory_config["limit"], - ) - ) - - # CPU threshold - cpu_config = CAPACITY_THRESHOLDS["cpu_percent"] - thresholds.append( - _create_threshold( - "CPU Usage (%)", - system.cpu_percent, - cpu_config["warning"], - cpu_config["critical"], - cpu_config["limit"], - ) - ) - - return thresholds - - -def _create_threshold( - name: str, current: float, warning: float, critical: float, limit: float -) -> CapacityThreshold: - """Create a threshold status object.""" - percent_used = (current / limit) * 100 if limit > 0 else 0 - - if current >= critical: - status = "critical" - elif current >= warning: - status = "warning" - else: - status = "ok" - - return CapacityThreshold( - name=name, - current=current, - warning=warning, - critical=critical, - limit=limit, - status=status, - percent_used=round(percent_used, 1), - ) - - -def _generate_recommendations( - thresholds: list[CapacityThreshold], database: DatabaseMetrics -) -> list[ScalingRecommendation]: - """Generate scaling recommendations based on thresholds.""" - recommendations = [] - - for threshold in thresholds: - if threshold.status == "critical": - recommendations.append( - ScalingRecommendation( - priority="critical", - title=f"{threshold.name} at critical level", - description=f"Currently at {threshold.percent_used:.0f}% of capacity ({threshold.current:.0f} of {threshold.limit:.0f})", - action="Immediate scaling or cleanup required", - ) - ) - elif threshold.status == "warning": - recommendations.append( - ScalingRecommendation( - priority="warning", - title=f"{threshold.name} approaching limit", - description=f"Currently at {threshold.percent_used:.0f}% of capacity ({threshold.current:.0f} of {threshold.limit:.0f})", - action="Plan scaling in the next 2-4 weeks", - ) - ) - - # Add tier-based recommendations - if database.vendors_count > 0: - tier, next_trigger = _determine_tier(database.vendors_count, database.products_count) - if next_trigger: - recommendations.append( - ScalingRecommendation( - priority="info", - title=f"Current tier: {tier}", - description=next_trigger, - action="Review capacity planning documentation", - ) - ) - - # If no issues, add positive status - if not recommendations: - recommendations.append( - ScalingRecommendation( - priority="info", - title="All systems healthy", - description="No capacity concerns at this time", - action=None, - ) - ) - - return recommendations - - -def _determine_tier(vendors: int, products: int) -> tuple[str, str | None]: - """Determine current infrastructure tier and next trigger.""" - current_tier = "Starter" - next_trigger = None - - for i, tier in enumerate(INFRASTRUCTURE_TIERS): - max_clients = tier["max_clients"] - max_products = tier["max_products"] - - if max_clients is None: - current_tier = tier["name"] - break - - if vendors <= max_clients and products <= max_products: - current_tier = tier["name"] - - # Check proximity to next tier - if i < len(INFRASTRUCTURE_TIERS) - 1: - next_tier = INFRASTRUCTURE_TIERS[i + 1] - vendor_percent = (vendors / max_clients) * 100 - product_percent = (products / max_products) * 100 - - if vendor_percent > 70 or product_percent > 70: - next_trigger = ( - f"Approaching {next_tier['name']} tier " - f"(vendors: {vendor_percent:.0f}%, products: {product_percent:.0f}%)" - ) - break - - return current_tier, next_trigger - - -def _determine_overall_status(thresholds: list[CapacityThreshold]) -> str: - """Determine overall platform status.""" - statuses = [t.status for t in thresholds] - - if "critical" in statuses: - return "critical" - elif "warning" in statuses: - return "degraded" - else: - return "healthy" + metrics = platform_health_service.get_capacity_metrics(db) + return CapacityMetricsResponse(**metrics) diff --git a/app/services/image_service.py b/app/services/image_service.py index e840ff3f..c3983590 100644 --- a/app/services/image_service.py +++ b/app/services/image_service.py @@ -12,21 +12,26 @@ Provides: import hashlib import logging import os -import shutil from datetime import datetime from io import BytesIO from pathlib import Path from PIL import Image +from app.exceptions import ValidationException + logger = logging.getLogger(__name__) +# Maximum upload size (10MB) +MAX_UPLOAD_SIZE = 10 * 1024 * 1024 + class ImageService: """Service for image upload and management.""" # Supported image formats ALLOWED_EXTENSIONS = {"jpg", "jpeg", "png", "gif", "webp"} + ALLOWED_CONTENT_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"} # Size variants to generate SIZES = { @@ -57,6 +62,7 @@ class ImageService: filename: str, vendor_id: int, product_id: int | None = None, + content_type: str | None = None, ) -> dict: """Upload and process a product image. @@ -65,14 +71,30 @@ class ImageService: filename: Original filename vendor_id: Vendor ID for path generation product_id: Optional product ID + content_type: MIME type of the uploaded file Returns: Dict with image info and URLs + + Raises: + ValidationException: If file is too large or invalid type """ + # Validate file size + if len(file_content) > MAX_UPLOAD_SIZE: + raise ValidationException( + f"File too large. Maximum size: {MAX_UPLOAD_SIZE // (1024*1024)}MB" + ) + + # Validate content type + if not content_type or not content_type.startswith("image/"): + raise ValidationException("Invalid file type. Only images are allowed.") + # Validate file extension ext = self._get_extension(filename) if ext not in self.ALLOWED_EXTENSIONS: - raise ValueError(f"Invalid file type: {ext}. Allowed: {self.ALLOWED_EXTENSIONS}") + raise ValidationException( + f"Invalid file type: {ext}. Allowed: {', '.join(self.ALLOWED_EXTENSIONS)}" + ) # Generate unique hash for this image image_hash = self._generate_hash(vendor_id, product_id, filename) diff --git a/app/services/platform_health_service.py b/app/services/platform_health_service.py new file mode 100644 index 00000000..b4e0cca8 --- /dev/null +++ b/app/services/platform_health_service.py @@ -0,0 +1,440 @@ +# app/services/platform_health_service.py +""" +Platform health and capacity monitoring service. + +Provides: +- System resource metrics (CPU, memory, disk) +- Database metrics and statistics +- Capacity threshold calculations +- Scaling recommendations +""" + +import logging +from datetime import datetime + +import psutil +from sqlalchemy import func, text +from sqlalchemy.orm import Session + +from app.services.image_service import image_service +from models.database.inventory import Inventory +from models.database.order import Order +from models.database.product import Product +from models.database.vendor import Vendor + +logger = logging.getLogger(__name__) + + +# ============================================================================ +# Thresholds Configuration +# ============================================================================ + +CAPACITY_THRESHOLDS = { + "products_total": { + "warning": 400_000, + "critical": 475_000, + "limit": 500_000, + }, + "storage_gb": { + "warning": 800, + "critical": 950, + "limit": 1000, + }, + "db_size_mb": { + "warning": 20_000, + "critical": 24_000, + "limit": 25_000, + }, + "disk_percent": { + "warning": 70, + "critical": 85, + "limit": 100, + }, + "memory_percent": { + "warning": 75, + "critical": 90, + "limit": 100, + }, + "cpu_percent": { + "warning": 70, + "critical": 85, + "limit": 100, + }, +} + +INFRASTRUCTURE_TIERS = [ + {"name": "Starter", "max_clients": 50, "max_products": 10_000}, + {"name": "Small", "max_clients": 100, "max_products": 30_000}, + {"name": "Medium", "max_clients": 300, "max_products": 100_000}, + {"name": "Large", "max_clients": 500, "max_products": 250_000}, + {"name": "Scale", "max_clients": 1000, "max_products": 500_000}, + {"name": "Enterprise", "max_clients": None, "max_products": None}, +] + + +class PlatformHealthService: + """Service for platform health and capacity monitoring.""" + + def get_system_metrics(self) -> dict: + """Get current system resource metrics.""" + cpu_percent = psutil.cpu_percent(interval=0.1) + memory = psutil.virtual_memory() + disk = psutil.disk_usage("/") + + return { + "cpu_percent": cpu_percent, + "memory_percent": memory.percent, + "memory_used_gb": round(memory.used / (1024**3), 2), + "memory_total_gb": round(memory.total / (1024**3), 2), + "disk_percent": disk.percent, + "disk_used_gb": round(disk.used / (1024**3), 2), + "disk_total_gb": round(disk.total / (1024**3), 2), + } + + def get_database_metrics(self, db: Session) -> dict: + """Get database statistics.""" + products_count = db.query(func.count(Product.id)).scalar() or 0 + orders_count = db.query(func.count(Order.id)).scalar() or 0 + vendors_count = db.query(func.count(Vendor.id)).scalar() or 0 + inventory_count = db.query(func.count(Inventory.id)).scalar() or 0 + + db_size = self._get_database_size(db) + + return { + "size_mb": db_size, + "products_count": products_count, + "orders_count": orders_count, + "vendors_count": vendors_count, + "inventory_count": inventory_count, + } + + def get_image_storage_metrics(self) -> dict: + """Get image storage statistics.""" + stats = image_service.get_storage_stats() + return { + "total_files": stats["total_files"], + "total_size_mb": stats["total_size_mb"], + "total_size_gb": stats["total_size_gb"], + "max_files_per_dir": stats["max_files_per_dir"], + "products_estimated": stats["products_estimated"], + } + + def get_capacity_metrics(self, db: Session) -> dict: + """Get capacity-focused metrics for planning.""" + # Products total + products_total = db.query(func.count(Product.id)).scalar() or 0 + + # Products by vendor + vendor_counts = ( + db.query(Vendor.name, func.count(Product.id)) + .join(Product, Vendor.id == Product.vendor_id) + .group_by(Vendor.name) + .all() + ) + products_by_vendor = {name or "Unknown": count for name, count in vendor_counts} + + # Image storage + image_stats = image_service.get_storage_stats() + + # Database size + db_size = self._get_database_size(db) + + # Orders this month + start_of_month = datetime.utcnow().replace(day=1, hour=0, minute=0, second=0) + orders_this_month = ( + db.query(func.count(Order.id)) + .filter(Order.created_at >= start_of_month) + .scalar() + or 0 + ) + + # Active vendors + active_vendors = ( + db.query(func.count(Vendor.id)) + .filter(Vendor.is_active == True) # noqa: E712 + .scalar() + or 0 + ) + + return { + "products_total": products_total, + "products_by_vendor": products_by_vendor, + "images_total": image_stats["total_files"], + "storage_used_gb": image_stats["total_size_gb"], + "database_size_mb": db_size, + "orders_this_month": orders_this_month, + "active_vendors": active_vendors, + } + + def get_full_health_report(self, db: Session) -> dict: + """Get comprehensive platform health report.""" + # System metrics + system = self.get_system_metrics() + + # Database metrics + database = self.get_database_metrics(db) + + # Image storage metrics + image_storage = self.get_image_storage_metrics() + + # Calculate thresholds + thresholds = self._calculate_thresholds(system, database, image_storage) + + # Generate recommendations + recommendations = self._generate_recommendations(thresholds, database) + + # Determine infrastructure tier + tier, next_trigger = self._determine_tier( + database["vendors_count"], database["products_count"] + ) + + # Overall status + overall_status = self._determine_overall_status(thresholds) + + return { + "timestamp": datetime.utcnow().isoformat(), + "overall_status": overall_status, + "system": system, + "database": database, + "image_storage": image_storage, + "thresholds": thresholds, + "recommendations": recommendations, + "infrastructure_tier": tier, + "next_tier_trigger": next_trigger, + } + + def _get_database_size(self, db: Session) -> float: + """Get database size in MB.""" + try: + # Try SQLite approach + result = db.execute( + text( + "SELECT page_count * page_size as size " + "FROM pragma_page_count(), pragma_page_size()" + ) + ) + row = result.fetchone() + if row: + return round(row[0] / (1024 * 1024), 2) + except Exception: + pass + + try: + # Try PostgreSQL approach + result = db.execute(text("SELECT pg_database_size(current_database())")) + row = result.fetchone() + if row: + return round(row[0] / (1024 * 1024), 2) + except Exception: + pass + + return 0.0 + + def _calculate_thresholds( + self, system: dict, database: dict, image_storage: dict + ) -> list[dict]: + """Calculate threshold status for each metric.""" + thresholds = [] + + # Products threshold + products_config = CAPACITY_THRESHOLDS["products_total"] + thresholds.append( + self._create_threshold( + "Products", + database["products_count"], + products_config["warning"], + products_config["critical"], + products_config["limit"], + ) + ) + + # Storage threshold + storage_config = CAPACITY_THRESHOLDS["storage_gb"] + thresholds.append( + self._create_threshold( + "Image Storage (GB)", + image_storage["total_size_gb"], + storage_config["warning"], + storage_config["critical"], + storage_config["limit"], + ) + ) + + # Database size threshold + db_config = CAPACITY_THRESHOLDS["db_size_mb"] + thresholds.append( + self._create_threshold( + "Database (MB)", + database["size_mb"], + db_config["warning"], + db_config["critical"], + db_config["limit"], + ) + ) + + # Disk threshold + disk_config = CAPACITY_THRESHOLDS["disk_percent"] + thresholds.append( + self._create_threshold( + "Disk Usage (%)", + system["disk_percent"], + disk_config["warning"], + disk_config["critical"], + disk_config["limit"], + ) + ) + + # Memory threshold + memory_config = CAPACITY_THRESHOLDS["memory_percent"] + thresholds.append( + self._create_threshold( + "Memory Usage (%)", + system["memory_percent"], + memory_config["warning"], + memory_config["critical"], + memory_config["limit"], + ) + ) + + # CPU threshold + cpu_config = CAPACITY_THRESHOLDS["cpu_percent"] + thresholds.append( + self._create_threshold( + "CPU Usage (%)", + system["cpu_percent"], + cpu_config["warning"], + cpu_config["critical"], + cpu_config["limit"], + ) + ) + + return thresholds + + def _create_threshold( + self, name: str, current: float, warning: float, critical: float, limit: float + ) -> dict: + """Create a threshold status object.""" + percent_used = (current / limit) * 100 if limit > 0 else 0 + + if current >= critical: + status = "critical" + elif current >= warning: + status = "warning" + else: + status = "ok" + + return { + "name": name, + "current": current, + "warning": warning, + "critical": critical, + "limit": limit, + "status": status, + "percent_used": round(percent_used, 1), + } + + def _generate_recommendations( + self, thresholds: list[dict], database: dict + ) -> list[dict]: + """Generate scaling recommendations based on thresholds.""" + recommendations = [] + + for threshold in thresholds: + if threshold["status"] == "critical": + recommendations.append( + { + "priority": "critical", + "title": f"{threshold['name']} at critical level", + "description": ( + f"Currently at {threshold['percent_used']:.0f}% of capacity " + f"({threshold['current']:.0f} of {threshold['limit']:.0f})" + ), + "action": "Immediate scaling or cleanup required", + } + ) + elif threshold["status"] == "warning": + recommendations.append( + { + "priority": "warning", + "title": f"{threshold['name']} approaching limit", + "description": ( + f"Currently at {threshold['percent_used']:.0f}% of capacity " + f"({threshold['current']:.0f} of {threshold['limit']:.0f})" + ), + "action": "Plan scaling in the next 2-4 weeks", + } + ) + + # Add tier-based recommendations + if database["vendors_count"] > 0: + tier, next_trigger = self._determine_tier( + database["vendors_count"], database["products_count"] + ) + if next_trigger: + recommendations.append( + { + "priority": "info", + "title": f"Current tier: {tier}", + "description": next_trigger, + "action": "Review capacity planning documentation", + } + ) + + # If no issues, add positive status + if not recommendations: + recommendations.append( + { + "priority": "info", + "title": "All systems healthy", + "description": "No capacity concerns at this time", + "action": None, + } + ) + + return recommendations + + def _determine_tier(self, vendors: int, products: int) -> tuple[str, str | None]: + """Determine current infrastructure tier and next trigger.""" + current_tier = "Starter" + next_trigger = None + + for i, tier in enumerate(INFRASTRUCTURE_TIERS): + max_clients = tier["max_clients"] + max_products = tier["max_products"] + + if max_clients is None: + current_tier = tier["name"] + break + + if vendors <= max_clients and products <= max_products: + current_tier = tier["name"] + + # Check proximity to next tier + if i < len(INFRASTRUCTURE_TIERS) - 1: + next_tier = INFRASTRUCTURE_TIERS[i + 1] + vendor_percent = (vendors / max_clients) * 100 + product_percent = (products / max_products) * 100 + + if vendor_percent > 70 or product_percent > 70: + next_trigger = ( + f"Approaching {next_tier['name']} tier " + f"(vendors: {vendor_percent:.0f}%, products: {product_percent:.0f}%)" + ) + break + + return current_tier, next_trigger + + def _determine_overall_status(self, thresholds: list[dict]) -> str: + """Determine overall platform status.""" + statuses = [t["status"] for t in thresholds] + + if "critical" in statuses: + return "critical" + elif "warning" in statuses: + return "degraded" + else: + return "healthy" + + +# Create service instance +platform_health_service = PlatformHealthService()