Files
orion/app/modules/contracts/audit.py
Samir Boulahtit f20266167d
Some checks failed
CI / ruff (push) Failing after 7s
CI / pytest (push) Failing after 1s
CI / architecture (push) Failing after 9s
CI / dependency-scanning (push) Successful in 27s
CI / audit (push) Successful in 8s
CI / docs (push) Has been skipped
fix(lint): auto-fix ruff violations and tune lint rules
- Auto-fixed 4,496 lint issues (import sorting, modern syntax, etc.)
- Added ignore rules for patterns intentional in this codebase:
  E402 (late imports), E712 (SQLAlchemy filters), B904 (raise from),
  SIM108/SIM105/SIM117 (readability preferences)
- Added per-file ignores for tests and scripts
- Excluded broken scripts/rename_terminology.py (has curly quotes)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 23:10:42 +01:00

169 lines
5.1 KiB
Python

# app/modules/contracts/audit.py
"""
Audit provider protocol for cross-module audit logging.
This module defines the protocol that modules implement to provide audit logging.
The core module's AuditAggregator discovers and uses providers from enabled modules.
Benefits:
- Audit logging is optional (monitoring module can be disabled)
- Each module can have its own audit implementation
- Core doesn't depend on monitoring module
- Easy to add different audit backends (file, database, external service)
Usage:
# 1. Implement the protocol in your module
class DatabaseAuditProvider:
@property
def audit_backend(self) -> str:
return "database"
def log_action(self, db, event: AuditEvent) -> bool:
# Log to database
...
# 2. Register in module definition
def _get_audit_provider():
from app.modules.monitoring.services.audit_provider import audit_provider
return audit_provider
monitoring_module = ModuleDefinition(
code="monitoring",
audit_provider=_get_audit_provider,
# ...
)
# 3. Use via aggregator in core
from app.modules.core.services.audit_aggregator import audit_aggregator
audit_aggregator.log_action(
db=db,
event=AuditEvent(
admin_user_id=123,
action="create_store",
target_type="store",
target_id="456",
)
)
"""
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
if TYPE_CHECKING:
from sqlalchemy.orm import Session
@dataclass
class AuditEvent:
"""
Standard audit event data.
This is the unit of data passed to audit providers.
It contains all information needed to log an admin action.
Attributes:
admin_user_id: ID of the admin performing the action
action: Action performed (e.g., "create_store", "update_setting")
target_type: Type of target (e.g., "store", "user", "setting")
target_id: ID of the target entity (as string)
details: Additional context about the action
ip_address: IP address of the admin (optional)
user_agent: User agent string (optional)
request_id: Request ID for correlation (optional)
Example:
AuditEvent(
admin_user_id=1,
action="create_setting",
target_type="setting",
target_id="max_stores",
details={"category": "system", "value_type": "integer"},
)
"""
admin_user_id: int
action: str
target_type: str
target_id: str
details: dict[str, Any] | None = None
ip_address: str | None = None
user_agent: str | None = None
request_id: str | None = None
@runtime_checkable
class AuditProviderProtocol(Protocol):
"""
Protocol for modules that provide audit logging.
Each module can implement this to expose its own audit backend.
The core module's AuditAggregator discovers and uses providers.
Implementation Notes:
- Providers should be fault-tolerant (don't break operations on failure)
- Return True on success, False on failure
- Log errors internally but don't raise exceptions
- Be mindful of performance (audit logging should be fast)
Example Implementation:
class DatabaseAuditProvider:
@property
def audit_backend(self) -> str:
return "database"
def log_action(self, db: Session, event: AuditEvent) -> bool:
try:
from app.modules.tenancy.models import AdminAuditLog
audit_log = AdminAuditLog(
admin_user_id=event.admin_user_id,
action=event.action,
target_type=event.target_type,
target_id=event.target_id,
details=event.details or {},
ip_address=event.ip_address,
user_agent=event.user_agent,
request_id=event.request_id,
)
db.add(audit_log)
db.flush()
return True
except Exception:
return False
"""
@property
def audit_backend(self) -> str:
"""
Backend name for this provider.
Should be a short, lowercase identifier for the audit backend.
Examples: "database", "file", "elasticsearch", "cloudwatch"
Returns:
Backend string used for identification
"""
...
def log_action(self, db: "Session", event: AuditEvent) -> bool:
"""
Log an audit event.
Called by the audit aggregator to record admin actions.
Should be fault-tolerant - don't raise exceptions, just return False.
Args:
db: Database session (may be needed for database backends)
event: The audit event to log
Returns:
True if logged successfully, False otherwise
"""
...
__all__ = [
"AuditEvent",
"AuditProviderProtocol",
]