feat: implement three-tier module classification and framework layer
Module Classification: - Core (4): core, tenancy, cms, customers - always enabled - Optional (7): payments, billing, inventory, orders, marketplace, analytics, messaging - Internal (2): dev-tools, monitoring - admin-only Key Changes: - Rename platform-admin module to tenancy - Promote CMS and Customers to core modules - Create new payments module (gateway abstractions) - Add billing→payments and orders→payments dependencies - Mark dev-tools and monitoring as internal modules New Infrastructure: - app/modules/events.py: Module event bus (ENABLED, DISABLED, STARTUP, SHUTDOWN) - app/modules/migrations.py: Module-specific migration discovery - app/core/observability.py: Health checks, Prometheus metrics, Sentry integration Enhanced ModuleDefinition: - version, is_internal, permissions - config_schema, default_config - migrations_path - Lifecycle hooks: on_enable, on_disable, on_startup, health_check New Registry Functions: - get_optional_module_codes(), get_internal_module_codes() - is_core_module(), is_internal_module() - get_modules_by_tier(), get_module_tier() Migrations: - zc*: Rename platform-admin to tenancy - zd*: Ensure CMS and Customers enabled for all platforms Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
664
app/core/observability.py
Normal file
664
app/core/observability.py
Normal file
@@ -0,0 +1,664 @@
|
||||
# app/core/observability.py
|
||||
"""
|
||||
Observability framework for the platform.
|
||||
|
||||
Provides infrastructure-level monitoring and health check aggregation:
|
||||
- Prometheus metrics registry and /metrics endpoint
|
||||
- Health check aggregation from modules
|
||||
- Sentry error tracking initialization
|
||||
- External tool integration (Flower, Grafana)
|
||||
|
||||
This is part of the Framework Layer - infrastructure that modules depend on,
|
||||
not a module itself.
|
||||
|
||||
Usage:
|
||||
# In main.py lifespan
|
||||
from app.core.observability import init_observability, shutdown_observability
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
init_observability()
|
||||
yield
|
||||
shutdown_observability()
|
||||
|
||||
# Register health endpoint
|
||||
from app.core.observability import health_router
|
||||
app.include_router(health_router)
|
||||
"""
|
||||
|
||||
import logging
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
from fastapi import APIRouter, Response
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Health Check Types
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class HealthStatus(str, Enum):
|
||||
"""Health status levels."""
|
||||
|
||||
HEALTHY = "healthy"
|
||||
DEGRADED = "degraded"
|
||||
UNHEALTHY = "unhealthy"
|
||||
|
||||
|
||||
@dataclass
|
||||
class HealthCheckResult:
|
||||
"""Result of a health check."""
|
||||
|
||||
name: str
|
||||
status: HealthStatus
|
||||
message: str = ""
|
||||
latency_ms: float = 0.0
|
||||
details: dict[str, Any] = field(default_factory=dict)
|
||||
checked_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
|
||||
|
||||
@dataclass
|
||||
class AggregatedHealth:
|
||||
"""Aggregated health status from all checks."""
|
||||
|
||||
status: HealthStatus
|
||||
checks: list[HealthCheckResult]
|
||||
timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Convert to dictionary for JSON response."""
|
||||
return {
|
||||
"status": self.status.value,
|
||||
"timestamp": self.timestamp.isoformat(),
|
||||
"checks": [
|
||||
{
|
||||
"name": check.name,
|
||||
"status": check.status.value,
|
||||
"message": check.message,
|
||||
"latency_ms": round(check.latency_ms, 2),
|
||||
"details": check.details,
|
||||
}
|
||||
for check in self.checks
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Health Check Registry
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class HealthCheckRegistry:
|
||||
"""
|
||||
Registry for health check functions.
|
||||
|
||||
Components register their health checks here, and the /health endpoint
|
||||
aggregates all results.
|
||||
|
||||
Example:
|
||||
@health_registry.register("database")
|
||||
def check_database() -> HealthCheckResult:
|
||||
try:
|
||||
db.execute("SELECT 1")
|
||||
return HealthCheckResult(name="database", status=HealthStatus.HEALTHY)
|
||||
except Exception as e:
|
||||
return HealthCheckResult(
|
||||
name="database",
|
||||
status=HealthStatus.UNHEALTHY,
|
||||
message=str(e)
|
||||
)
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._checks: dict[str, Callable[[], HealthCheckResult]] = {}
|
||||
|
||||
def register(
|
||||
self,
|
||||
name: str,
|
||||
) -> Callable[[Callable[[], HealthCheckResult]], Callable[[], HealthCheckResult]]:
|
||||
"""
|
||||
Decorator to register a health check.
|
||||
|
||||
Args:
|
||||
name: Name of the health check
|
||||
|
||||
Returns:
|
||||
Decorator function
|
||||
"""
|
||||
|
||||
def decorator(
|
||||
func: Callable[[], HealthCheckResult],
|
||||
) -> Callable[[], HealthCheckResult]:
|
||||
self._checks[name] = func
|
||||
logger.debug(f"Registered health check: {name}")
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
def register_check(
|
||||
self,
|
||||
name: str,
|
||||
check: Callable[[], HealthCheckResult],
|
||||
) -> None:
|
||||
"""
|
||||
Register a health check function directly.
|
||||
|
||||
Args:
|
||||
name: Name of the health check
|
||||
check: Health check function
|
||||
"""
|
||||
self._checks[name] = check
|
||||
logger.debug(f"Registered health check: {name}")
|
||||
|
||||
def unregister(self, name: str) -> bool:
|
||||
"""
|
||||
Unregister a health check.
|
||||
|
||||
Args:
|
||||
name: Name of the health check to remove
|
||||
|
||||
Returns:
|
||||
True if check was found and removed
|
||||
"""
|
||||
if name in self._checks:
|
||||
del self._checks[name]
|
||||
return True
|
||||
return False
|
||||
|
||||
def run_all(self) -> AggregatedHealth:
|
||||
"""
|
||||
Run all registered health checks and aggregate results.
|
||||
|
||||
Returns:
|
||||
Aggregated health status
|
||||
"""
|
||||
results: list[HealthCheckResult] = []
|
||||
overall_status = HealthStatus.HEALTHY
|
||||
|
||||
for name, check_func in self._checks.items():
|
||||
start = time.perf_counter()
|
||||
try:
|
||||
result = check_func()
|
||||
result.latency_ms = (time.perf_counter() - start) * 1000
|
||||
except Exception as e:
|
||||
result = HealthCheckResult(
|
||||
name=name,
|
||||
status=HealthStatus.UNHEALTHY,
|
||||
message=f"Check failed: {e}",
|
||||
latency_ms=(time.perf_counter() - start) * 1000,
|
||||
)
|
||||
logger.exception(f"Health check {name} failed")
|
||||
|
||||
results.append(result)
|
||||
|
||||
# Determine overall status (worst wins)
|
||||
if result.status == HealthStatus.UNHEALTHY:
|
||||
overall_status = HealthStatus.UNHEALTHY
|
||||
elif result.status == HealthStatus.DEGRADED and overall_status == HealthStatus.HEALTHY:
|
||||
overall_status = HealthStatus.DEGRADED
|
||||
|
||||
return AggregatedHealth(status=overall_status, checks=results)
|
||||
|
||||
def get_check_names(self) -> list[str]:
|
||||
"""Get names of all registered checks."""
|
||||
return list(self._checks.keys())
|
||||
|
||||
|
||||
# Global health check registry
|
||||
health_registry = HealthCheckRegistry()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Prometheus Metrics (Placeholder)
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class MetricsRegistry:
|
||||
"""
|
||||
Prometheus metrics registry.
|
||||
|
||||
This is a placeholder implementation. For production, integrate with
|
||||
prometheus_client library.
|
||||
|
||||
Example:
|
||||
from prometheus_client import Counter, Histogram
|
||||
|
||||
request_count = metrics_registry.counter(
|
||||
"http_requests_total",
|
||||
"Total HTTP requests",
|
||||
["method", "endpoint", "status"]
|
||||
)
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._metrics: dict[str, Any] = {}
|
||||
self._enabled = False
|
||||
|
||||
def enable(self) -> None:
|
||||
"""Enable metrics collection."""
|
||||
self._enabled = True
|
||||
logger.info("Prometheus metrics enabled")
|
||||
|
||||
def disable(self) -> None:
|
||||
"""Disable metrics collection."""
|
||||
self._enabled = False
|
||||
|
||||
def counter(
|
||||
self,
|
||||
name: str,
|
||||
description: str,
|
||||
labels: list[str] | None = None,
|
||||
) -> Any:
|
||||
"""
|
||||
Create a counter metric.
|
||||
|
||||
Args:
|
||||
name: Metric name
|
||||
description: Metric description
|
||||
labels: Label names
|
||||
|
||||
Returns:
|
||||
Counter metric (or placeholder if prometheus_client not installed)
|
||||
"""
|
||||
if not self._enabled:
|
||||
return _DummyMetric()
|
||||
|
||||
try:
|
||||
from prometheus_client import Counter
|
||||
|
||||
metric = Counter(name, description, labels or [])
|
||||
self._metrics[name] = metric
|
||||
return metric
|
||||
except ImportError:
|
||||
return _DummyMetric()
|
||||
|
||||
def histogram(
|
||||
self,
|
||||
name: str,
|
||||
description: str,
|
||||
labels: list[str] | None = None,
|
||||
buckets: list[float] | None = None,
|
||||
) -> Any:
|
||||
"""
|
||||
Create a histogram metric.
|
||||
|
||||
Args:
|
||||
name: Metric name
|
||||
description: Metric description
|
||||
labels: Label names
|
||||
buckets: Histogram buckets
|
||||
|
||||
Returns:
|
||||
Histogram metric (or placeholder)
|
||||
"""
|
||||
if not self._enabled:
|
||||
return _DummyMetric()
|
||||
|
||||
try:
|
||||
from prometheus_client import Histogram
|
||||
|
||||
kwargs: dict[str, Any] = {}
|
||||
if buckets:
|
||||
kwargs["buckets"] = buckets
|
||||
|
||||
metric = Histogram(name, description, labels or [], **kwargs)
|
||||
self._metrics[name] = metric
|
||||
return metric
|
||||
except ImportError:
|
||||
return _DummyMetric()
|
||||
|
||||
def gauge(
|
||||
self,
|
||||
name: str,
|
||||
description: str,
|
||||
labels: list[str] | None = None,
|
||||
) -> Any:
|
||||
"""
|
||||
Create a gauge metric.
|
||||
|
||||
Args:
|
||||
name: Metric name
|
||||
description: Metric description
|
||||
labels: Label names
|
||||
|
||||
Returns:
|
||||
Gauge metric (or placeholder)
|
||||
"""
|
||||
if not self._enabled:
|
||||
return _DummyMetric()
|
||||
|
||||
try:
|
||||
from prometheus_client import Gauge
|
||||
|
||||
metric = Gauge(name, description, labels or [])
|
||||
self._metrics[name] = metric
|
||||
return metric
|
||||
except ImportError:
|
||||
return _DummyMetric()
|
||||
|
||||
def generate_latest(self) -> bytes:
|
||||
"""
|
||||
Generate Prometheus metrics output.
|
||||
|
||||
Returns:
|
||||
Prometheus metrics in text format
|
||||
"""
|
||||
if not self._enabled:
|
||||
return b"# Metrics not enabled\n"
|
||||
|
||||
try:
|
||||
from prometheus_client import generate_latest
|
||||
|
||||
return generate_latest()
|
||||
except ImportError:
|
||||
return b"# prometheus_client not installed\n"
|
||||
|
||||
|
||||
class _DummyMetric:
|
||||
"""Placeholder metric when prometheus_client is not available."""
|
||||
|
||||
def labels(self, *args: Any, **kwargs: Any) -> "_DummyMetric":
|
||||
return self
|
||||
|
||||
def inc(self, amount: float = 1) -> None:
|
||||
pass
|
||||
|
||||
def dec(self, amount: float = 1) -> None:
|
||||
pass
|
||||
|
||||
def set(self, value: float) -> None:
|
||||
pass
|
||||
|
||||
def observe(self, amount: float) -> None:
|
||||
pass
|
||||
|
||||
|
||||
# Global metrics registry
|
||||
metrics_registry = MetricsRegistry()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Sentry Integration (Placeholder)
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class SentryIntegration:
|
||||
"""
|
||||
Sentry error tracking integration.
|
||||
|
||||
This is a placeholder. For production, configure with actual Sentry DSN.
|
||||
|
||||
Example:
|
||||
sentry.init(dsn="https://key@sentry.io/project")
|
||||
sentry.capture_exception(error)
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._initialized = False
|
||||
self._dsn: str | None = None
|
||||
|
||||
def init(
|
||||
self,
|
||||
dsn: str | None = None,
|
||||
environment: str = "development",
|
||||
release: str | None = None,
|
||||
) -> None:
|
||||
"""
|
||||
Initialize Sentry SDK.
|
||||
|
||||
Args:
|
||||
dsn: Sentry DSN
|
||||
environment: Environment name
|
||||
release: Release version
|
||||
"""
|
||||
if not dsn:
|
||||
logger.info("Sentry DSN not provided, error tracking disabled")
|
||||
return
|
||||
|
||||
try:
|
||||
import sentry_sdk
|
||||
|
||||
sentry_sdk.init(
|
||||
dsn=dsn,
|
||||
environment=environment,
|
||||
release=release,
|
||||
traces_sample_rate=0.1,
|
||||
)
|
||||
self._initialized = True
|
||||
self._dsn = dsn
|
||||
logger.info(f"Sentry initialized for environment: {environment}")
|
||||
except ImportError:
|
||||
logger.warning("sentry_sdk not installed, error tracking disabled")
|
||||
|
||||
def capture_exception(self, error: Exception) -> str | None:
|
||||
"""
|
||||
Capture an exception.
|
||||
|
||||
Args:
|
||||
error: Exception to capture
|
||||
|
||||
Returns:
|
||||
Event ID if captured, None otherwise
|
||||
"""
|
||||
if not self._initialized:
|
||||
return None
|
||||
|
||||
try:
|
||||
import sentry_sdk
|
||||
|
||||
return sentry_sdk.capture_exception(error)
|
||||
except ImportError:
|
||||
return None
|
||||
|
||||
def capture_message(self, message: str, level: str = "info") -> str | None:
|
||||
"""
|
||||
Capture a message.
|
||||
|
||||
Args:
|
||||
message: Message to capture
|
||||
level: Log level
|
||||
|
||||
Returns:
|
||||
Event ID if captured, None otherwise
|
||||
"""
|
||||
if not self._initialized:
|
||||
return None
|
||||
|
||||
try:
|
||||
import sentry_sdk
|
||||
|
||||
return sentry_sdk.capture_message(message, level=level)
|
||||
except ImportError:
|
||||
return None
|
||||
|
||||
|
||||
# Global Sentry instance
|
||||
sentry = SentryIntegration()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# External Tool URLs
|
||||
# =============================================================================
|
||||
|
||||
|
||||
@dataclass
|
||||
class ExternalToolConfig:
|
||||
"""Configuration for external monitoring tools."""
|
||||
|
||||
flower_url: str | None = None
|
||||
grafana_url: str | None = None
|
||||
prometheus_url: str | None = None
|
||||
|
||||
def to_dict(self) -> dict[str, str | None]:
|
||||
return {
|
||||
"flower": self.flower_url,
|
||||
"grafana": self.grafana_url,
|
||||
"prometheus": self.prometheus_url,
|
||||
}
|
||||
|
||||
|
||||
# Global external tool config
|
||||
external_tools = ExternalToolConfig()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Health API Router
|
||||
# =============================================================================
|
||||
|
||||
health_router = APIRouter(tags=["Health"])
|
||||
|
||||
|
||||
@health_router.get("/health")
|
||||
async def health_check() -> dict[str, Any]:
|
||||
"""
|
||||
Aggregated health check endpoint.
|
||||
|
||||
Returns combined health status from all registered checks.
|
||||
"""
|
||||
result = health_registry.run_all()
|
||||
return result.to_dict()
|
||||
|
||||
|
||||
@health_router.get("/health/live")
|
||||
async def liveness_check() -> dict[str, str]:
|
||||
"""
|
||||
Kubernetes liveness probe endpoint.
|
||||
|
||||
Returns 200 if the application is running.
|
||||
"""
|
||||
return {"status": "alive"}
|
||||
|
||||
|
||||
@health_router.get("/health/ready")
|
||||
async def readiness_check() -> dict[str, Any]:
|
||||
"""
|
||||
Kubernetes readiness probe endpoint.
|
||||
|
||||
Returns 200 if the application is ready to serve traffic.
|
||||
"""
|
||||
result = health_registry.run_all()
|
||||
return {
|
||||
"status": "ready" if result.status != HealthStatus.UNHEALTHY else "not_ready",
|
||||
"health": result.status.value,
|
||||
}
|
||||
|
||||
|
||||
@health_router.get("/metrics")
|
||||
async def metrics_endpoint() -> Response:
|
||||
"""
|
||||
Prometheus metrics endpoint.
|
||||
|
||||
Returns metrics in Prometheus text format for scraping.
|
||||
"""
|
||||
content = metrics_registry.generate_latest()
|
||||
return Response(
|
||||
content=content,
|
||||
media_type="text/plain; charset=utf-8",
|
||||
)
|
||||
|
||||
|
||||
@health_router.get("/health/tools")
|
||||
async def external_tools_endpoint() -> dict[str, str | None]:
|
||||
"""
|
||||
Get URLs for external monitoring tools.
|
||||
|
||||
Returns links to Flower, Grafana, etc.
|
||||
"""
|
||||
return external_tools.to_dict()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Initialization Functions
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def init_observability(
|
||||
enable_metrics: bool = False,
|
||||
sentry_dsn: str | None = None,
|
||||
environment: str = "development",
|
||||
flower_url: str | None = None,
|
||||
grafana_url: str | None = None,
|
||||
) -> None:
|
||||
"""
|
||||
Initialize observability stack.
|
||||
|
||||
Args:
|
||||
enable_metrics: Whether to enable Prometheus metrics
|
||||
sentry_dsn: Sentry DSN for error tracking
|
||||
environment: Environment name (development, staging, production)
|
||||
flower_url: URL to Flower dashboard
|
||||
grafana_url: URL to Grafana dashboards
|
||||
"""
|
||||
logger.info("Initializing observability stack...")
|
||||
|
||||
# Enable metrics if requested
|
||||
if enable_metrics:
|
||||
metrics_registry.enable()
|
||||
|
||||
# Initialize Sentry
|
||||
if sentry_dsn:
|
||||
sentry.init(dsn=sentry_dsn, environment=environment)
|
||||
|
||||
# Configure external tools
|
||||
external_tools.flower_url = flower_url
|
||||
external_tools.grafana_url = grafana_url
|
||||
|
||||
logger.info("Observability stack initialized")
|
||||
|
||||
|
||||
def shutdown_observability() -> None:
|
||||
"""Shutdown observability stack."""
|
||||
logger.info("Shutting down observability stack...")
|
||||
metrics_registry.disable()
|
||||
|
||||
|
||||
def register_module_health_checks() -> None:
|
||||
"""
|
||||
Register health checks from all modules.
|
||||
|
||||
This should be called after all modules are loaded.
|
||||
"""
|
||||
from app.modules.registry import MODULES
|
||||
|
||||
for module in MODULES.values():
|
||||
if module.health_check:
|
||||
health_registry.register_check(
|
||||
f"module:{module.code}",
|
||||
lambda m=module: HealthCheckResult(
|
||||
name=f"module:{m.code}",
|
||||
**m.get_health_status(),
|
||||
),
|
||||
)
|
||||
logger.debug(f"Registered health check for module {module.code}")
|
||||
|
||||
|
||||
__all__ = [
|
||||
# Health checks
|
||||
"HealthStatus",
|
||||
"HealthCheckResult",
|
||||
"AggregatedHealth",
|
||||
"HealthCheckRegistry",
|
||||
"health_registry",
|
||||
# Metrics
|
||||
"MetricsRegistry",
|
||||
"metrics_registry",
|
||||
# Sentry
|
||||
"SentryIntegration",
|
||||
"sentry",
|
||||
# External tools
|
||||
"ExternalToolConfig",
|
||||
"external_tools",
|
||||
# Router
|
||||
"health_router",
|
||||
# Initialization
|
||||
"init_observability",
|
||||
"shutdown_observability",
|
||||
"register_module_health_checks",
|
||||
]
|
||||
Reference in New Issue
Block a user