Files
orion/app/core/observability.py
Samir Boulahtit b68d542258
Some checks failed
CI / ruff (push) Successful in 10s
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / validate (push) Has been cancelled
CI / dependency-scanning (push) Has been cancelled
CI / pytest (push) Has been cancelled
fix(security): harden Redis auth, restrict /metrics, document Gitea port fix
- Add Redis password via REDIS_PASSWORD env var (--requirepass flag)
- Update all REDIS_URL and REDIS_ADDR references to include password
- Restrict /metrics endpoint to localhost and Docker internal networks (403 for external requests)
- Document Gitea port 3000 localhost binding fix (must be applied manually on server)
- Add REDIS_PASSWORD to .env.example

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-04 23:15:15 +01:00

702 lines
19 KiB
Python

# app/core/observability.py
"""
Observability framework for the platform.
Provides infrastructure-level monitoring and health check aggregation:
- Prometheus metrics registry and /metrics endpoint
- Health check aggregation from modules
- Sentry error tracking initialization
- External tool integration (Flower, Grafana)
This is part of the Framework Layer - infrastructure that modules depend on,
not a module itself.
Usage:
# In main.py lifespan
from app.core.observability import init_observability, shutdown_observability
@asynccontextmanager
async def lifespan(app: FastAPI):
init_observability()
yield
shutdown_observability()
# Register health endpoint
from app.core.observability import health_router
app.include_router(health_router)
"""
import logging
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from typing import Any
from fastapi import APIRouter, Request, Response
from fastapi.responses import JSONResponse
# Module-level logger named after this module's import path; used by the
# registries and the init/shutdown helpers below.
logger = logging.getLogger(__name__)
# =============================================================================
# Health Check Types
# =============================================================================
class HealthStatus(str, Enum):
    """
    Severity a health check can report.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value (for example ``HealthStatus.HEALTHY == "healthy"``).
    """

    # Everything works as expected.
    HEALTHY = "healthy"
    # Working, but with reduced capability.
    DEGRADED = "degraded"
    # Not working.
    UNHEALTHY = "unhealthy"
@dataclass
class HealthCheckResult:
    """Outcome reported by a single health check."""

    # Name identifying the check this result belongs to.
    name: str
    # Severity reported for the check.
    status: HealthStatus
    # Optional human-readable detail; empty by default.
    message: str = ""
    # Duration of the check in milliseconds; 0.0 until something fills it in.
    latency_ms: float = 0.0
    # Free-form extra data; every instance gets its own dict.
    details: dict[str, Any] = field(default_factory=dict)
    # Timezone-aware UTC timestamp taken when the instance is created.
    checked_at: datetime = field(default_factory=lambda: datetime.now(tz=UTC))
@dataclass
class AggregatedHealth:
    """Combined outcome of a run over several health checks."""

    # Overall severity for the whole run.
    status: HealthStatus
    # Individual results, in the order they were collected.
    checks: list[HealthCheckResult]
    # Timezone-aware UTC timestamp taken when the instance is created.
    timestamp: datetime = field(default_factory=lambda: datetime.now(tz=UTC))

    def to_dict(self) -> dict[str, Any]:
        """
        Build a plain-dict representation suitable for a JSON response.

        Returns:
            Dict with "status", "timestamp" (ISO 8601 string) and "checks",
            where each check carries name, status, message, latency_ms
            (rounded to two decimals) and details
        """
        payload: dict[str, Any] = {
            "status": self.status.value,
            "timestamp": self.timestamp.isoformat(),
        }

        serialized: list[dict[str, Any]] = []
        for entry in self.checks:
            serialized.append(
                {
                    "name": entry.name,
                    "status": entry.status.value,
                    "message": entry.message,
                    "latency_ms": round(entry.latency_ms, 2),
                    "details": entry.details,
                }
            )

        payload["checks"] = serialized
        return payload
