# app/core/observability.py
"""
Observability framework for the platform.

Provides infrastructure-level monitoring and health check aggregation:
- Prometheus metrics registry and /metrics endpoint
- Health check aggregation from modules
- Sentry error tracking initialization
- External tool integration (Flower, Grafana)

This is part of the Framework Layer - infrastructure that modules depend on,
not a module itself.

Usage:
    # In main.py lifespan
    from app.core.observability import init_observability, shutdown_observability

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        init_observability()
        yield
        shutdown_observability()

    # Register health endpoint
    from app.core.observability import health_router
    app.include_router(health_router)
"""

import logging
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from typing import Any

from fastapi import APIRouter, Request, Response
from fastapi.responses import JSONResponse

logger = logging.getLogger(__name__)


# =============================================================================
# Health Check Types
# =============================================================================


class HealthStatus(str, Enum):
    """Health status levels, declared from best to worst."""

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


@dataclass
class HealthCheckResult:
    """
    Result of a single health check.

    Attributes:
        name: Name of the check that produced this result.
        status: Outcome of the check.
        message: Optional human-readable explanation (e.g. an error message).
        latency_ms: Duration of the check in milliseconds; overwritten by
            HealthCheckRegistry.run_all() with the measured value.
        details: Optional free-form data included in the JSON output.
        checked_at: Timezone-aware UTC time at which the result was created.
    """

    name: str
    status: HealthStatus
    message: str = ""
    latency_ms: float = 0.0
    details: dict[str, Any] = field(default_factory=dict)
    checked_at: datetime = field(default_factory=lambda: datetime.now(UTC))


@dataclass
class AggregatedHealth:
    """Aggregated health status from all checks."""

    status: HealthStatus
    checks: list[HealthCheckResult]
    timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))

    def to_dict(self) -> dict[str, Any]:
        """
        Convert to a dictionary for the JSON response.

        Requires every ``check.status`` to be a HealthStatus member (``.value``
        is read). Latencies are rounded to two decimal places.
        """
        return {
            "status": self.status.value,
            "timestamp": self.timestamp.isoformat(),
            "checks": [
                {
                    "name": check.name,
                    "status": check.status.value,
                    "message": check.message,
                    "latency_ms": round(check.latency_ms, 2),
                    "details": check.details,
                }
                for check in self.checks
            ],
        }


# =============================================================================
# Health Check Registry
# =============================================================================


class HealthCheckRegistry:
    """
    Registry for health check functions.

    Components register their health checks here, and the /health endpoint
    aggregates all results. Registering a name that already exists replaces
    the previous check.

    Example:
        @health_registry.register("database")
        def check_database() -> HealthCheckResult:
            try:
                db.execute("SELECT 1")
                return HealthCheckResult(name="database", status=HealthStatus.HEALTHY)
            except Exception as e:
                return HealthCheckResult(
                    name="database", status=HealthStatus.UNHEALTHY, message=str(e)
                )
    """

    def __init__(self) -> None:
        # Insertion-ordered, so run_all() executes and reports checks in the
        # order their names were first registered.
        self._checks: dict[str, Callable[[], HealthCheckResult]] = {}

    def register(
        self,
        name: str,
    ) -> Callable[[Callable[[], HealthCheckResult]], Callable[[], HealthCheckResult]]:
        """
        Decorator to register a health check.

        Args:
            name: Name of the health check

        Returns:
            Decorator that registers the function and returns it unchanged
        """

        def decorator(
            func: Callable[[], HealthCheckResult],
        ) -> Callable[[], HealthCheckResult]:
            self.register_check(name, func)
            return func

        return decorator

    def register_check(
        self,
        name: str,
        check: Callable[[], HealthCheckResult],
    ) -> None:
        """
        Register a health check function directly.

        Args:
            name: Name of the health check
            check: Zero-argument callable returning a HealthCheckResult
        """
        self._checks[name] = check
        logger.debug("Registered health check: %s", name)

    def unregister(self, name: str) -> bool:
        """
        Unregister a health check.

        Args:
            name: Name of the health check to remove

        Returns:
            True if check was found and removed
        """
        if name not in self._checks:
            return False
        del self._checks[name]
        return True

    def run_all(self) -> AggregatedHealth:
        """
        Run all registered health checks and aggregate results.

        Checks run sequentially in registration order. A check that raises is
        recorded as UNHEALTHY ("Check failed: <error>") instead of aborting
        the run. Each result's ``latency_ms`` is set to the measured duration
        of its check. The overall status is the worst individual status
        (UNHEALTHY > DEGRADED > HEALTHY). With no checks registered, it is
        HEALTHY.

        Returns:
            Aggregated health status
        """
        results: list[HealthCheckResult] = []
        overall_status = HealthStatus.HEALTHY

        # Iterate a snapshot so that a check registering/unregistering checks
        # mid-run cannot raise "dictionary changed size during iteration".
        for name, check_func in list(self._checks.items()):
            start = time.perf_counter()
            try:
                result = check_func()
                result.latency_ms = (time.perf_counter() - start) * 1000
            except Exception as e:
                result = HealthCheckResult(
                    name=name,
                    status=HealthStatus.UNHEALTHY,
                    message=f"Check failed: {e}",
                    latency_ms=(time.perf_counter() - start) * 1000,
                )
                logger.exception("Health check %s failed", name)

            results.append(result)

            # Worst status wins.
            if result.status == HealthStatus.UNHEALTHY:
                overall_status = HealthStatus.UNHEALTHY
            elif (
                result.status == HealthStatus.DEGRADED
                and overall_status == HealthStatus.HEALTHY
            ):
                overall_status = HealthStatus.DEGRADED

        return AggregatedHealth(status=overall_status, checks=results)

    def get_check_names(self) -> list[str]:
        """Get names of all registered checks, in registration order."""
        return list(self._checks)


