Some checks failed
- Add mem_limit to all 6 app containers (db: 512m, redis: 128m, api: 512m, celery-worker: 512m, celery-beat: 128m, flower: 128m) - Restrict Flower port to localhost (127.0.0.1:5555:5555) - Add PostgreSQL and Redis health checks to /health/ready endpoint with individual check details (name, status, latency) - Add scaling guide with metrics, thresholds, Hetzner pricing - Add server verification script (12 infrastructure checks) - Update hetzner-server-setup.md with progress and pending tasks Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
693 lines
18 KiB
Python
693 lines
18 KiB
Python
# app/core/observability.py
|
|
"""
|
|
Observability framework for the platform.
|
|
|
|
Provides infrastructure-level monitoring and health check aggregation:
|
|
- Prometheus metrics registry and /metrics endpoint
|
|
- Health check aggregation from modules
|
|
- Sentry error tracking initialization
|
|
- External tool integration (Flower, Grafana)
|
|
|
|
This is part of the Framework Layer - infrastructure that modules depend on,
|
|
not a module itself.
|
|
|
|
Usage:
    # In main.py lifespan
    from app.core.observability import init_observability, shutdown_observability

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        init_observability()
        yield
        shutdown_observability()

    # Register health endpoint
    from app.core.observability import health_router

    app.include_router(health_router)
|
|
"""
|
|
|
|
import logging
|
|
import time
|
|
from collections.abc import Callable
|
|
from dataclasses import dataclass, field
|
|
from datetime import UTC, datetime
|
|
from enum import Enum
|
|
from typing import Any
|
|
|
|
from fastapi import APIRouter, Response
|
|
|
|
# Module-level logger used by the registries, Sentry wrapper and init helpers below.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# =============================================================================
|
|
# Health Check Types
|
|
# =============================================================================
|
|
|
|
|
|
class HealthStatus(str, Enum):
    """
    Severity levels a health check can report.

    Members subclass ``str``, so each one compares equal to its plain string
    value (``HealthStatus.HEALTHY == "healthy"``).
    """

    # Check passed.
    HEALTHY = "healthy"
    # Partially impaired; ranks between HEALTHY and UNHEALTHY when aggregated.
    DEGRADED = "degraded"
    # Check failed; any single UNHEALTHY result makes the aggregate UNHEALTHY.
    UNHEALTHY = "unhealthy"
|
|
|
|
|
|
@dataclass
class HealthCheckResult:
    """
    Outcome of a single health check.

    Attributes:
        name: Identifier of the check (e.g. ``"database"``).
        status: Health level. Plain strings such as ``"healthy"`` are accepted
            and converted to :class:`HealthStatus` in ``__post_init__``.
        message: Optional human-readable explanation (e.g. an error text).
        latency_ms: Check duration in milliseconds; ``HealthCheckRegistry.run_all``
            overwrites it with the measured time.
        details: Free-form extra data included in the JSON response.
        checked_at: UTC time at which this result object was created.
    """

    name: str
    status: HealthStatus
    message: str = ""
    latency_ms: float = 0.0
    details: dict[str, Any] = field(default_factory=dict)
    checked_at: datetime = field(default_factory=lambda: datetime.now(UTC))

    def __post_init__(self) -> None:
        # Results built from plain dicts (e.g. ``**module.get_health_status()``)
        # may carry a raw string status. AggregatedHealth.to_dict() calls
        # ``.value`` on it, so normalize to the enum here. An unknown value
        # raises ValueError now instead of crashing later during serialization.
        if not isinstance(self.status, HealthStatus):
            self.status = HealthStatus(self.status)
