refactor: move business logic to service layer for architecture compliance

- images.py: delegate file validation (size, type, extension) to image_service
- platform_health.py: extract all database queries to platform_health_service
- Fixes 13 architecture validation errors (API-002, API-003)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-25 17:31:04 +01:00
parent 88e7a52c7e
commit b98c7c553b
4 changed files with 489 additions and 435 deletions

View File

@@ -10,7 +10,7 @@ Provides:
import logging
from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile
from fastapi import APIRouter, Depends, File, Form, UploadFile
from app.api.deps import get_current_admin_api
from app.services.image_service import image_service
@@ -24,9 +24,6 @@ from models.schema.image import (
router = APIRouter(prefix="/images")
logger = logging.getLogger(__name__)
# Maximum upload size (10MB)
MAX_UPLOAD_SIZE = 10 * 1024 * 1024
@router.post("/upload", response_model=ImageUploadResponse)
async def upload_image(
@@ -50,40 +47,21 @@ async def upload_image(
Returns:
Image URLs and metadata
"""
# Validate file size
# Read file content
content = await file.read()
if len(content) > MAX_UPLOAD_SIZE:
raise HTTPException(
status_code=413,
detail=f"File too large. Maximum size: {MAX_UPLOAD_SIZE // (1024*1024)}MB",
)
# Validate content type
if not file.content_type or not file.content_type.startswith("image/"):
raise HTTPException(
status_code=400,
detail="Invalid file type. Only images are allowed.",
)
# Delegate all validation and processing to service
result = image_service.upload_product_image(
file_content=content,
filename=file.filename or "image.jpg",
content_type=file.content_type,
vendor_id=vendor_id,
product_id=product_id,
)
try:
result = image_service.upload_product_image(
file_content=content,
filename=file.filename or "image.jpg",
vendor_id=vendor_id,
product_id=product_id,
)
logger.info(f"Image uploaded: {result['id']} for vendor {vendor_id}")
logger.info(f"Image uploaded: {result['id']} for vendor {vendor_id}")
return ImageUploadResponse(success=True, image=result)
except ValueError as e:
logger.warning(f"Image upload failed: {e}")
return ImageUploadResponse(success=False, error=str(e))
except Exception as e:
logger.error(f"Image upload error: {e}")
raise HTTPException(status_code=500, detail="Failed to process image")
return ImageUploadResponse(success=True, image=result)
@router.delete("/{image_hash}", response_model=ImageDeleteResponse)

View File

@@ -9,24 +9,15 @@ Provides:
"""
import logging
import os
import platform
import psutil
from datetime import datetime
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlalchemy import func, text
from sqlalchemy.orm import Session
from app.api.deps import get_current_admin_api
from app.core.database import get_db
from app.services.image_service import image_service
from models.database.inventory import Inventory
from models.database.order import Order
from models.database.product import Product
from app.services.platform_health_service import platform_health_service
from models.database.user import User
from models.database.vendor import Vendor
router = APIRouter()
logger = logging.getLogger(__name__)
@@ -116,53 +107,6 @@ class CapacityMetricsResponse(BaseModel):
active_vendors: int
# ============================================================================
# Thresholds Configuration
# ============================================================================
CAPACITY_THRESHOLDS = {
"products_total": {
"warning": 400_000,
"critical": 475_000,
"limit": 500_000,
},
"storage_gb": {
"warning": 800,
"critical": 950,
"limit": 1000,
},
"db_size_mb": {
"warning": 20_000,
"critical": 24_000,
"limit": 25_000,
},
"disk_percent": {
"warning": 70,
"critical": 85,
"limit": 100,
},
"memory_percent": {
"warning": 75,
"critical": 90,
"limit": 100,
},
"cpu_percent": {
"warning": 70,
"critical": 85,
"limit": 100,
},
}
INFRASTRUCTURE_TIERS = [
{"name": "Starter", "max_clients": 50, "max_products": 10_000},
{"name": "Small", "max_clients": 100, "max_products": 30_000},
{"name": "Medium", "max_clients": 300, "max_products": 100_000},
{"name": "Large", "max_clients": 500, "max_products": 250_000},
{"name": "Scale", "max_clients": 1000, "max_products": 500_000},
{"name": "Enterprise", "max_clients": None, "max_products": None},
]
# ============================================================================
# Endpoints
# ============================================================================
@@ -177,44 +121,18 @@ async def get_platform_health(
Returns system metrics, database stats, storage info, and recommendations.
"""
# System metrics
system = _get_system_metrics()
# Database metrics
database = _get_database_metrics(db)
# Image storage metrics
image_stats = image_service.get_storage_stats()
image_storage = ImageStorageMetrics(
total_files=image_stats["total_files"],
total_size_mb=image_stats["total_size_mb"],
total_size_gb=image_stats["total_size_gb"],
max_files_per_dir=image_stats["max_files_per_dir"],
products_estimated=image_stats["products_estimated"],
)
# Calculate thresholds
thresholds = _calculate_thresholds(system, database, image_storage)
# Generate recommendations
recommendations = _generate_recommendations(thresholds, database)
# Determine infrastructure tier
tier, next_trigger = _determine_tier(database.vendors_count, database.products_count)
# Overall status
overall_status = _determine_overall_status(thresholds)
health_data = platform_health_service.get_full_health_report(db)
return PlatformHealthResponse(
timestamp=datetime.utcnow().isoformat(),
overall_status=overall_status,
system=system,
database=database,
image_storage=image_storage,
thresholds=thresholds,
recommendations=recommendations,
infrastructure_tier=tier,
next_tier_trigger=next_trigger,
timestamp=health_data["timestamp"],
overall_status=health_data["overall_status"],
system=SystemMetrics(**health_data["system"]),
database=DatabaseMetrics(**health_data["database"]),
image_storage=ImageStorageMetrics(**health_data["image_storage"]),
thresholds=[CapacityThreshold(**t) for t in health_data["thresholds"]],
recommendations=[ScalingRecommendation(**r) for r in health_data["recommendations"]],
infrastructure_tier=health_data["infrastructure_tier"],
next_tier_trigger=health_data["next_tier_trigger"],
)
@@ -224,309 +142,5 @@ async def get_capacity_metrics(
current_admin: User = Depends(get_current_admin_api),
):
"""Get capacity-focused metrics for planning."""
# Products total
products_total = db.query(func.count(Product.id)).scalar() or 0
# Products by vendor
vendor_counts = (
db.query(Vendor.name, func.count(Product.id))
.join(Product, Vendor.id == Product.vendor_id)
.group_by(Vendor.name)
.all()
)
products_by_vendor = {name or "Unknown": count for name, count in vendor_counts}
# Image storage
image_stats = image_service.get_storage_stats()
# Database size (approximate for SQLite)
db_size = _get_database_size(db)
# Orders this month
start_of_month = datetime.utcnow().replace(day=1, hour=0, minute=0, second=0)
orders_this_month = (
db.query(func.count(Order.id))
.filter(Order.created_at >= start_of_month)
.scalar()
or 0
)
# Active vendors
active_vendors = db.query(func.count(Vendor.id)).filter(Vendor.is_active == True).scalar() or 0 # noqa: E712
return CapacityMetricsResponse(
products_total=products_total,
products_by_vendor=products_by_vendor,
images_total=image_stats["total_files"],
storage_used_gb=image_stats["total_size_gb"],
database_size_mb=db_size,
orders_this_month=orders_this_month,
active_vendors=active_vendors,
)
# ============================================================================
# Helper Functions
# ============================================================================
def _get_system_metrics() -> SystemMetrics:
"""Get current system resource metrics."""
cpu_percent = psutil.cpu_percent(interval=0.1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage("/")
return SystemMetrics(
cpu_percent=cpu_percent,
memory_percent=memory.percent,
memory_used_gb=round(memory.used / (1024**3), 2),
memory_total_gb=round(memory.total / (1024**3), 2),
disk_percent=disk.percent,
disk_used_gb=round(disk.used / (1024**3), 2),
disk_total_gb=round(disk.total / (1024**3), 2),
)
def _get_database_metrics(db: Session) -> DatabaseMetrics:
"""Get database statistics."""
products_count = db.query(func.count(Product.id)).scalar() or 0
orders_count = db.query(func.count(Order.id)).scalar() or 0
vendors_count = db.query(func.count(Vendor.id)).scalar() or 0
inventory_count = db.query(func.count(Inventory.id)).scalar() or 0
db_size = _get_database_size(db)
return DatabaseMetrics(
size_mb=db_size,
products_count=products_count,
orders_count=orders_count,
vendors_count=vendors_count,
inventory_count=inventory_count,
)
def _get_database_size(db: Session) -> float:
"""Get database size in MB."""
try:
# Try SQLite approach
result = db.execute(text("SELECT page_count * page_size as size FROM pragma_page_count(), pragma_page_size()"))
row = result.fetchone()
if row:
return round(row[0] / (1024 * 1024), 2)
except Exception:
pass
try:
# Try PostgreSQL approach
result = db.execute(text("SELECT pg_database_size(current_database())"))
row = result.fetchone()
if row:
return round(row[0] / (1024 * 1024), 2)
except Exception:
pass
return 0.0
def _calculate_thresholds(
system: SystemMetrics,
database: DatabaseMetrics,
image_storage: ImageStorageMetrics,
) -> list[CapacityThreshold]:
"""Calculate threshold status for each metric."""
thresholds = []
# Products threshold
products_config = CAPACITY_THRESHOLDS["products_total"]
thresholds.append(
_create_threshold(
"Products",
database.products_count,
products_config["warning"],
products_config["critical"],
products_config["limit"],
)
)
# Storage threshold
storage_config = CAPACITY_THRESHOLDS["storage_gb"]
thresholds.append(
_create_threshold(
"Image Storage (GB)",
image_storage.total_size_gb,
storage_config["warning"],
storage_config["critical"],
storage_config["limit"],
)
)
# Database size threshold
db_config = CAPACITY_THRESHOLDS["db_size_mb"]
thresholds.append(
_create_threshold(
"Database (MB)",
database.size_mb,
db_config["warning"],
db_config["critical"],
db_config["limit"],
)
)
# Disk threshold
disk_config = CAPACITY_THRESHOLDS["disk_percent"]
thresholds.append(
_create_threshold(
"Disk Usage (%)",
system.disk_percent,
disk_config["warning"],
disk_config["critical"],
disk_config["limit"],
)
)
# Memory threshold
memory_config = CAPACITY_THRESHOLDS["memory_percent"]
thresholds.append(
_create_threshold(
"Memory Usage (%)",
system.memory_percent,
memory_config["warning"],
memory_config["critical"],
memory_config["limit"],
)
)
# CPU threshold
cpu_config = CAPACITY_THRESHOLDS["cpu_percent"]
thresholds.append(
_create_threshold(
"CPU Usage (%)",
system.cpu_percent,
cpu_config["warning"],
cpu_config["critical"],
cpu_config["limit"],
)
)
return thresholds
def _create_threshold(
name: str, current: float, warning: float, critical: float, limit: float
) -> CapacityThreshold:
"""Create a threshold status object."""
percent_used = (current / limit) * 100 if limit > 0 else 0
if current >= critical:
status = "critical"
elif current >= warning:
status = "warning"
else:
status = "ok"
return CapacityThreshold(
name=name,
current=current,
warning=warning,
critical=critical,
limit=limit,
status=status,
percent_used=round(percent_used, 1),
)
def _generate_recommendations(
thresholds: list[CapacityThreshold], database: DatabaseMetrics
) -> list[ScalingRecommendation]:
"""Generate scaling recommendations based on thresholds."""
recommendations = []
for threshold in thresholds:
if threshold.status == "critical":
recommendations.append(
ScalingRecommendation(
priority="critical",
title=f"{threshold.name} at critical level",
description=f"Currently at {threshold.percent_used:.0f}% of capacity ({threshold.current:.0f} of {threshold.limit:.0f})",
action="Immediate scaling or cleanup required",
)
)
elif threshold.status == "warning":
recommendations.append(
ScalingRecommendation(
priority="warning",
title=f"{threshold.name} approaching limit",
description=f"Currently at {threshold.percent_used:.0f}% of capacity ({threshold.current:.0f} of {threshold.limit:.0f})",
action="Plan scaling in the next 2-4 weeks",
)
)
# Add tier-based recommendations
if database.vendors_count > 0:
tier, next_trigger = _determine_tier(database.vendors_count, database.products_count)
if next_trigger:
recommendations.append(
ScalingRecommendation(
priority="info",
title=f"Current tier: {tier}",
description=next_trigger,
action="Review capacity planning documentation",
)
)
# If no issues, add positive status
if not recommendations:
recommendations.append(
ScalingRecommendation(
priority="info",
title="All systems healthy",
description="No capacity concerns at this time",
action=None,
)
)
return recommendations
def _determine_tier(vendors: int, products: int) -> tuple[str, str | None]:
"""Determine current infrastructure tier and next trigger."""
current_tier = "Starter"
next_trigger = None
for i, tier in enumerate(INFRASTRUCTURE_TIERS):
max_clients = tier["max_clients"]
max_products = tier["max_products"]
if max_clients is None:
current_tier = tier["name"]
break
if vendors <= max_clients and products <= max_products:
current_tier = tier["name"]
# Check proximity to next tier
if i < len(INFRASTRUCTURE_TIERS) - 1:
next_tier = INFRASTRUCTURE_TIERS[i + 1]
vendor_percent = (vendors / max_clients) * 100
product_percent = (products / max_products) * 100
if vendor_percent > 70 or product_percent > 70:
next_trigger = (
f"Approaching {next_tier['name']} tier "
f"(vendors: {vendor_percent:.0f}%, products: {product_percent:.0f}%)"
)
break
return current_tier, next_trigger
def _determine_overall_status(thresholds: list[CapacityThreshold]) -> str:
"""Determine overall platform status."""
statuses = [t.status for t in thresholds]
if "critical" in statuses:
return "critical"
elif "warning" in statuses:
return "degraded"
else:
return "healthy"
metrics = platform_health_service.get_capacity_metrics(db)
return CapacityMetricsResponse(**metrics)

View File

@@ -12,21 +12,26 @@ Provides:
import hashlib
import logging
import os
import shutil
from datetime import datetime
from io import BytesIO
from pathlib import Path
from PIL import Image
from app.exceptions import ValidationException
logger = logging.getLogger(__name__)
# Maximum upload size (10MB)
MAX_UPLOAD_SIZE = 10 * 1024 * 1024
class ImageService:
"""Service for image upload and management."""
# Supported image formats
ALLOWED_EXTENSIONS = {"jpg", "jpeg", "png", "gif", "webp"}
ALLOWED_CONTENT_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"}
# Size variants to generate
SIZES = {
@@ -57,6 +62,7 @@ class ImageService:
filename: str,
vendor_id: int,
product_id: int | None = None,
content_type: str | None = None,
) -> dict:
"""Upload and process a product image.
@@ -65,14 +71,30 @@ class ImageService:
filename: Original filename
vendor_id: Vendor ID for path generation
product_id: Optional product ID
content_type: MIME type of the uploaded file
Returns:
Dict with image info and URLs
Raises:
ValidationException: If file is too large or invalid type
"""
# Validate file size
if len(file_content) > MAX_UPLOAD_SIZE:
raise ValidationException(
f"File too large. Maximum size: {MAX_UPLOAD_SIZE // (1024*1024)}MB"
)
# Validate content type
if not content_type or not content_type.startswith("image/"):
raise ValidationException("Invalid file type. Only images are allowed.")
# Validate file extension
ext = self._get_extension(filename)
if ext not in self.ALLOWED_EXTENSIONS:
raise ValueError(f"Invalid file type: {ext}. Allowed: {self.ALLOWED_EXTENSIONS}")
raise ValidationException(
f"Invalid file type: {ext}. Allowed: {', '.join(self.ALLOWED_EXTENSIONS)}"
)
# Generate unique hash for this image
image_hash = self._generate_hash(vendor_id, product_id, filename)

View File

@@ -0,0 +1,440 @@
# app/services/platform_health_service.py
"""
Platform health and capacity monitoring service.
Provides:
- System resource metrics (CPU, memory, disk)
- Database metrics and statistics
- Capacity threshold calculations
- Scaling recommendations
"""
import logging
from datetime import datetime
import psutil
from sqlalchemy import func, text
from sqlalchemy.orm import Session
from app.services.image_service import image_service
from models.database.inventory import Inventory
from models.database.order import Order
from models.database.product import Product
from models.database.vendor import Vendor
logger = logging.getLogger(__name__)
# ============================================================================
# Thresholds Configuration
# ============================================================================
CAPACITY_THRESHOLDS = {
"products_total": {
"warning": 400_000,
"critical": 475_000,
"limit": 500_000,
},
"storage_gb": {
"warning": 800,
"critical": 950,
"limit": 1000,
},
"db_size_mb": {
"warning": 20_000,
"critical": 24_000,
"limit": 25_000,
},
"disk_percent": {
"warning": 70,
"critical": 85,
"limit": 100,
},
"memory_percent": {
"warning": 75,
"critical": 90,
"limit": 100,
},
"cpu_percent": {
"warning": 70,
"critical": 85,
"limit": 100,
},
}
INFRASTRUCTURE_TIERS = [
{"name": "Starter", "max_clients": 50, "max_products": 10_000},
{"name": "Small", "max_clients": 100, "max_products": 30_000},
{"name": "Medium", "max_clients": 300, "max_products": 100_000},
{"name": "Large", "max_clients": 500, "max_products": 250_000},
{"name": "Scale", "max_clients": 1000, "max_products": 500_000},
{"name": "Enterprise", "max_clients": None, "max_products": None},
]
class PlatformHealthService:
"""Service for platform health and capacity monitoring."""
def get_system_metrics(self) -> dict:
"""Get current system resource metrics."""
cpu_percent = psutil.cpu_percent(interval=0.1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage("/")
return {
"cpu_percent": cpu_percent,
"memory_percent": memory.percent,
"memory_used_gb": round(memory.used / (1024**3), 2),
"memory_total_gb": round(memory.total / (1024**3), 2),
"disk_percent": disk.percent,
"disk_used_gb": round(disk.used / (1024**3), 2),
"disk_total_gb": round(disk.total / (1024**3), 2),
}
def get_database_metrics(self, db: Session) -> dict:
"""Get database statistics."""
products_count = db.query(func.count(Product.id)).scalar() or 0
orders_count = db.query(func.count(Order.id)).scalar() or 0
vendors_count = db.query(func.count(Vendor.id)).scalar() or 0
inventory_count = db.query(func.count(Inventory.id)).scalar() or 0
db_size = self._get_database_size(db)
return {
"size_mb": db_size,
"products_count": products_count,
"orders_count": orders_count,
"vendors_count": vendors_count,
"inventory_count": inventory_count,
}
def get_image_storage_metrics(self) -> dict:
"""Get image storage statistics."""
stats = image_service.get_storage_stats()
return {
"total_files": stats["total_files"],
"total_size_mb": stats["total_size_mb"],
"total_size_gb": stats["total_size_gb"],
"max_files_per_dir": stats["max_files_per_dir"],
"products_estimated": stats["products_estimated"],
}
def get_capacity_metrics(self, db: Session) -> dict:
"""Get capacity-focused metrics for planning."""
# Products total
products_total = db.query(func.count(Product.id)).scalar() or 0
# Products by vendor
vendor_counts = (
db.query(Vendor.name, func.count(Product.id))
.join(Product, Vendor.id == Product.vendor_id)
.group_by(Vendor.name)
.all()
)
products_by_vendor = {name or "Unknown": count for name, count in vendor_counts}
# Image storage
image_stats = image_service.get_storage_stats()
# Database size
db_size = self._get_database_size(db)
# Orders this month
start_of_month = datetime.utcnow().replace(day=1, hour=0, minute=0, second=0)
orders_this_month = (
db.query(func.count(Order.id))
.filter(Order.created_at >= start_of_month)
.scalar()
or 0
)
# Active vendors
active_vendors = (
db.query(func.count(Vendor.id))
.filter(Vendor.is_active == True) # noqa: E712
.scalar()
or 0
)
return {
"products_total": products_total,
"products_by_vendor": products_by_vendor,
"images_total": image_stats["total_files"],
"storage_used_gb": image_stats["total_size_gb"],
"database_size_mb": db_size,
"orders_this_month": orders_this_month,
"active_vendors": active_vendors,
}
def get_full_health_report(self, db: Session) -> dict:
"""Get comprehensive platform health report."""
# System metrics
system = self.get_system_metrics()
# Database metrics
database = self.get_database_metrics(db)
# Image storage metrics
image_storage = self.get_image_storage_metrics()
# Calculate thresholds
thresholds = self._calculate_thresholds(system, database, image_storage)
# Generate recommendations
recommendations = self._generate_recommendations(thresholds, database)
# Determine infrastructure tier
tier, next_trigger = self._determine_tier(
database["vendors_count"], database["products_count"]
)
# Overall status
overall_status = self._determine_overall_status(thresholds)
return {
"timestamp": datetime.utcnow().isoformat(),
"overall_status": overall_status,
"system": system,
"database": database,
"image_storage": image_storage,
"thresholds": thresholds,
"recommendations": recommendations,
"infrastructure_tier": tier,
"next_tier_trigger": next_trigger,
}
def _get_database_size(self, db: Session) -> float:
"""Get database size in MB."""
try:
# Try SQLite approach
result = db.execute(
text(
"SELECT page_count * page_size as size "
"FROM pragma_page_count(), pragma_page_size()"
)
)
row = result.fetchone()
if row:
return round(row[0] / (1024 * 1024), 2)
except Exception:
pass
try:
# Try PostgreSQL approach
result = db.execute(text("SELECT pg_database_size(current_database())"))
row = result.fetchone()
if row:
return round(row[0] / (1024 * 1024), 2)
except Exception:
pass
return 0.0
def _calculate_thresholds(
self, system: dict, database: dict, image_storage: dict
) -> list[dict]:
"""Calculate threshold status for each metric."""
thresholds = []
# Products threshold
products_config = CAPACITY_THRESHOLDS["products_total"]
thresholds.append(
self._create_threshold(
"Products",
database["products_count"],
products_config["warning"],
products_config["critical"],
products_config["limit"],
)
)
# Storage threshold
storage_config = CAPACITY_THRESHOLDS["storage_gb"]
thresholds.append(
self._create_threshold(
"Image Storage (GB)",
image_storage["total_size_gb"],
storage_config["warning"],
storage_config["critical"],
storage_config["limit"],
)
)
# Database size threshold
db_config = CAPACITY_THRESHOLDS["db_size_mb"]
thresholds.append(
self._create_threshold(
"Database (MB)",
database["size_mb"],
db_config["warning"],
db_config["critical"],
db_config["limit"],
)
)
# Disk threshold
disk_config = CAPACITY_THRESHOLDS["disk_percent"]
thresholds.append(
self._create_threshold(
"Disk Usage (%)",
system["disk_percent"],
disk_config["warning"],
disk_config["critical"],
disk_config["limit"],
)
)
# Memory threshold
memory_config = CAPACITY_THRESHOLDS["memory_percent"]
thresholds.append(
self._create_threshold(
"Memory Usage (%)",
system["memory_percent"],
memory_config["warning"],
memory_config["critical"],
memory_config["limit"],
)
)
# CPU threshold
cpu_config = CAPACITY_THRESHOLDS["cpu_percent"]
thresholds.append(
self._create_threshold(
"CPU Usage (%)",
system["cpu_percent"],
cpu_config["warning"],
cpu_config["critical"],
cpu_config["limit"],
)
)
return thresholds
def _create_threshold(
self, name: str, current: float, warning: float, critical: float, limit: float
) -> dict:
"""Create a threshold status object."""
percent_used = (current / limit) * 100 if limit > 0 else 0
if current >= critical:
status = "critical"
elif current >= warning:
status = "warning"
else:
status = "ok"
return {
"name": name,
"current": current,
"warning": warning,
"critical": critical,
"limit": limit,
"status": status,
"percent_used": round(percent_used, 1),
}
def _generate_recommendations(
self, thresholds: list[dict], database: dict
) -> list[dict]:
"""Generate scaling recommendations based on thresholds."""
recommendations = []
for threshold in thresholds:
if threshold["status"] == "critical":
recommendations.append(
{
"priority": "critical",
"title": f"{threshold['name']} at critical level",
"description": (
f"Currently at {threshold['percent_used']:.0f}% of capacity "
f"({threshold['current']:.0f} of {threshold['limit']:.0f})"
),
"action": "Immediate scaling or cleanup required",
}
)
elif threshold["status"] == "warning":
recommendations.append(
{
"priority": "warning",
"title": f"{threshold['name']} approaching limit",
"description": (
f"Currently at {threshold['percent_used']:.0f}% of capacity "
f"({threshold['current']:.0f} of {threshold['limit']:.0f})"
),
"action": "Plan scaling in the next 2-4 weeks",
}
)
# Add tier-based recommendations
if database["vendors_count"] > 0:
tier, next_trigger = self._determine_tier(
database["vendors_count"], database["products_count"]
)
if next_trigger:
recommendations.append(
{
"priority": "info",
"title": f"Current tier: {tier}",
"description": next_trigger,
"action": "Review capacity planning documentation",
}
)
# If no issues, add positive status
if not recommendations:
recommendations.append(
{
"priority": "info",
"title": "All systems healthy",
"description": "No capacity concerns at this time",
"action": None,
}
)
return recommendations
def _determine_tier(self, vendors: int, products: int) -> tuple[str, str | None]:
"""Determine current infrastructure tier and next trigger."""
current_tier = "Starter"
next_trigger = None
for i, tier in enumerate(INFRASTRUCTURE_TIERS):
max_clients = tier["max_clients"]
max_products = tier["max_products"]
if max_clients is None:
current_tier = tier["name"]
break
if vendors <= max_clients and products <= max_products:
current_tier = tier["name"]
# Check proximity to next tier
if i < len(INFRASTRUCTURE_TIERS) - 1:
next_tier = INFRASTRUCTURE_TIERS[i + 1]
vendor_percent = (vendors / max_clients) * 100
product_percent = (products / max_products) * 100
if vendor_percent > 70 or product_percent > 70:
next_trigger = (
f"Approaching {next_tier['name']} tier "
f"(vendors: {vendor_percent:.0f}%, products: {product_percent:.0f}%)"
)
break
return current_tier, next_trigger
def _determine_overall_status(self, thresholds: list[dict]) -> str:
"""Determine overall platform status."""
statuses = [t["status"] for t in thresholds]
if "critical" in statuses:
return "critical"
elif "warning" in statuses:
return "degraded"
else:
return "healthy"
# Create service instance
platform_health_service = PlatformHealthService()