# Global health check registry
health_registry = HealthCheckRegistry()


# =============================================================================
# Prometheus Metrics (Placeholder)
# =============================================================================


class MetricsRegistry:
    """
    Prometheus metrics registry.

    A thin wrapper around prometheus_client. While the registry is disabled,
    or when prometheus_client is not installed, every factory method returns
    a no-op ``_DummyMetric``, so instrumentation code never has to check.

    prometheus_client's default registry rejects a second metric with the
    same name. Requesting a metric already created through this registry
    therefore returns the existing instance (when it is of the same type)
    instead of raising. The labels and buckets of the first creation are
    kept.

    Note: a metric requested while the registry is disabled is a dummy. It
    stays a dummy even if ``enable()`` is called later, so create metrics
    after enabling.

    Example:
        request_count = metrics_registry.counter(
            "http_requests_total",
            "Total HTTP requests",
            ["method", "endpoint", "status"],
        )
        request_count.labels("GET", "/health/ready", "200").inc()
    """

    def __init__(self) -> None:
        # Metrics created through this registry, keyed by requested name.
        self._metrics: dict[str, Any] = {}
        self._enabled = False

    def enable(self) -> None:
        """Enable metrics collection."""
        self._enabled = True
        logger.info("Prometheus metrics enabled")

    def disable(self) -> None:
        """Disable metrics collection."""
        self._enabled = False

    def _get_or_create(
        self,
        metric_cls: type,
        name: str,
        description: str,
        labels: list[str] | None,
        **kwargs: Any,
    ) -> Any:
        """
        Return the cached metric ``name`` or create and cache a new one.

        A cached metric of a different type is not reused. In that case
        creation is attempted, and prometheus_client raises its usual
        duplicate-name error.
        """
        existing = self._metrics.get(name)
        if isinstance(existing, metric_cls):
            return existing
        metric = metric_cls(name, description, labels or [], **kwargs)
        self._metrics[name] = metric
        return metric

    def counter(
        self,
        name: str,
        description: str,
        labels: list[str] | None = None,
    ) -> Any:
        """
        Create (or reuse) a counter metric.

        Args:
            name: Metric name
            description: Metric description
            labels: Label names

        Returns:
            Counter metric (or placeholder if disabled or prometheus_client
            is not installed)
        """
        if not self._enabled:
            return _DummyMetric()
        try:
            from prometheus_client import Counter
        except ImportError:
            return _DummyMetric()
        return self._get_or_create(Counter, name, description, labels)

    def histogram(
        self,
        name: str,
        description: str,
        labels: list[str] | None = None,
        buckets: list[float] | None = None,
    ) -> Any:
        """
        Create (or reuse) a histogram metric.

        Args:
            name: Metric name
            description: Metric description
            labels: Label names
            buckets: Histogram buckets; prometheus_client defaults if empty/None

        Returns:
            Histogram metric (or placeholder)
        """
        if not self._enabled:
            return _DummyMetric()
        try:
            from prometheus_client import Histogram
        except ImportError:
            return _DummyMetric()
        extra: dict[str, Any] = {}
        if buckets:
            extra["buckets"] = buckets
        return self._get_or_create(Histogram, name, description, labels, **extra)

    def gauge(
        self,
        name: str,
        description: str,
        labels: list[str] | None = None,
    ) -> Any:
        """
        Create (or reuse) a gauge metric.

        Args:
            name: Metric name
            description: Metric description
            labels: Label names

        Returns:
            Gauge metric (or placeholder)
        """
        if not self._enabled:
            return _DummyMetric()
        try:
            from prometheus_client import Gauge
        except ImportError:
            return _DummyMetric()
        return self._get_or_create(Gauge, name, description, labels)

    def generate_latest(self) -> bytes:
        """
        Generate Prometheus metrics output.

        Returns:
            Prometheus metrics in text format, or a single comment line when
            metrics are disabled or prometheus_client is not installed
        """
        if not self._enabled:
            return b"# Metrics not enabled\n"
        try:
            from prometheus_client import generate_latest as _prom_generate_latest
        except ImportError:
            return b"# prometheus_client not installed\n"
        return _prom_generate_latest()