|
|
|
|
|
|
@dataclass
class AggregatedHealth:
    """
    Combined view over every registered health check.

    ``status`` is the worst individual status, ``checks`` holds the per-check
    results in the order they ran, and ``timestamp`` is when this aggregate
    was created (UTC).
    """

    status: HealthStatus
    checks: list[HealthCheckResult]
    timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))

    def to_dict(self) -> dict[str, Any]:
        """
        Serialize to a JSON-compatible dict for the HTTP response.

        Enum statuses become their string values, the timestamp is ISO 8601,
        and latencies are rounded to two decimal places.
        """
        overall = self.status.value
        stamp = self.timestamp.isoformat()
        serialized_checks = [
            {
                "name": item.name,
                "status": item.status.value,
                "message": item.message,
                "latency_ms": round(item.latency_ms, 2),
                "details": item.details,
            }
            for item in self.checks
        ]
        return {"status": overall, "timestamp": stamp, "checks": serialized_checks}
|
|
|
|
|
|
# =============================================================================
|
|
# Health Check Registry
|
|
# =============================================================================
|
|
|
|
|
|
class HealthCheckRegistry:
    """
    Registry of named health check callables.

    Components register zero-argument functions that return a
    :class:`HealthCheckResult`. :meth:`run_all` executes them and aggregates
    the results for the health endpoints. Registering a name that already
    exists replaces the previous check.

    Example:
        @health_registry.register("database")
        def check_database() -> HealthCheckResult:
            try:
                db.execute("SELECT 1")
                return HealthCheckResult(name="database", status=HealthStatus.HEALTHY)
            except Exception as e:
                return HealthCheckResult(
                    name="database",
                    status=HealthStatus.UNHEALTHY,
                    message=str(e),
                )
    """

    def __init__(self) -> None:
        # Check name -> zero-argument callable returning a HealthCheckResult.
        self._checks: dict[str, Callable[[], HealthCheckResult]] = {}

    def register(
        self,
        name: str,
    ) -> Callable[[Callable[[], HealthCheckResult]], Callable[[], HealthCheckResult]]:
        """
        Decorator that registers the decorated function as health check *name*.

        Args:
            name: Name of the health check

        Returns:
            Decorator that registers the function and returns it unchanged
        """

        def decorator(
            func: Callable[[], HealthCheckResult],
        ) -> Callable[[], HealthCheckResult]:
            self.register_check(name, func)
            return func

        return decorator

    def register_check(
        self,
        name: str,
        check: Callable[[], HealthCheckResult],
    ) -> None:
        """
        Register a health check function directly.

        Args:
            name: Name of the health check (replaces any existing check with it)
            check: Zero-argument function returning a HealthCheckResult
        """
        self._checks[name] = check
        logger.debug("Registered health check: %s", name)

    def unregister(self, name: str) -> bool:
        """
        Unregister a health check.

        Args:
            name: Name of the health check to remove

        Returns:
            True if the check was found and removed, False otherwise
        """
        try:
            del self._checks[name]
        except KeyError:
            return False
        return True

    def run_all(self) -> AggregatedHealth:
        """
        Run every registered health check and aggregate the results.

        A check that raises is reported as UNHEALTHY with the exception text
        rather than propagating. Latency is measured around each call.

        Returns:
            Aggregated health, whose status is the worst individual status
        """
        results: list[HealthCheckResult] = []
        overall_status = HealthStatus.HEALTHY

        # Iterate over a snapshot. If a check is registered or removed while
        # checks are running (e.g. from another thread), iterating the live
        # dict would raise "dictionary changed size during iteration" outside
        # the per-check try below.
        for name, check_func in list(self._checks.items()):
            start = time.perf_counter()
            try:
                result = check_func()
                result.latency_ms = (time.perf_counter() - start) * 1000
            except Exception as e:
                result = HealthCheckResult(
                    name=name,
                    status=HealthStatus.UNHEALTHY,
                    message=f"Check failed: {e}",
                    latency_ms=(time.perf_counter() - start) * 1000,
                )
                logger.exception("Health check %s failed", name)

            results.append(result)

            # Worst status wins: UNHEALTHY > DEGRADED > HEALTHY.
            if result.status == HealthStatus.UNHEALTHY:
                overall_status = HealthStatus.UNHEALTHY
            elif result.status == HealthStatus.DEGRADED and overall_status == HealthStatus.HEALTHY:
                overall_status = HealthStatus.DEGRADED

        return AggregatedHealth(status=overall_status, checks=results)

    def get_check_names(self) -> list[str]:
        """Return the names of all registered checks, in registration order."""
        return list(self._checks.keys())