# =============================================================================
# Health Check Registry
# =============================================================================
class HealthCheckRegistry:
    """
    Named collection of health check callables.

    Every entry maps a check name to a zero-argument callable returning a
    ``HealthCheckResult``. ``run_all`` invokes each entry and folds the
    outcomes into a single ``AggregatedHealth``. Registering a name that is
    already present replaces the previous callable.

    Example:
        @health_registry.register("database")
        def check_database() -> HealthCheckResult:
            try:
                db.execute("SELECT 1")
                return HealthCheckResult(name="database", status=HealthStatus.HEALTHY)
            except Exception as e:
                return HealthCheckResult(
                    name="database",
                    status=HealthStatus.UNHEALTHY,
                    message=str(e)
                )
    """

    def __init__(self) -> None:
        # Insertion-ordered mapping: check name -> check callable.
        self._checks: dict[str, Callable[[], HealthCheckResult]] = {}

    def register(
        self,
        name: str,
    ) -> Callable[[Callable[[], HealthCheckResult]], Callable[[], HealthCheckResult]]:
        """
        Build a decorator that stores the decorated function under ``name``.

        Args:
            name: Name of the health check

        Returns:
            Decorator that registers the function and returns it unchanged
        """

        def _store(
            check: Callable[[], HealthCheckResult],
        ) -> Callable[[], HealthCheckResult]:
            self._checks[name] = check
            logger.debug(f"Registered health check: {name}")
            return check

        return _store

    def register_check(
        self,
        name: str,
        check: Callable[[], HealthCheckResult],
    ) -> None:
        """
        Store a health check callable without using the decorator form.

        Args:
            name: Name of the health check
            check: Zero-argument callable producing a HealthCheckResult
        """
        self._checks[name] = check
        logger.debug(f"Registered health check: {name}")

    def unregister(self, name: str) -> bool:
        """
        Remove a health check from the registry.

        Args:
            name: Name of the health check to remove

        Returns:
            True if the check existed and was removed, False otherwise
        """
        try:
            del self._checks[name]
        except KeyError:
            return False
        return True

    def run_all(self) -> AggregatedHealth:
        """
        Execute every registered check and combine the outcomes.

        Each check is timed and its ``latency_ms`` is overwritten with the
        measured duration. A check that raises is logged and reported as
        UNHEALTHY instead of aborting the run.

        Returns:
            Aggregated health whose status is the worst individual status
        """
        outcomes: list[HealthCheckResult] = []
        worst = HealthStatus.HEALTHY

        for name, run_check in self._checks.items():
            began = time.perf_counter()
            try:
                outcome = run_check()
                # Kept inside the try on purpose: a check returning something
                # without a settable latency_ms is treated as a failed check.
                outcome.latency_ms = (time.perf_counter() - began) * 1000
            except Exception as e:
                outcome = HealthCheckResult(
                    name=name,
                    status=HealthStatus.UNHEALTHY,
                    message=f"Check failed: {e}",
                    latency_ms=(time.perf_counter() - began) * 1000,
                )
                logger.exception(f"Health check {name} failed")
            outcomes.append(outcome)

            # Worst status wins: UNHEALTHY always, DEGRADED only over HEALTHY.
            if outcome.status == HealthStatus.UNHEALTHY:
                worst = HealthStatus.UNHEALTHY
            elif outcome.status == HealthStatus.DEGRADED and worst == HealthStatus.HEALTHY:
                worst = HealthStatus.DEGRADED

        return AggregatedHealth(status=worst, checks=outcomes)

    def get_check_names(self) -> list[str]:
        """Return the names of all registered checks, in registration order."""
        return list(self._checks)