class _DummyMetric:
    """
    No-op stand-in for a prometheus_client metric.

    Accepts the common Counter/Gauge/Histogram calls and discards them.
    ``time()``, ``count_exceptions()`` and ``track_inprogress()`` return the
    dummy itself. The dummy works both as a context manager (exceptions
    propagate) and as a decorator (the function is returned unchanged), so
    code written against the real metric API keeps working when metrics are
    disabled.
    """

    def labels(self, *args: Any, **kwargs: Any) -> "_DummyMetric":
        return self

    def inc(self, amount: float = 1) -> None:
        pass

    def dec(self, amount: float = 1) -> None:
        pass

    def set(self, value: float) -> None:
        pass

    def set_to_current_time(self) -> None:
        pass

    def observe(self, amount: float) -> None:
        pass

    def time(self) -> "_DummyMetric":
        return self

    def count_exceptions(
        self, exception: type[BaseException] | tuple[type[BaseException], ...] = Exception
    ) -> "_DummyMetric":
        return self

    def track_inprogress(self) -> "_DummyMetric":
        return self

    def __enter__(self) -> "_DummyMetric":
        return self

    def __exit__(self, *exc_info: Any) -> None:
        # Returning None lets any exception raised in the block propagate.
        return None

    def __call__(self, func: Callable[..., Any]) -> Callable[..., Any]:
        return func


# Global metrics registry
metrics_registry = MetricsRegistry()


# =============================================================================
# Sentry Integration (Placeholder)
# =============================================================================


class SentryIntegration:
    """
    Sentry error tracking integration.

    Every method is a no-op until ``init()`` succeeds with a DSN and the
    sentry_sdk package is importable.

    Example:
        sentry.init(dsn="https://key@sentry.io/project")
        sentry.capture_exception(error)
    """

    def __init__(self) -> None:
        self._initialized = False
        self._dsn: str | None = None

    def init(
        self,
        dsn: str | None = None,
        environment: str = "development",
        release: str | None = None,
        traces_sample_rate: float = 0.1,
    ) -> None:
        """
        Initialize Sentry SDK.

        Args:
            dsn: Sentry DSN; tracking stays disabled if empty/None
            environment: Environment name
            release: Release version
            traces_sample_rate: Fraction of transactions sent for performance
                tracing (passed straight to ``sentry_sdk.init``)
        """
        if not dsn:
            logger.info("Sentry DSN not provided, error tracking disabled")
            return

        try:
            import sentry_sdk
        except ImportError:
            logger.warning("sentry_sdk not installed, error tracking disabled")
            return

        sentry_sdk.init(
            dsn=dsn,
            environment=environment,
            release=release,
            traces_sample_rate=traces_sample_rate,
        )
        self._initialized = True
        self._dsn = dsn
        logger.info("Sentry initialized for environment: %s", environment)

    def capture_exception(self, error: Exception) -> str | None:
        """
        Capture an exception.

        Args:
            error: Exception to capture

        Returns:
            Event ID if captured, None otherwise
        """
        if not self._initialized:
            return None
        try:
            import sentry_sdk
        except ImportError:
            return None
        return sentry_sdk.capture_exception(error)

    def capture_message(self, message: str, level: str = "info") -> str | None:
        """
        Capture a message.

        Args:
            message: Message to capture
            level: Log level

        Returns:
            Event ID if captured, None otherwise
        """
        if not self._initialized:
            return None
        try:
            import sentry_sdk
        except ImportError:
            return None
        return sentry_sdk.capture_message(message, level=level)


# Global Sentry instance
sentry = SentryIntegration()


# =============================================================================
# External Tool URLs
# =============================================================================