# Global health check registry
health_registry = HealthCheckRegistry()
|
|
|
|
|
|
# =============================================================================
|
|
# Prometheus Metrics (Placeholder)
|
|
# =============================================================================
|
|
|
|
|
|
class MetricsRegistry:
|
|
"""
|
|
Prometheus metrics registry.
|
|
|
|
This is a placeholder implementation. For production, integrate with
|
|
prometheus_client library.
|
|
|
|
Example:
|
|
from prometheus_client import Counter, Histogram
|
|
|
|
request_count = metrics_registry.counter(
|
|
"http_requests_total",
|
|
"Total HTTP requests",
|
|
["method", "endpoint", "status"]
|
|
)
|
|
"""
|
|
|
|
def __init__(self) -> None:
|
|
self._metrics: dict[str, Any] = {}
|
|
self._enabled = False
|
|
|
|
def enable(self) -> None:
|
|
"""Enable metrics collection."""
|
|
self._enabled = True
|
|
logger.info("Prometheus metrics enabled")
|
|
|
|
def disable(self) -> None:
|
|
"""Disable metrics collection."""
|
|
self._enabled = False
|
|
|
|
def counter(
|
|
self,
|
|
name: str,
|
|
description: str,
|
|
labels: list[str] | None = None,
|
|
) -> Any:
|
|
"""
|
|
Create a counter metric.
|
|
|
|
Args:
|
|
name: Metric name
|
|
description: Metric description
|
|
labels: Label names
|
|
|
|
Returns:
|
|
Counter metric (or placeholder if prometheus_client not installed)
|
|
"""
|
|
if not self._enabled:
|
|
return _DummyMetric()
|
|
|
|
try:
|
|
from prometheus_client import Counter
|
|
|
|
metric = Counter(name, description, labels or [])
|
|
self._metrics[name] = metric
|
|
return metric
|
|
except ImportError:
|
|
return _DummyMetric()
|
|
|
|
def histogram(
|
|
self,
|
|
name: str,
|
|
description: str,
|
|
labels: list[str] | None = None,
|
|
buckets: list[float] | None = None,
|
|
) -> Any:
|
|
"""
|
|
Create a histogram metric.
|
|
|
|
Args:
|
|
name: Metric name
|
|
description: Metric description
|
|
labels: Label names
|
|
buckets: Histogram buckets
|
|
|
|
Returns:
|
|
Histogram metric (or placeholder)
|
|
"""
|
|
if not self._enabled:
|
|
return _DummyMetric()
|
|
|
|
try:
|
|
from prometheus_client import Histogram
|
|
|
|
kwargs: dict[str, Any] = {}
|
|
if buckets:
|
|
kwargs["buckets"] = buckets
|
|
|
|
metric = Histogram(name, description, labels or [], **kwargs)
|
|
self._metrics[name] = metric
|
|
return metric
|
|
except ImportError:
|
|
return _DummyMetric()
|
|
|
|
def gauge(
|
|
self,
|
|
name: str,
|
|
description: str,
|
|
labels: list[str] | None = None,
|
|
) -> Any:
|
|
"""
|
|
Create a gauge metric.
|
|
|
|
Args:
|
|
name: Metric name
|
|
description: Metric description
|
|
labels: Label names
|
|
|
|
Returns:
|
|
Gauge metric (or placeholder)
|
|
"""
|
|
if not self._enabled:
|
|
return _DummyMetric()
|
|
|
|
try:
|
|
from prometheus_client import Gauge
|
|
|
|
metric = Gauge(name, description, labels or [])
|
|
self._metrics[name] = metric
|
|
return metric
|
|
except ImportError:
|
|
return _DummyMetric()
|
|
|
|
def generate_latest(self) -> bytes:
|
|
"""
|
|
Generate Prometheus metrics output.
|
|
|
|
Returns:
|
|
Prometheus metrics in text format
|
|
"""
|
|
if not self._enabled:
|
|
return b"# Metrics not enabled\n"
|
|
|
|
try:
|
|
from prometheus_client import generate_latest
|
|
|
|
return generate_latest()
|
|
except ImportError:
|
|
return b"# prometheus_client not installed\n"
|
|
|
|
|
|
class _DummyMetric:
|
|
"""Placeholder metric when prometheus_client is not available."""
|
|
|
|
def labels(self, *args: Any, **kwargs: Any) -> "_DummyMetric":
|
|
return self
|
|
|
|
def inc(self, amount: float = 1) -> None:
|
|
pass
|
|
|
|
def dec(self, amount: float = 1) -> None:
|
|
pass
|
|
|
|
def set(self, value: float) -> None:
|
|
pass
|
|
|
|
def observe(self, amount: float) -> None:
|
|
pass
|
|
|
|
|
|
# Global metrics registry
|
|
# Module-level MetricsRegistry; it starts disabled (see MetricsRegistry.__init__)
# until init_observability(enable_metrics=True) calls enable().
metrics_registry = MetricsRegistry()
|
|
|
|
|
|
# =============================================================================
|
|
# Sentry Integration (Placeholder)
|
|
# =============================================================================
|
|
|
|
|
|
class SentryIntegration:
|
|
"""
|
|
Sentry error tracking integration.
|
|
|
|
This is a placeholder. For production, configure with actual Sentry DSN.
|
|
|
|
Example:
|
|
sentry.init(dsn="https://key@sentry.io/project")
|
|
sentry.capture_exception(error)
|
|
"""
|
|
|
|
def __init__(self) -> None:
|
|
self._initialized = False
|
|
self._dsn: str | None = None
|
|
|
|
def init(
|
|
self,
|
|
dsn: str | None = None,
|
|
environment: str = "development",
|
|
release: str | None = None,
|
|
) -> None:
|
|
"""
|
|
Initialize Sentry SDK.
|
|
|
|
Args:
|
|
dsn: Sentry DSN
|
|
environment: Environment name
|
|
release: Release version
|
|
"""
|
|
if not dsn:
|
|
logger.info("Sentry DSN not provided, error tracking disabled")
|
|
return
|
|
|
|
try:
|
|
import sentry_sdk
|
|
|
|
sentry_sdk.init(
|
|
dsn=dsn,
|
|
environment=environment,
|
|
release=release,
|
|
traces_sample_rate=0.1,
|
|
)
|
|
self._initialized = True
|
|
self._dsn = dsn
|
|
logger.info(f"Sentry initialized for environment: {environment}")
|
|
except ImportError:
|
|
logger.warning("sentry_sdk not installed, error tracking disabled")
|
|
|
|
def capture_exception(self, error: Exception) -> str | None:
|
|
"""
|
|
Capture an exception.
|
|
|
|
Args:
|
|
error: Exception to capture
|
|
|
|
Returns:
|
|
Event ID if captured, None otherwise
|
|
"""
|
|
if not self._initialized:
|
|
return None
|
|
|
|
try:
|
|
import sentry_sdk
|
|
|
|
return sentry_sdk.capture_exception(error)
|
|
except ImportError:
|
|
return None
|
|
|
|
def capture_message(self, message: str, level: str = "info") -> str | None:
|
|
"""
|
|
Capture a message.
|
|
|
|
Args:
|
|
message: Message to capture
|
|
level: Log level
|
|
|
|
Returns:
|
|
Event ID if captured, None otherwise
|
|
"""
|
|
if not self._initialized:
|
|
return None
|
|
|
|
try:
|
|
import sentry_sdk
|
|
|
|
return sentry_sdk.capture_message(message, level=level)
|
|
except ImportError:
|
|
return None
|
|
|
|
|
|
# Global Sentry instance
|
|
sentry = SentryIntegration()
|
|
|
|
|
|
# =============================================================================
|
|
# External Tool URLs
|
|
# =============================================================================
|
|
|
|
|
|
@dataclass
|
|
class ExternalToolConfig:
|
|
"""Configuration for external monitoring tools."""
|
|
|
|
flower_url: str | None = None
|
|
grafana_url: str | None = None
|
|
prometheus_url: str | None = None
|
|
|
|
def to_dict(self) -> dict[str, str | None]:
|
|
return {
|
|
"flower": self.flower_url,
|
|
"grafana": self.grafana_url,
|
|
"prometheus": self.prometheus_url,
|
|
}
|
|
|
|
|
|
# Global external tool config
|
|
external_tools = ExternalToolConfig()
|
|
|
|
|
|
# =============================================================================
|
|
# Health API Router
|
|
# =============================================================================
|
|
|
|
# Router holding the /health/live, /health/ready, /metrics and /health/tools
# endpoints defined below; mount it with app.include_router(health_router).
health_router = APIRouter(tags=["Health"])
|
|
|
|
|
|
@health_router.get("/health/live")
async def liveness_check() -> dict[str, str]:
    """
    Liveness probe: report that the process is up.

    Runs no dependency checks, so it answers 200 whenever the application
    can handle a request at all.
    """
    return dict(status="alive")