# Global health check registry: the single module-level instance that the
# readiness endpoint runs and that the registration helpers below add to.
health_registry = HealthCheckRegistry()
# =============================================================================
# Prometheus Metrics (Placeholder)
# =============================================================================
class MetricsRegistry:
"""
Prometheus metrics registry.
This is a placeholder implementation. For production, integrate with
prometheus_client library.
Example:
from prometheus_client import Counter, Histogram
request_count = metrics_registry.counter(
"http_requests_total",
"Total HTTP requests",
["method", "endpoint", "status"]
)
"""
def __init__(self) -> None:
self._metrics: dict[str, Any] = {}
self._enabled = False
def enable(self) -> None:
"""Enable metrics collection."""
self._enabled = True
logger.info("Prometheus metrics enabled")
def disable(self) -> None:
"""Disable metrics collection."""
self._enabled = False
def counter(
self,
name: str,
description: str,
labels: list[str] | None = None,
) -> Any:
"""
Create a counter metric.
Args:
name: Metric name
description: Metric description
labels: Label names
Returns:
Counter metric (or placeholder if prometheus_client not installed)
"""
if not self._enabled:
return _DummyMetric()
try:
from prometheus_client import Counter
metric = Counter(name, description, labels or [])
self._metrics[name] = metric
return metric
except ImportError:
return _DummyMetric()
def histogram(
self,
name: str,
description: str,
labels: list[str] | None = None,
buckets: list[float] | None = None,
) -> Any:
"""
Create a histogram metric.
Args:
name: Metric name
description: Metric description
labels: Label names
buckets: Histogram buckets
Returns:
Histogram metric (or placeholder)
"""
if not self._enabled:
return _DummyMetric()
try:
from prometheus_client import Histogram
kwargs: dict[str, Any] = {}
if buckets:
kwargs["buckets"] = buckets
metric = Histogram(name, description, labels or [], **kwargs)
self._metrics[name] = metric
return metric
except ImportError:
return _DummyMetric()
def gauge(
self,
name: str,
description: str,
labels: list[str] | None = None,
) -> Any:
"""
Create a gauge metric.
Args:
name: Metric name
description: Metric description
labels: Label names
Returns:
Gauge metric (or placeholder)
"""
if not self._enabled:
return _DummyMetric()
try:
from prometheus_client import Gauge
metric = Gauge(name, description, labels or [])
self._metrics[name] = metric
return metric
except ImportError:
return _DummyMetric()
def generate_latest(self) -> bytes:
"""
Generate Prometheus metrics output.
Returns:
Prometheus metrics in text format
"""
if not self._enabled:
return b"# Metrics not enabled\n"
try:
from prometheus_client import generate_latest
return generate_latest()
except ImportError:
return b"# prometheus_client not installed\n"
class _DummyMetric:
    """
    No-op stand-in returned when a real metric cannot be created.

    Accepts the calls made on counters, gauges and histograms and ignores
    them, so instrumented code runs unchanged without real metrics.
    """

    def labels(self, *args: Any, **kwargs: Any) -> "_DummyMetric":
        """Ignore the label values and return this same placeholder."""
        return self

    def inc(self, amount: float = 1) -> None:
        """Ignore an increment."""
        return None

    def dec(self, amount: float = 1) -> None:
        """Ignore a decrement."""
        return None

    def set(self, value: float) -> None:
        """Ignore a value assignment."""
        return None

    def observe(self, amount: float) -> None:
        """Ignore an observation."""
        return None
# Global metrics registry: read by the /metrics endpoint and switched on/off
# by init_observability / shutdown_observability.
metrics_registry = MetricsRegistry()
# =============================================================================
# Sentry Integration (Placeholder)
# =============================================================================
class SentryIntegration:
"""
Sentry error tracking integration.
This is a placeholder. For production, configure with actual Sentry DSN.
Example:
sentry.init(dsn="https://key@sentry.io/project")
sentry.capture_exception(error)
"""
def __init__(self) -> None:
self._initialized = False
self._dsn: str | None = None
def init(
self,
dsn: str | None = None,
environment: str = "development",
release: str | None = None,
) -> None:
"""
Initialize Sentry SDK.
Args:
dsn: Sentry DSN
environment: Environment name
release: Release version
"""
if not dsn:
logger.info("Sentry DSN not provided, error tracking disabled")
return
try:
import sentry_sdk
sentry_sdk.init(
dsn=dsn,
environment=environment,
release=release,
traces_sample_rate=0.1,
)
self._initialized = True
self._dsn = dsn
logger.info(f"Sentry initialized for environment: {environment}")
except ImportError:
logger.warning("sentry_sdk not installed, error tracking disabled")
def capture_exception(self, error: Exception) -> str | None:
"""
Capture an exception.
Args:
error: Exception to capture
Returns:
Event ID if captured, None otherwise
"""
if not self._initialized:
return None
try:
import sentry_sdk
return sentry_sdk.capture_exception(error)
except ImportError:
return None
def capture_message(self, message: str, level: str = "info") -> str | None:
"""
Capture a message.
Args:
message: Message to capture
level: Log level
Returns:
Event ID if captured, None otherwise
"""
if not self._initialized:
return None
try:
import sentry_sdk
return sentry_sdk.capture_message(message, level=level)
except ImportError:
return None
# Global Sentry instance: stays inert until init_observability calls
# sentry.init() with a DSN.
sentry = SentryIntegration()
# =============================================================================
# External Tool URLs
# =============================================================================
@dataclass
class ExternalToolConfig:
"""Configuration for external monitoring tools."""
flower_url: str | None = None
grafana_url: str | None = None
prometheus_url: str | None = None
def to_dict(self) -> dict[str, str | None]:
return {
"flower": self.flower_url,
"grafana": self.grafana_url,
"prometheus": self.prometheus_url,
}
# Global external tool config: filled in by init_observability and served by
# the /health/tools endpoint.
external_tools = ExternalToolConfig()
# =============================================================================
# Health API Router
# =============================================================================
# Router carrying the health, metrics and tool-link endpoints defined below;
# the application includes it via app.include_router(health_router).
health_router = APIRouter(tags=["Health"])
@health_router.get("/health/live")
async def liveness_check() -> dict[str, str]:
    """
    Kubernetes liveness probe endpoint.

    Performs no checks of its own: answering at all is the signal that the
    process is running.

    Returns:
        The constant payload {"status": "alive"}
    """
    payload: dict[str, str] = {"status": "alive"}
    return payload