@dataclass
class ExternalToolConfig:
    """Configuration for external monitoring tools (None means not configured)."""

    flower_url: str | None = None
    grafana_url: str | None = None
    prometheus_url: str | None = None

    def to_dict(self) -> dict[str, str | None]:
        """Map tool name to its URL for the /health/tools response."""
        return {
            "flower": self.flower_url,
            "grafana": self.grafana_url,
            "prometheus": self.prometheus_url,
        }


# Global external tool config
external_tools = ExternalToolConfig()


# =============================================================================
# Health API Router
# =============================================================================

health_router = APIRouter(tags=["Health"])
@health_router.get("/health/live")
async def liveness_check() -> dict[str, str]:
    """
    Kubernetes liveness probe endpoint.

    Returns 200 if the application is running. No dependency checks run here;
    dependency health is reported by /health/ready.
    """
    return {"status": "alive"}


@health_router.get("/health/ready")
async def readiness_check() -> JSONResponse:
    """
    Kubernetes readiness probe endpoint.

    Runs every registered health check and returns the aggregated result,
    including per-check name, status, message, latency and details.

    Status codes:
        200 -- overall status is healthy or degraded.
        503 -- at least one check is unhealthy, so the probe fails and the
               instance is not considered ready to serve traffic.
    """
    from fastapi.concurrency import run_in_threadpool
    from fastapi.encoders import jsonable_encoder

    # The registered checks are synchronous and may block on network I/O
    # (database connect, Redis ping). Run them in a worker thread so a slow
    # dependency does not stall the event loop for every other request.
    result = await run_in_threadpool(health_registry.run_all)

    status_code = 503 if result.status == HealthStatus.UNHEALTHY else 200
    # jsonable_encoder keeps the serialization FastAPI applies to plain dict
    # return values (arbitrary objects may appear in check ``details``).
    return JSONResponse(
        status_code=status_code,
        content=jsonable_encoder(result.to_dict()),
    )


# Networks allowed to scrape /metrics: loopback plus the RFC 1918 private
# ranges (Docker's default address pools fall within these).
_METRICS_ALLOWED_NETWORKS: tuple[str, ...] = (
    "127.0.0.0/8",
    "10.0.0.0/8",
    "172.16.0.0/12",
    "192.168.0.0/16",
    "::1/128",
)


def _is_metrics_client_allowed(host: str | None) -> bool:
    """
    Return True if ``host`` is an IP address inside _METRICS_ALLOWED_NETWORKS.

    The host is parsed as an IP address rather than prefix-matched as text,
    so non-IP values (e.g. "10.example.com", should proxy-header handling
    ever place one in ``request.client.host``) are rejected. IPv4-mapped
    IPv6 addresses ("::ffff:127.0.0.1", as reported by dual-stack sockets)
    are checked as their IPv4 address.

    NOTE(review): if the app is served behind a reverse proxy that lives on
    one of these networks, external requests may appear to originate from
    the proxy -- confirm /metrics is also blocked at the proxy.
    """
    import ipaddress

    if not host:
        return False
    try:
        address = ipaddress.ip_address(host)
    except ValueError:
        return False
    if isinstance(address, ipaddress.IPv6Address) and address.ipv4_mapped is not None:
        address = address.ipv4_mapped
    # Membership between different IP versions is simply False.
    return any(
        address in ipaddress.ip_network(network)
        for network in _METRICS_ALLOWED_NETWORKS
    )


@health_router.get("/metrics")
async def metrics_endpoint(request: Request) -> Response:
    """
    Prometheus metrics endpoint.

    Returns metrics in Prometheus text format for scraping. Restricted to
    loopback and private (RFC 1918) client addresses; anyone else gets 403.
    """
    client_ip = request.client.host if request.client else None
    if not _is_metrics_client_allowed(client_ip):
        return JSONResponse(status_code=403, content={"detail": "Forbidden"})

    content = metrics_registry.generate_latest()
    return Response(
        content=content,
        media_type="text/plain; charset=utf-8",
    )


@health_router.get("/health/tools")
async def external_tools_endpoint() -> dict[str, str | None]:
    """
    Get URLs for external monitoring tools.

    Returns links to Flower, Grafana, etc. (None for tools not configured).
    """
    return external_tools.to_dict()


# =============================================================================
# Initialization Functions
# =============================================================================


