# app/modules/contracts/audit.py """ Audit provider protocol for cross-module audit logging. This module defines the protocol that modules implement to provide audit logging. The core module's AuditAggregator discovers and uses providers from enabled modules. Benefits: - Audit logging is optional (monitoring module can be disabled) - Each module can have its own audit implementation - Core doesn't depend on monitoring module - Easy to add different audit backends (file, database, external service) Usage: # 1. Implement the protocol in your module class DatabaseAuditProvider: @property def audit_backend(self) -> str: return "database" def log_action(self, db, event: AuditEvent) -> bool: # Log to database ... # 2. Register in module definition def _get_audit_provider(): from app.modules.monitoring.services.audit_provider import audit_provider return audit_provider monitoring_module = ModuleDefinition( code="monitoring", audit_provider=_get_audit_provider, # ... ) # 3. Use via aggregator in core from app.modules.core.services.audit_aggregator import audit_aggregator audit_aggregator.log_action( db=db, event=AuditEvent( admin_user_id=123, action="create_store", target_type="store", target_id="456", ) ) """ from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable if TYPE_CHECKING: from sqlalchemy.orm import Session @dataclass class AuditEvent: """ Standard audit event data. This is the unit of data passed to audit providers. It contains all information needed to log an admin action. Attributes: admin_user_id: ID of the admin performing the action action: Action performed (e.g., "create_store", "update_setting") target_type: Type of target (e.g., "store", "user", "setting") target_id: ID of the target entity (as string) details: Additional context about the action ip_address: IP address of the admin (optional) user_agent: User agent string (optional) request_id: Request ID for correlation (optional) Example: AuditEvent( admin_user_id=1, action="create_setting", target_type="setting", target_id="max_stores", details={"category": "system", "value_type": "integer"}, ) """ admin_user_id: int action: str target_type: str target_id: str details: dict[str, Any] | None = None ip_address: str | None = None user_agent: str | None = None request_id: str | None = None @runtime_checkable class AuditProviderProtocol(Protocol): """ Protocol for modules that provide audit logging. Each module can implement this to expose its own audit backend. The core module's AuditAggregator discovers and uses providers. Implementation Notes: - Providers should be fault-tolerant (don't break operations on failure) - Return True on success, False on failure - Log errors internally but don't raise exceptions - Be mindful of performance (audit logging should be fast) Example Implementation: class DatabaseAuditProvider: @property def audit_backend(self) -> str: return "database" def log_action(self, db: Session, event: AuditEvent) -> bool: try: from app.modules.tenancy.models import AdminAuditLog audit_log = AdminAuditLog( admin_user_id=event.admin_user_id, action=event.action, target_type=event.target_type, target_id=event.target_id, details=event.details or {}, ip_address=event.ip_address, user_agent=event.user_agent, request_id=event.request_id, ) db.add(audit_log) db.flush() return True except Exception: return False """ @property def audit_backend(self) -> str: """ Backend name for this provider. Should be a short, lowercase identifier for the audit backend. Examples: "database", "file", "elasticsearch", "cloudwatch" Returns: Backend string used for identification """ ... def log_action(self, db: "Session", event: AuditEvent) -> bool: """ Log an audit event. Called by the audit aggregator to record admin actions. Should be fault-tolerant - don't raise exceptions, just return False. Args: db: Database session (may be needed for database backends) event: The audit event to log Returns: True if logged successfully, False otherwise """ ... __all__ = [ "AuditEvent", "AuditProviderProtocol", ]