@health_router.get("/health/ready")
async def readiness_check() -> dict[str, Any]:
    """
    Kubernetes readiness probe endpoint.

    Runs every registered health check and returns the aggregated report:
    overall status, timestamp, and for each check its name, status, message,
    latency and details.

    NOTE(review): the report is returned as a plain dict, so the HTTP status
    code does not appear to depend on the aggregated status - confirm whether
    probes read the body or whether a non-200 code is wanted when unhealthy.
    """
    return health_registry.run_all().to_dict()
def _is_internal_client(host: str | None) -> bool:
    """
    Tell whether a peer address is loopback or on a private IPv4 network.

    The address is parsed rather than prefix-matched as text, so an IPv6
    address that merely starts with "::1" is not mistaken for loopback, and
    IPv4 peers reported in IPv4-mapped IPv6 form (::ffff:a.b.c.d) are judged
    by their IPv4 address.

    Args:
        host: Peer address as reported for the request, or None

    Returns:
        True for 127.0.0.0/8, 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16 and
        ::1; False for any other address and for values that are not IP
        literals
    """
    import ipaddress

    if not host:
        return False
    try:
        address = ipaddress.ip_address(host)
    except ValueError:
        # Not an IP literal: treat as external.
        return False

    if address.version == 6 and address.ipv4_mapped is not None:
        address = address.ipv4_mapped

    allowed_cidrs = (
        "127.0.0.0/8",
        "10.0.0.0/8",
        "172.16.0.0/12",
        "192.168.0.0/16",
        "::1/128",
    )
    return any(address in ipaddress.ip_network(cidr) for cidr in allowed_cidrs)


@health_router.get("/metrics")
async def metrics_endpoint(request: Request) -> Response:
    """
    Prometheus metrics endpoint.

    Returns metrics in Prometheus text format for scraping.
    Restricted to localhost and Docker internal networks only; any other
    client (or a request without client information) receives 403.

    NOTE(review): the check uses request.client.host. If the app is reached
    through a reverse proxy, that is presumably the proxy's address rather
    than the original caller's - verify the deployment before relying on this
    as the only protection.
    """
    client_ip = request.client.host if request.client else None
    if not _is_internal_client(client_ip):
        return JSONResponse(status_code=403, content={"detail": "Forbidden"})

    content = metrics_registry.generate_latest()
    return Response(
        content=content,
        media_type="text/plain; charset=utf-8",
    )
@health_router.get("/health/tools")
async def external_tools_endpoint() -> dict[str, str | None]:
    """
    List the configured external monitoring tool URLs.

    Returns:
        Mapping of tool name (flower, grafana, prometheus) to its URL, or
        None for tools that have no URL configured
    """
    tool_links = external_tools.to_dict()
    return tool_links
