Files
orion/app/modules/monitoring/services/admin_audit_service.py
Samir Boulahtit 86e85a98b8
Some checks failed
CI / ruff (push) Successful in 9s
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / validate (push) Has been cancelled
CI / dependency-scanning (push) Has been cancelled
CI / pytest (push) Has been cancelled
refactor(arch): eliminate all cross-module model imports in service layer
Enforce MOD-025/MOD-026 rules: zero top-level cross-module model imports
remain in any service file. All 66 files migrated using deferred import
patterns (method-body, _get_model() helpers, instance-cached self._Model)
and new cross-module service methods in tenancy. Documentation updated
with Pattern 6 (deferred imports), migration plan marked complete, and
violations status reflects 84→0 service-layer violations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 06:13:15 +01:00

252 lines
8.0 KiB
Python

# app/modules/monitoring/services/admin_audit_service.py
"""
Admin audit service for tracking admin actions.
This module provides functions for:
- Logging admin actions
- Querying audit logs
- Generating audit reports
"""
from __future__ import annotations
import logging
from typing import Any
from sqlalchemy import and_
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from app.modules.tenancy.exceptions import AdminOperationException
from app.modules.tenancy.schemas.admin import (
AdminAuditLogFilters,
AdminAuditLogResponse,
)
# Module-level logger named after this module's import path; used by every
# service method below to report audit successes and database failures.
logger = logging.getLogger(__name__)
def _get_audit_log_model():
    """Return the ``AdminAuditLog`` ORM class, importing it on first use.

    The model is defined in the tenancy module while this service lives in
    monitoring, so the import is performed inside the function body rather
    than at module top level.
    """
    from app.modules.tenancy.models import AdminAuditLog as audit_log_cls

    return audit_log_cls
