feat: add capacity planning docs, image upload system, and platform health monitoring
Documentation: - Add comprehensive capacity planning guide (docs/architecture/capacity-planning.md) - Add operations docs: platform-health, capacity-monitoring, image-storage - Link pricing strategy to capacity planning documentation - Update mkdocs.yml with new Operations section Image Upload System: - Add ImageService with WebP conversion and sharded directory structure - Generate multiple size variants (original, 800px, 200px) - Add storage stats endpoint for monitoring - Add Pillow dependency for image processing Platform Health Monitoring: - Add /admin/platform-health page with real-time metrics - Show CPU, memory, disk usage with progress bars - Display capacity thresholds with status indicators - Generate scaling recommendations automatically - Determine infrastructure tier based on usage - Add psutil dependency for system metrics Admin UI: - Add Capacity Monitor to Platform Health section in sidebar - Create platform-health.html template with stats cards - Create platform-health.js for Alpine.js state management 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -33,6 +33,7 @@ from . import (
|
||||
content_pages,
|
||||
customers,
|
||||
dashboard,
|
||||
images,
|
||||
inventory,
|
||||
letzshop,
|
||||
logs,
|
||||
@@ -42,6 +43,7 @@ from . import (
|
||||
notifications,
|
||||
order_item_exceptions,
|
||||
orders,
|
||||
platform_health,
|
||||
products,
|
||||
settings,
|
||||
tests,
|
||||
@@ -162,6 +164,14 @@ router.include_router(messages.router, tags=["admin-messages"])
|
||||
# Include log management endpoints
|
||||
router.include_router(logs.router, tags=["admin-logs"])
|
||||
|
||||
# Include image management endpoints
|
||||
router.include_router(images.router, tags=["admin-images"])
|
||||
|
||||
# Include platform health endpoints
|
||||
router.include_router(
|
||||
platform_health.router, prefix="/platform", tags=["admin-platform-health"]
|
||||
)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Code Quality & Architecture
|
||||
|
||||
121
app/api/v1/admin/images.py
Normal file
121
app/api/v1/admin/images.py
Normal file
@@ -0,0 +1,121 @@
|
||||
# app/api/v1/admin/images.py
|
||||
"""
|
||||
Admin image management endpoints.
|
||||
|
||||
Provides:
|
||||
- Image upload with automatic processing
|
||||
- Image deletion
|
||||
- Storage statistics
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile
|
||||
|
||||
from app.api.deps import get_current_admin_api
|
||||
from app.services.image_service import image_service
|
||||
from models.database.user import User
|
||||
from models.schema.image import (
|
||||
ImageDeleteResponse,
|
||||
ImageStorageStats,
|
||||
ImageUploadResponse,
|
||||
)
|
||||
|
||||
router = APIRouter(prefix="/images")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Maximum upload size (10MB)
|
||||
MAX_UPLOAD_SIZE = 10 * 1024 * 1024
|
||||
|
||||
|
||||
@router.post("/upload", response_model=ImageUploadResponse)
|
||||
async def upload_image(
|
||||
file: UploadFile = File(...),
|
||||
vendor_id: int = Form(...),
|
||||
product_id: int | None = Form(None),
|
||||
current_admin: User = Depends(get_current_admin_api),
|
||||
):
|
||||
"""Upload and process an image.
|
||||
|
||||
The image will be:
|
||||
- Converted to WebP format
|
||||
- Resized to multiple variants (original, 800px, 200px)
|
||||
- Stored in a sharded directory structure
|
||||
|
||||
Args:
|
||||
file: Image file to upload
|
||||
vendor_id: Vendor ID for the image
|
||||
product_id: Optional product ID
|
||||
|
||||
Returns:
|
||||
Image URLs and metadata
|
||||
"""
|
||||
# Validate file size
|
||||
content = await file.read()
|
||||
if len(content) > MAX_UPLOAD_SIZE:
|
||||
raise HTTPException(
|
||||
status_code=413,
|
||||
detail=f"File too large. Maximum size: {MAX_UPLOAD_SIZE // (1024*1024)}MB",
|
||||
)
|
||||
|
||||
# Validate content type
|
||||
if not file.content_type or not file.content_type.startswith("image/"):
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Invalid file type. Only images are allowed.",
|
||||
)
|
||||
|
||||
try:
|
||||
result = image_service.upload_product_image(
|
||||
file_content=content,
|
||||
filename=file.filename or "image.jpg",
|
||||
vendor_id=vendor_id,
|
||||
product_id=product_id,
|
||||
)
|
||||
|
||||
logger.info(f"Image uploaded: {result['id']} for vendor {vendor_id}")
|
||||
|
||||
return ImageUploadResponse(success=True, image=result)
|
||||
|
||||
except ValueError as e:
|
||||
logger.warning(f"Image upload failed: {e}")
|
||||
return ImageUploadResponse(success=False, error=str(e))
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Image upload error: {e}")
|
||||
raise HTTPException(status_code=500, detail="Failed to process image")
|
||||
|
||||
|
||||
@router.delete("/{image_hash}", response_model=ImageDeleteResponse)
|
||||
async def delete_image(
|
||||
image_hash: str,
|
||||
current_admin: User = Depends(get_current_admin_api),
|
||||
):
|
||||
"""Delete an image and all its variants.
|
||||
|
||||
Args:
|
||||
image_hash: The image ID/hash
|
||||
|
||||
Returns:
|
||||
Deletion status
|
||||
"""
|
||||
deleted = image_service.delete_product_image(image_hash)
|
||||
|
||||
if deleted:
|
||||
logger.info(f"Image deleted: {image_hash}")
|
||||
return ImageDeleteResponse(success=True, message="Image deleted successfully")
|
||||
else:
|
||||
return ImageDeleteResponse(success=False, message="Image not found")
|
||||
|
||||
|
||||
@router.get("/stats", response_model=ImageStorageStats)
|
||||
async def get_storage_stats(
|
||||
current_admin: User = Depends(get_current_admin_api),
|
||||
):
|
||||
"""Get image storage statistics.
|
||||
|
||||
Returns:
|
||||
Storage metrics including file counts, sizes, and directory info
|
||||
"""
|
||||
stats = image_service.get_storage_stats()
|
||||
return ImageStorageStats(**stats)
|
||||
532
app/api/v1/admin/platform_health.py
Normal file
532
app/api/v1/admin/platform_health.py
Normal file
@@ -0,0 +1,532 @@
|
||||
# app/api/v1/admin/platform_health.py
|
||||
"""
|
||||
Platform health and capacity monitoring endpoints.
|
||||
|
||||
Provides:
|
||||
- Overall platform health status
|
||||
- Capacity metrics and thresholds
|
||||
- Scaling recommendations
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import psutil
|
||||
from datetime import datetime
|
||||
|
||||
from fastapi import APIRouter, Depends
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import func, text
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.api.deps import get_current_admin_api
|
||||
from app.core.database import get_db
|
||||
from app.services.image_service import image_service
|
||||
from models.database.inventory import Inventory
|
||||
from models.database.order import Order
|
||||
from models.database.product import Product
|
||||
from models.database.user import User
|
||||
from models.database.vendor import Vendor
|
||||
|
||||
router = APIRouter()
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Schemas
|
||||
# ============================================================================
|
||||
|
||||
|
||||
class SystemMetrics(BaseModel):
|
||||
"""System resource metrics."""
|
||||
|
||||
cpu_percent: float
|
||||
memory_percent: float
|
||||
memory_used_gb: float
|
||||
memory_total_gb: float
|
||||
disk_percent: float
|
||||
disk_used_gb: float
|
||||
disk_total_gb: float
|
||||
|
||||
|
||||
class DatabaseMetrics(BaseModel):
|
||||
"""Database metrics."""
|
||||
|
||||
size_mb: float
|
||||
products_count: int
|
||||
orders_count: int
|
||||
vendors_count: int
|
||||
inventory_count: int
|
||||
|
||||
|
||||
class ImageStorageMetrics(BaseModel):
|
||||
"""Image storage metrics."""
|
||||
|
||||
total_files: int
|
||||
total_size_mb: float
|
||||
total_size_gb: float
|
||||
max_files_per_dir: int
|
||||
products_estimated: int
|
||||
|
||||
|
||||
class CapacityThreshold(BaseModel):
|
||||
"""Capacity threshold status."""
|
||||
|
||||
name: str
|
||||
current: float
|
||||
warning: float
|
||||
critical: float
|
||||
limit: float
|
||||
status: str # ok, warning, critical
|
||||
percent_used: float
|
||||
|
||||
|
||||
class ScalingRecommendation(BaseModel):
|
||||
"""Scaling recommendation."""
|
||||
|
||||
priority: str # info, warning, critical
|
||||
title: str
|
||||
description: str
|
||||
action: str | None = None
|
||||
|
||||
|
||||
class PlatformHealthResponse(BaseModel):
|
||||
"""Complete platform health response."""
|
||||
|
||||
timestamp: str
|
||||
overall_status: str # healthy, degraded, critical
|
||||
system: SystemMetrics
|
||||
database: DatabaseMetrics
|
||||
image_storage: ImageStorageMetrics
|
||||
thresholds: list[CapacityThreshold]
|
||||
recommendations: list[ScalingRecommendation]
|
||||
infrastructure_tier: str
|
||||
next_tier_trigger: str | None = None
|
||||
|
||||
|
||||
class CapacityMetricsResponse(BaseModel):
|
||||
"""Capacity-focused metrics."""
|
||||
|
||||
products_total: int
|
||||
products_by_vendor: dict[str, int]
|
||||
images_total: int
|
||||
storage_used_gb: float
|
||||
database_size_mb: float
|
||||
orders_this_month: int
|
||||
active_vendors: int
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Thresholds Configuration
|
||||
# ============================================================================
|
||||
|
||||
CAPACITY_THRESHOLDS = {
|
||||
"products_total": {
|
||||
"warning": 400_000,
|
||||
"critical": 475_000,
|
||||
"limit": 500_000,
|
||||
},
|
||||
"storage_gb": {
|
||||
"warning": 800,
|
||||
"critical": 950,
|
||||
"limit": 1000,
|
||||
},
|
||||
"db_size_mb": {
|
||||
"warning": 20_000,
|
||||
"critical": 24_000,
|
||||
"limit": 25_000,
|
||||
},
|
||||
"disk_percent": {
|
||||
"warning": 70,
|
||||
"critical": 85,
|
||||
"limit": 100,
|
||||
},
|
||||
"memory_percent": {
|
||||
"warning": 75,
|
||||
"critical": 90,
|
||||
"limit": 100,
|
||||
},
|
||||
"cpu_percent": {
|
||||
"warning": 70,
|
||||
"critical": 85,
|
||||
"limit": 100,
|
||||
},
|
||||
}
|
||||
|
||||
INFRASTRUCTURE_TIERS = [
|
||||
{"name": "Starter", "max_clients": 50, "max_products": 10_000},
|
||||
{"name": "Small", "max_clients": 100, "max_products": 30_000},
|
||||
{"name": "Medium", "max_clients": 300, "max_products": 100_000},
|
||||
{"name": "Large", "max_clients": 500, "max_products": 250_000},
|
||||
{"name": "Scale", "max_clients": 1000, "max_products": 500_000},
|
||||
{"name": "Enterprise", "max_clients": None, "max_products": None},
|
||||
]
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Endpoints
|
||||
# ============================================================================
|
||||
|
||||
|
||||
@router.get("/health", response_model=PlatformHealthResponse)
|
||||
async def get_platform_health(
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: User = Depends(get_current_admin_api),
|
||||
):
|
||||
"""Get comprehensive platform health status.
|
||||
|
||||
Returns system metrics, database stats, storage info, and recommendations.
|
||||
"""
|
||||
# System metrics
|
||||
system = _get_system_metrics()
|
||||
|
||||
# Database metrics
|
||||
database = _get_database_metrics(db)
|
||||
|
||||
# Image storage metrics
|
||||
image_stats = image_service.get_storage_stats()
|
||||
image_storage = ImageStorageMetrics(
|
||||
total_files=image_stats["total_files"],
|
||||
total_size_mb=image_stats["total_size_mb"],
|
||||
total_size_gb=image_stats["total_size_gb"],
|
||||
max_files_per_dir=image_stats["max_files_per_dir"],
|
||||
products_estimated=image_stats["products_estimated"],
|
||||
)
|
||||
|
||||
# Calculate thresholds
|
||||
thresholds = _calculate_thresholds(system, database, image_storage)
|
||||
|
||||
# Generate recommendations
|
||||
recommendations = _generate_recommendations(thresholds, database)
|
||||
|
||||
# Determine infrastructure tier
|
||||
tier, next_trigger = _determine_tier(database.vendors_count, database.products_count)
|
||||
|
||||
# Overall status
|
||||
overall_status = _determine_overall_status(thresholds)
|
||||
|
||||
return PlatformHealthResponse(
|
||||
timestamp=datetime.utcnow().isoformat(),
|
||||
overall_status=overall_status,
|
||||
system=system,
|
||||
database=database,
|
||||
image_storage=image_storage,
|
||||
thresholds=thresholds,
|
||||
recommendations=recommendations,
|
||||
infrastructure_tier=tier,
|
||||
next_tier_trigger=next_trigger,
|
||||
)
|
||||
|
||||
|
||||
@router.get("/capacity", response_model=CapacityMetricsResponse)
|
||||
async def get_capacity_metrics(
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: User = Depends(get_current_admin_api),
|
||||
):
|
||||
"""Get capacity-focused metrics for planning."""
|
||||
# Products total
|
||||
products_total = db.query(func.count(Product.id)).scalar() or 0
|
||||
|
||||
# Products by vendor
|
||||
vendor_counts = (
|
||||
db.query(Vendor.name, func.count(Product.id))
|
||||
.join(Product, Vendor.id == Product.vendor_id)
|
||||
.group_by(Vendor.name)
|
||||
.all()
|
||||
)
|
||||
products_by_vendor = {name or "Unknown": count for name, count in vendor_counts}
|
||||
|
||||
# Image storage
|
||||
image_stats = image_service.get_storage_stats()
|
||||
|
||||
# Database size (approximate for SQLite)
|
||||
db_size = _get_database_size(db)
|
||||
|
||||
# Orders this month
|
||||
start_of_month = datetime.utcnow().replace(day=1, hour=0, minute=0, second=0)
|
||||
orders_this_month = (
|
||||
db.query(func.count(Order.id))
|
||||
.filter(Order.created_at >= start_of_month)
|
||||
.scalar()
|
||||
or 0
|
||||
)
|
||||
|
||||
# Active vendors
|
||||
active_vendors = db.query(func.count(Vendor.id)).filter(Vendor.is_active == True).scalar() or 0 # noqa: E712
|
||||
|
||||
return CapacityMetricsResponse(
|
||||
products_total=products_total,
|
||||
products_by_vendor=products_by_vendor,
|
||||
images_total=image_stats["total_files"],
|
||||
storage_used_gb=image_stats["total_size_gb"],
|
||||
database_size_mb=db_size,
|
||||
orders_this_month=orders_this_month,
|
||||
active_vendors=active_vendors,
|
||||
)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Helper Functions
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def _get_system_metrics() -> SystemMetrics:
|
||||
"""Get current system resource metrics."""
|
||||
cpu_percent = psutil.cpu_percent(interval=0.1)
|
||||
memory = psutil.virtual_memory()
|
||||
disk = psutil.disk_usage("/")
|
||||
|
||||
return SystemMetrics(
|
||||
cpu_percent=cpu_percent,
|
||||
memory_percent=memory.percent,
|
||||
memory_used_gb=round(memory.used / (1024**3), 2),
|
||||
memory_total_gb=round(memory.total / (1024**3), 2),
|
||||
disk_percent=disk.percent,
|
||||
disk_used_gb=round(disk.used / (1024**3), 2),
|
||||
disk_total_gb=round(disk.total / (1024**3), 2),
|
||||
)
|
||||
|
||||
|
||||
def _get_database_metrics(db: Session) -> DatabaseMetrics:
|
||||
"""Get database statistics."""
|
||||
products_count = db.query(func.count(Product.id)).scalar() or 0
|
||||
orders_count = db.query(func.count(Order.id)).scalar() or 0
|
||||
vendors_count = db.query(func.count(Vendor.id)).scalar() or 0
|
||||
inventory_count = db.query(func.count(Inventory.id)).scalar() or 0
|
||||
|
||||
db_size = _get_database_size(db)
|
||||
|
||||
return DatabaseMetrics(
|
||||
size_mb=db_size,
|
||||
products_count=products_count,
|
||||
orders_count=orders_count,
|
||||
vendors_count=vendors_count,
|
||||
inventory_count=inventory_count,
|
||||
)
|
||||
|
||||
|
||||
def _get_database_size(db: Session) -> float:
|
||||
"""Get database size in MB."""
|
||||
try:
|
||||
# Try SQLite approach
|
||||
result = db.execute(text("SELECT page_count * page_size as size FROM pragma_page_count(), pragma_page_size()"))
|
||||
row = result.fetchone()
|
||||
if row:
|
||||
return round(row[0] / (1024 * 1024), 2)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
# Try PostgreSQL approach
|
||||
result = db.execute(text("SELECT pg_database_size(current_database())"))
|
||||
row = result.fetchone()
|
||||
if row:
|
||||
return round(row[0] / (1024 * 1024), 2)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return 0.0
|
||||
|
||||
|
||||
def _calculate_thresholds(
|
||||
system: SystemMetrics,
|
||||
database: DatabaseMetrics,
|
||||
image_storage: ImageStorageMetrics,
|
||||
) -> list[CapacityThreshold]:
|
||||
"""Calculate threshold status for each metric."""
|
||||
thresholds = []
|
||||
|
||||
# Products threshold
|
||||
products_config = CAPACITY_THRESHOLDS["products_total"]
|
||||
thresholds.append(
|
||||
_create_threshold(
|
||||
"Products",
|
||||
database.products_count,
|
||||
products_config["warning"],
|
||||
products_config["critical"],
|
||||
products_config["limit"],
|
||||
)
|
||||
)
|
||||
|
||||
# Storage threshold
|
||||
storage_config = CAPACITY_THRESHOLDS["storage_gb"]
|
||||
thresholds.append(
|
||||
_create_threshold(
|
||||
"Image Storage (GB)",
|
||||
image_storage.total_size_gb,
|
||||
storage_config["warning"],
|
||||
storage_config["critical"],
|
||||
storage_config["limit"],
|
||||
)
|
||||
)
|
||||
|
||||
# Database size threshold
|
||||
db_config = CAPACITY_THRESHOLDS["db_size_mb"]
|
||||
thresholds.append(
|
||||
_create_threshold(
|
||||
"Database (MB)",
|
||||
database.size_mb,
|
||||
db_config["warning"],
|
||||
db_config["critical"],
|
||||
db_config["limit"],
|
||||
)
|
||||
)
|
||||
|
||||
# Disk threshold
|
||||
disk_config = CAPACITY_THRESHOLDS["disk_percent"]
|
||||
thresholds.append(
|
||||
_create_threshold(
|
||||
"Disk Usage (%)",
|
||||
system.disk_percent,
|
||||
disk_config["warning"],
|
||||
disk_config["critical"],
|
||||
disk_config["limit"],
|
||||
)
|
||||
)
|
||||
|
||||
# Memory threshold
|
||||
memory_config = CAPACITY_THRESHOLDS["memory_percent"]
|
||||
thresholds.append(
|
||||
_create_threshold(
|
||||
"Memory Usage (%)",
|
||||
system.memory_percent,
|
||||
memory_config["warning"],
|
||||
memory_config["critical"],
|
||||
memory_config["limit"],
|
||||
)
|
||||
)
|
||||
|
||||
# CPU threshold
|
||||
cpu_config = CAPACITY_THRESHOLDS["cpu_percent"]
|
||||
thresholds.append(
|
||||
_create_threshold(
|
||||
"CPU Usage (%)",
|
||||
system.cpu_percent,
|
||||
cpu_config["warning"],
|
||||
cpu_config["critical"],
|
||||
cpu_config["limit"],
|
||||
)
|
||||
)
|
||||
|
||||
return thresholds
|
||||
|
||||
|
||||
def _create_threshold(
|
||||
name: str, current: float, warning: float, critical: float, limit: float
|
||||
) -> CapacityThreshold:
|
||||
"""Create a threshold status object."""
|
||||
percent_used = (current / limit) * 100 if limit > 0 else 0
|
||||
|
||||
if current >= critical:
|
||||
status = "critical"
|
||||
elif current >= warning:
|
||||
status = "warning"
|
||||
else:
|
||||
status = "ok"
|
||||
|
||||
return CapacityThreshold(
|
||||
name=name,
|
||||
current=current,
|
||||
warning=warning,
|
||||
critical=critical,
|
||||
limit=limit,
|
||||
status=status,
|
||||
percent_used=round(percent_used, 1),
|
||||
)
|
||||
|
||||
|
||||
def _generate_recommendations(
|
||||
thresholds: list[CapacityThreshold], database: DatabaseMetrics
|
||||
) -> list[ScalingRecommendation]:
|
||||
"""Generate scaling recommendations based on thresholds."""
|
||||
recommendations = []
|
||||
|
||||
for threshold in thresholds:
|
||||
if threshold.status == "critical":
|
||||
recommendations.append(
|
||||
ScalingRecommendation(
|
||||
priority="critical",
|
||||
title=f"{threshold.name} at critical level",
|
||||
description=f"Currently at {threshold.percent_used:.0f}% of capacity ({threshold.current:.0f} of {threshold.limit:.0f})",
|
||||
action="Immediate scaling or cleanup required",
|
||||
)
|
||||
)
|
||||
elif threshold.status == "warning":
|
||||
recommendations.append(
|
||||
ScalingRecommendation(
|
||||
priority="warning",
|
||||
title=f"{threshold.name} approaching limit",
|
||||
description=f"Currently at {threshold.percent_used:.0f}% of capacity ({threshold.current:.0f} of {threshold.limit:.0f})",
|
||||
action="Plan scaling in the next 2-4 weeks",
|
||||
)
|
||||
)
|
||||
|
||||
# Add tier-based recommendations
|
||||
if database.vendors_count > 0:
|
||||
tier, next_trigger = _determine_tier(database.vendors_count, database.products_count)
|
||||
if next_trigger:
|
||||
recommendations.append(
|
||||
ScalingRecommendation(
|
||||
priority="info",
|
||||
title=f"Current tier: {tier}",
|
||||
description=next_trigger,
|
||||
action="Review capacity planning documentation",
|
||||
)
|
||||
)
|
||||
|
||||
# If no issues, add positive status
|
||||
if not recommendations:
|
||||
recommendations.append(
|
||||
ScalingRecommendation(
|
||||
priority="info",
|
||||
title="All systems healthy",
|
||||
description="No capacity concerns at this time",
|
||||
action=None,
|
||||
)
|
||||
)
|
||||
|
||||
return recommendations
|
||||
|
||||
|
||||
def _determine_tier(vendors: int, products: int) -> tuple[str, str | None]:
|
||||
"""Determine current infrastructure tier and next trigger."""
|
||||
current_tier = "Starter"
|
||||
next_trigger = None
|
||||
|
||||
for i, tier in enumerate(INFRASTRUCTURE_TIERS):
|
||||
max_clients = tier["max_clients"]
|
||||
max_products = tier["max_products"]
|
||||
|
||||
if max_clients is None:
|
||||
current_tier = tier["name"]
|
||||
break
|
||||
|
||||
if vendors <= max_clients and products <= max_products:
|
||||
current_tier = tier["name"]
|
||||
|
||||
# Check proximity to next tier
|
||||
if i < len(INFRASTRUCTURE_TIERS) - 1:
|
||||
next_tier = INFRASTRUCTURE_TIERS[i + 1]
|
||||
vendor_percent = (vendors / max_clients) * 100
|
||||
product_percent = (products / max_products) * 100
|
||||
|
||||
if vendor_percent > 70 or product_percent > 70:
|
||||
next_trigger = (
|
||||
f"Approaching {next_tier['name']} tier "
|
||||
f"(vendors: {vendor_percent:.0f}%, products: {product_percent:.0f}%)"
|
||||
)
|
||||
break
|
||||
|
||||
return current_tier, next_trigger
|
||||
|
||||
|
||||
def _determine_overall_status(thresholds: list[CapacityThreshold]) -> str:
|
||||
"""Determine overall platform status."""
|
||||
statuses = [t.status for t in thresholds]
|
||||
|
||||
if "critical" in statuses:
|
||||
return "critical"
|
||||
elif "warning" in statuses:
|
||||
return "degraded"
|
||||
else:
|
||||
return "healthy"
|
||||
Reference in New Issue
Block a user