|
|
|
|
|
|
@health_router.get("/health/ready")
async def readiness_check(response: Response) -> dict[str, Any]:
    """
    Kubernetes readiness probe endpoint.

    Responds 200 while the aggregate status is ``healthy`` or ``degraded``,
    and 503 when any check is ``unhealthy``. Probes that only inspect the
    status code therefore stop routing traffic. The JSON body always includes
    individual check details (name, status, latency).

    The registered checks are synchronous and may block on network I/O (the
    infrastructure checks connect to the database and Redis), so they run in
    a worker thread to keep the event loop responsive.
    NOTE(review): checks must therefore tolerate running off the event-loop thread.
    """
    import asyncio

    result = await asyncio.to_thread(health_registry.run_all)
    if result.status == HealthStatus.UNHEALTHY:
        response.status_code = 503  # Service Unavailable: not ready for traffic
    return result.to_dict()
|
|
|
|
|
|
@health_router.get("/metrics")
async def metrics_endpoint() -> Response:
    """
    Expose metrics for Prometheus scraping.

    The body is whatever ``metrics_registry.generate_latest()`` returns: the
    Prometheus text output, or a single comment line when metrics are off.
    """
    body = metrics_registry.generate_latest()
    media_type = "text/plain; charset=utf-8"
    return Response(content=body, media_type=media_type)