class AdminAuditService:
    """Service for recording and querying the admin audit trail.

    The service holds no state of its own: every method receives the
    SQLAlchemy session to work with and resolves the ``AdminAuditLog`` model
    lazily through ``_get_audit_log_model()``.
    """

    @staticmethod
    def _build_filter_conditions(
        model: Any, filters: AdminAuditLogFilters
    ) -> list[Any]:
        """Translate the populated fields of *filters* into criteria on *model*.

        Shared by ``get_audit_logs`` and ``get_audit_logs_count`` so that the
        page of rows and the total count are always computed from the same
        predicate.

        Args:
            model: The ``AdminAuditLog`` ORM class.
            filters: Filter criteria; falsy fields are ignored.

        Returns:
            A (possibly empty) list of SQLAlchemy boolean expressions.
        """
        conditions: list[Any] = []
        if filters.admin_user_id:
            conditions.append(model.admin_user_id == filters.admin_user_id)
        if filters.action:
            # Case-insensitive substring match on the action name.
            conditions.append(model.action.ilike(f"%{filters.action}%"))
        if filters.target_type:
            conditions.append(model.target_type == filters.target_type)
        if filters.date_from:
            conditions.append(model.created_at >= filters.date_from)
        if filters.date_to:
            conditions.append(model.created_at <= filters.date_to)
        return conditions

    @staticmethod
    def _to_response(log: Any) -> AdminAuditLogResponse:
        """Convert one ``AdminAuditLog`` row into its response schema.

        Used by every query method so that all of them expose the same set of
        fields for a given row.
        """
        return AdminAuditLogResponse(
            id=log.id,
            admin_user_id=log.admin_user_id,
            # The related admin user may be missing; expose None in that case.
            admin_username=log.admin_user.username if log.admin_user else None,
            action=log.action,
            target_type=log.target_type,
            target_id=log.target_id,
            details=log.details,
            ip_address=log.ip_address,
            user_agent=log.user_agent,
            request_id=log.request_id,
            created_at=log.created_at,
        )

    def log_action(
        self,
        db: Session,
        admin_user_id: int,
        action: str,
        target_type: str,
        target_id: str,
        details: dict[str, Any] | None = None,
        ip_address: str | None = None,
        user_agent: str | None = None,
        request_id: str | None = None,
    ) -> AdminAuditLog | None:
        """
        Log an admin action to the audit trail.

        The entry is added to the session and flushed, but not committed;
        committing is left to the caller.

        Args:
            db: Database session
            admin_user_id: ID of the admin performing the action
            action: Action performed (e.g., 'create_store', 'delete_user')
            target_type: Type of target (e.g., 'store', 'user')
            target_id: ID of the target entity (stored as a string)
            details: Additional context about the action; stored as ``{}``
                when omitted or empty
            ip_address: IP address of the admin
            user_agent: User agent string
            request_id: Request ID for correlation

        Returns:
            The created AdminAuditLog instance, or None if the database
            operation failed (the failure is logged, never raised).
        """
        AdminAuditLog = _get_audit_log_model()
        try:
            audit_log = AdminAuditLog(
                admin_user_id=admin_user_id,
                action=action,
                target_type=target_type,
                target_id=str(target_id),
                details=details or {},
                ip_address=ip_address,
                user_agent=user_agent,
                request_id=request_id,
            )
            db.add(audit_log)
            db.flush()
            # Reload so database-generated values are present on the instance.
            db.refresh(audit_log)
            logger.info(
                "Admin action logged: %s on %s:%s by admin %s",
                action,
                target_type,
                target_id,
                admin_user_id,
            )
            return audit_log
        except SQLAlchemyError as e:
            logger.error("Failed to log admin action: %s", e)
            # Don't raise exception - audit logging should not break operations.
            # NOTE(review): after a failed flush SQLAlchemy expects the session
            # to be rolled back before further use - confirm callers handle it.
            return None

    def get_audit_logs(
        self, db: Session, filters: AdminAuditLogFilters
    ) -> list[AdminAuditLogResponse]:
        """
        Get filtered admin audit logs with pagination.

        Args:
            db: Database session
            filters: Filter criteria for audit logs, including ``skip`` and
                ``limit`` for pagination

        Returns:
            List of audit log responses, newest first

        Raises:
            AdminOperationException: If the database query fails
        """
        AdminAuditLog = _get_audit_log_model()
        try:
            from sqlalchemy.orm import joinedload

            # Load the admin user with each row so that resolving the
            # username does not trigger a separate query per log entry.
            query = db.query(AdminAuditLog).options(
                joinedload(AdminAuditLog.admin_user)
            )
            conditions = self._build_filter_conditions(AdminAuditLog, filters)
            if conditions:
                query = query.filter(and_(*conditions))

            # Newest first; the id tie-breaker keeps offset/limit pagination
            # stable when several entries share the same timestamp.
            logs = (
                query.order_by(
                    AdminAuditLog.created_at.desc(), AdminAuditLog.id.desc()
                )
                .offset(filters.skip)
                .limit(filters.limit)
                .all()
            )
            return [self._to_response(log) for log in logs]
        except SQLAlchemyError as e:
            logger.error("Failed to retrieve audit logs: %s", e)
            raise AdminOperationException(
                operation="get_audit_logs", reason="Database query failed"
            ) from e

    def get_audit_logs_count(self, db: Session, filters: AdminAuditLogFilters) -> int:
        """Get total count of audit logs matching filters.

        Applies the same criteria as ``get_audit_logs`` (pagination fields are
        ignored). Returns 0 if the database query fails; the failure is
        logged, not raised.
        """
        AdminAuditLog = _get_audit_log_model()
        try:
            query = db.query(AdminAuditLog)
            conditions = self._build_filter_conditions(AdminAuditLog, filters)
            if conditions:
                query = query.filter(and_(*conditions))
            return query.count()
        except SQLAlchemyError as e:
            logger.error("Failed to count audit logs: %s", e)
            return 0

    def get_recent_actions_by_admin(
        self, db: Session, admin_user_id: int, limit: int = 10
    ) -> list[AdminAuditLogResponse]:
        """Get recent actions by a specific admin.

        Thin wrapper over ``get_audit_logs`` filtered on *admin_user_id*;
        raises ``AdminOperationException`` under the same conditions.
        """
        filters = AdminAuditLogFilters(admin_user_id=admin_user_id, limit=limit)
        return self.get_audit_logs(db, filters)

    def get_actions_by_target(
        self, db: Session, target_type: str, target_id: str, limit: int = 50
    ) -> list[AdminAuditLogResponse]:
        """Get the most recent actions performed on a specific target.

        Args:
            db: Database session
            target_type: Type of target (e.g., 'store', 'user')
            target_id: ID of the target entity; compared as a string
            limit: Maximum number of entries to return

        Returns:
            Up to *limit* audit log responses, newest first. An empty list is
            returned if the database query fails (the failure is logged).
        """
        AdminAuditLog = _get_audit_log_model()
        try:
            from sqlalchemy.orm import joinedload

            logs = (
                db.query(AdminAuditLog)
                # Same eager loading as get_audit_logs: avoids one extra
                # query per row when the admin username is resolved.
                .options(joinedload(AdminAuditLog.admin_user))
                .filter(
                    and_(
                        AdminAuditLog.target_type == target_type,
                        AdminAuditLog.target_id == str(target_id),
                    )
                )
                .order_by(AdminAuditLog.created_at.desc(), AdminAuditLog.id.desc())
                .limit(limit)
                .all()
            )
            return [self._to_response(log) for log in logs]
        except SQLAlchemyError as e:
            logger.error("Failed to get actions by target: %s", e)
            return []
# Shared module-level service instance. AdminAuditService defines no
# per-instance state, so this one object can be reused wherever the service
# is needed.
admin_audit_service = AdminAuditService()