def _register_infrastructure_health_checks() -> None:
    """
    Register health checks for core infrastructure (PostgreSQL, Redis).

    Both checks catch their own exceptions and report UNHEALTHY with the
    error text. Calling this again replaces the previously registered checks.
    """
    from .config import settings

    @health_registry.register("database")
    def check_database() -> HealthCheckResult:
        """Execute ``SELECT 1`` on a connection from the application engine."""
        try:
            from sqlalchemy import text

            from .database import engine

            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))
            return HealthCheckResult(name="database", status=HealthStatus.HEALTHY)
        except Exception as e:
            return HealthCheckResult(
                name="database",
                status=HealthStatus.UNHEALTHY,
                message=str(e),
            )

    @health_registry.register("redis")
    def check_redis() -> HealthCheckResult:
        """PING Redis at ``settings.redis_url`` with 2-second timeouts."""
        try:
            import redis

            # socket_timeout bounds the PING itself; socket_connect_timeout
            # alone would let a connected-but-unresponsive server hang the probe.
            client = redis.from_url(
                settings.redis_url,
                socket_connect_timeout=2,
                socket_timeout=2,
            )
            try:
                client.ping()
            finally:
                # Close even when PING fails so probes don't leak connections.
                client.close()
            return HealthCheckResult(name="redis", status=HealthStatus.HEALTHY)
        except Exception as e:
            return HealthCheckResult(
                name="redis",
                status=HealthStatus.UNHEALTHY,
                message=str(e),
            )


def init_observability(
    enable_metrics: bool = False,
    sentry_dsn: str | None = None,
    environment: str = "development",
    flower_url: str | None = None,
    grafana_url: str | None = None,
    prometheus_url: str | None = None,
    sentry_release: str | None = None,
) -> None:
    """
    Initialize observability stack.

    Args:
        enable_metrics: Whether to enable Prometheus metrics
        sentry_dsn: Sentry DSN for error tracking; Sentry is skipped if empty
        environment: Environment name (development, staging, production)
        flower_url: URL to Flower dashboard
        grafana_url: URL to Grafana dashboards
        prometheus_url: URL to the Prometheus UI. Only applied when given, so
            a value assigned directly on ``external_tools`` is not cleared.
        sentry_release: Release version reported to Sentry
    """
    logger.info("Initializing observability stack...")

    # Register infrastructure health checks
    _register_infrastructure_health_checks()

    # Enable metrics if requested
    if enable_metrics:
        metrics_registry.enable()

    # Initialize Sentry
    if sentry_dsn:
        sentry.init(dsn=sentry_dsn, environment=environment, release=sentry_release)

    # Configure external tools
    external_tools.flower_url = flower_url
    external_tools.grafana_url = grafana_url
    if prometheus_url is not None:
        external_tools.prometheus_url = prometheus_url

    logger.info("Observability stack initialized")


def shutdown_observability() -> None:
    """Shutdown observability stack (currently: disable metrics collection)."""
    logger.info("Shutting down observability stack...")
    metrics_registry.disable()


def _make_module_health_check(module: Any) -> Callable[[], HealthCheckResult]:
    """
    Build a health check adapting ``module.get_health_status()`` to a result.

    The result is named ``module:<code>``. A ``name`` key in the returned
    mapping is ignored in favour of that name. ``status`` is coerced to
    HealthStatus, because AggregatedHealth.to_dict() reads ``status.value``;
    an unknown status raises, and run_all() reports it as UNHEALTHY.

    NOTE(review): assumes get_health_status() returns a mapping of
    HealthCheckResult fields (at least ``status``) -- confirm against the
    module base class.
    """
    check_name = f"module:{module.code}"

    def check() -> HealthCheckResult:
        payload = dict(module.get_health_status())
        payload.pop("name", None)
        if "status" in payload:
            payload["status"] = HealthStatus(payload["status"])
        return HealthCheckResult(name=check_name, **payload)

    return check


def register_module_health_checks() -> None:
    """
    Register health checks from all modules.

    This should be called after all modules are loaded. Only modules with a
    truthy ``health_check`` attribute are registered, as ``module:<code>``.
    """
    from app.modules.registry import MODULES

    for module in MODULES.values():
        if not module.health_check:
            continue
        health_registry.register_check(
            f"module:{module.code}",
            _make_module_health_check(module),
        )
        logger.debug("Registered health check for module %s", module.code)


__all__ = [
    # Health checks
    "HealthStatus",
    "HealthCheckResult",
    "AggregatedHealth",
    "HealthCheckRegistry",
    "health_registry",
    # Metrics
    "MetricsRegistry",
    "metrics_registry",
    # Sentry
    "SentryIntegration",
    "sentry",
    # External tools
    "ExternalToolConfig",
    "external_tools",
    # Router
    "health_router",
    # Initialization
    "init_observability",
    "shutdown_observability",
    "register_module_health_checks",
]