|
|
|
|
|
|
@health_router.get("/health/tools")
async def external_tools_endpoint() -> dict[str, str | None]:
    """
    List links to external monitoring tools (Flower, Grafana, Prometheus).

    Tools without a configured URL are included with a ``None`` value.
    """
    links = external_tools.to_dict()
    return links
|
|
|
|
|
|
# =============================================================================
|
|
# Initialization Functions
|
|
# =============================================================================
|
|
|
|
|
|
def _register_infrastructure_health_checks() -> None:
    """
    Register health checks for core infrastructure (PostgreSQL, Redis).

    The checks use the fixed names ``"database"`` and ``"redis"``, so calling
    this again replaces them instead of adding duplicates.
    """
    from .config import settings

    @health_registry.register("database")
    def check_database() -> HealthCheckResult:
        """Execute ``SELECT 1`` on a connection from the application's engine."""
        try:
            from sqlalchemy import text

            from .database import engine

            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))
        except Exception as e:
            return HealthCheckResult(
                name="database",
                status=HealthStatus.UNHEALTHY,
                message=str(e),
            )
        return HealthCheckResult(name="database", status=HealthStatus.HEALTHY)

    @health_registry.register("redis")
    def check_redis() -> HealthCheckResult:
        """PING Redis through a short-lived client with bounded timeouts."""
        try:
            import redis

            # socket_connect_timeout bounds the TCP connect. socket_timeout also
            # bounds the PING itself; without it the read has no timeout and can
            # hang on a server that accepts the connection but never replies.
            # The with-block closes the client even when PING raises (the old
            # code skipped close() on failure).
            with redis.from_url(
                settings.redis_url,
                socket_connect_timeout=2,
                socket_timeout=2,
            ) as client:
                client.ping()
        except Exception as e:
            return HealthCheckResult(
                name="redis",
                status=HealthStatus.UNHEALTHY,
                message=str(e),
            )
        return HealthCheckResult(name="redis", status=HealthStatus.HEALTHY)