# =============================================================================
# Initialization Functions
# =============================================================================
def _register_infrastructure_health_checks() -> None:
    """
    Register health checks for core infrastructure (PostgreSQL, Redis).

    Both checks are stored in the global health registry under the names
    "database" and "redis". Each one converts any failure into an UNHEALTHY
    result carrying the exception text instead of raising.

    NOTE(review): the exception text ends up in the readiness response;
    confirm it cannot contain connection details that should stay private.
    """
    from .config import settings

    @health_registry.register("database")
    def check_database() -> HealthCheckResult:
        """Open a connection and run ``SELECT 1``."""
        try:
            from sqlalchemy import text

            from .database import engine

            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))
            return HealthCheckResult(name="database", status=HealthStatus.HEALTHY)
        except Exception as e:
            return HealthCheckResult(
                name="database",
                status=HealthStatus.UNHEALTHY,
                message=str(e),
            )

    @health_registry.register("redis")
    def check_redis() -> HealthCheckResult:
        """Connect to the configured Redis URL and send PING."""
        try:
            import redis

            # Short connect timeout so an unreachable Redis does not stall
            # the health endpoint.
            client = redis.from_url(settings.redis_url, socket_connect_timeout=2)
            try:
                client.ping()
            finally:
                # Release the client even when the ping fails.
                client.close()
            return HealthCheckResult(name="redis", status=HealthStatus.HEALTHY)
        except Exception as e:
            return HealthCheckResult(
                name="redis",
                status=HealthStatus.UNHEALTHY,
                message=str(e),
            )
def init_observability(
    enable_metrics: bool = False,
    sentry_dsn: str | None = None,
    environment: str = "development",
    flower_url: str | None = None,
    grafana_url: str | None = None,
) -> None:
    """
    Bring up the observability stack.

    Registers the infrastructure health checks, optionally enables metrics
    and Sentry, and records the external tool URLs (passing None clears a
    previously stored URL).

    Args:
        enable_metrics: Whether to enable Prometheus metrics
        sentry_dsn: Sentry DSN for error tracking; Sentry is left untouched
            when empty
        environment: Environment name (development, staging, production)
        flower_url: URL to Flower dashboard
        grafana_url: URL to Grafana dashboards
    """
    logger.info("Initializing observability stack...")

    # Core infrastructure checks (database, Redis) are always registered.
    _register_infrastructure_health_checks()

    # Metrics stay disabled unless explicitly requested.
    if enable_metrics:
        metrics_registry.enable()

    # Error tracking is opt-in through the DSN.
    if sentry_dsn:
        sentry.init(dsn=sentry_dsn, environment=environment)

    # Links to dashboards exposed through the tools endpoint.
    external_tools.flower_url, external_tools.grafana_url = flower_url, grafana_url

    logger.info("Observability stack initialized")
def shutdown_observability() -> None:
    """
    Tear down the observability stack.

    Logs the shutdown and disables the global metrics registry; health
    checks, Sentry and the tool URLs are left as they are.
    """
    logger.info("Shutting down observability stack...")
    metrics_registry.disable()
    return None
def register_module_health_checks() -> None:
    """
    Register a health check for every module that declares one.

    Each such module gets a registry entry named ``module:<code>`` whose
    result is built from the module's ``get_health_status()`` mapping at the
    time the check runs.

    This should be called after all modules are loaded.
    """
    from app.modules.registry import MODULES

    def _check_for(m: Any) -> Callable[[], HealthCheckResult]:
        # Bind the module per call so each check keeps its own module rather
        # than the loop's last one.
        def _run() -> HealthCheckResult:
            return HealthCheckResult(
                name=f"module:{m.code}",
                **m.get_health_status(),
            )

        return _run

    for module in MODULES.values():
        if not module.health_check:
            continue
        health_registry.register_check(f"module:{module.code}", _check_for(module))
        logger.debug(f"Registered health check for module {module.code}")
# Names exported by ``from app.core.observability import *``, grouped by
# concern.
__all__ = [
    # Health checks
    "HealthStatus",
    "HealthCheckResult",
    "AggregatedHealth",
    "HealthCheckRegistry",
    "health_registry",
    # Metrics
    "MetricsRegistry",
    "metrics_registry",
    # Sentry
    "SentryIntegration",
    "sentry",
    # External tools
    "ExternalToolConfig",
    "external_tools",
    # Router
    "health_router",
    # Initialization
    "init_observability",
    "shutdown_observability",
    "register_module_health_checks",
]