|
|
|
|
|
|
def init_observability(
    enable_metrics: bool = False,
    sentry_dsn: str | None = None,
    environment: str = "development",
    flower_url: str | None = None,
    grafana_url: str | None = None,
    prometheus_url: str | None = None,
    release: str | None = None,
) -> None:
    """
    Initialize the observability stack.

    Registers the infrastructure health checks, optionally enables Prometheus
    metrics and Sentry, and records external tool URLs for ``/health/tools``.

    Args:
        enable_metrics: Whether to enable Prometheus metrics
        sentry_dsn: Sentry DSN for error tracking (disabled when empty)
        environment: Environment name (development, staging, production)
        flower_url: URL to Flower dashboard
        grafana_url: URL to Grafana dashboards
        prometheus_url: URL to the Prometheus UI; when None, any previously
            configured value is left untouched
        release: Release version reported to Sentry
    """
    logger.info("Initializing observability stack...")

    # Register infrastructure health checks
    _register_infrastructure_health_checks()

    # Enable metrics if requested
    if enable_metrics:
        metrics_registry.enable()

    # Initialize Sentry
    if sentry_dsn:
        sentry.init(dsn=sentry_dsn, environment=environment, release=release)

    # Configure external tools. Flower/Grafana are always overwritten (existing
    # behaviour). prometheus_url is only set when given, so callers that
    # assigned external_tools.prometheus_url directly keep their value.
    external_tools.flower_url = flower_url
    external_tools.grafana_url = grafana_url
    if prometheus_url is not None:
        external_tools.prometheus_url = prometheus_url

    logger.info("Observability stack initialized")
|
|
|
|
|
|
def shutdown_observability() -> None:
    """
    Stop observability components when the application shuts down.

    At present this only switches metrics off, so factory methods hand out
    placeholders and /metrics reports that metrics are not enabled. Sentry
    and the health check registry are left as they are.
    """
    logger.info("Shutting down observability stack...")
    metrics_registry.disable()
|
|
|
|
|
|
def register_module_health_checks() -> None:
    """
    Register a health check for every module that declares one.

    Call this after all modules are loaded. Each module with a truthy
    ``health_check`` attribute gets a check named ``module:<code>``. Its
    result is built from the dict returned by ``module.get_health_status()``.
    """
    from app.modules.registry import MODULES

    def _make_module_check(module: Any) -> Callable[[], HealthCheckResult]:
        """Build a zero-argument check bound to *module* (no late-binding pitfalls)."""
        check_name = f"module:{module.code}"

        def _check() -> HealthCheckResult:
            # Copy the status dict and force the check name. A "name" key in the
            # module's dict would otherwise collide with an explicit name=
            # keyword and raise TypeError ("got multiple values for argument").
            payload = dict(module.get_health_status())
            payload["name"] = check_name
            return HealthCheckResult(**payload)

        return _check

    for module in MODULES.values():
        if not module.health_check:
            continue
        health_registry.register_check(f"module:{module.code}", _make_module_check(module))
        logger.debug("Registered health check for module %s", module.code)
|
|
|
|
|
|
# Public API of this module, grouped by concern.
__all__ = [
    # Health checks
    "HealthStatus", "HealthCheckResult", "AggregatedHealth",
    "HealthCheckRegistry", "health_registry",
    # Metrics
    "MetricsRegistry", "metrics_registry",
    # Sentry
    "SentryIntegration", "sentry",
    # External tools
    "ExternalToolConfig", "external_tools",
    # Router
    "health_router",
    # Initialization
    "init_observability", "shutdown_observability", "register_module_health_checks",
]
|