# app/modules/monitoring/services/admin_audit_service.py """ Admin audit service for tracking admin actions. This module provides functions for: - Logging admin actions - Querying audit logs - Generating audit reports """ from __future__ import annotations import logging from typing import Any from sqlalchemy import and_ from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import Session from app.modules.tenancy.exceptions import AdminOperationException from app.modules.tenancy.schemas.admin import ( AdminAuditLogFilters, AdminAuditLogResponse, ) logger = logging.getLogger(__name__) def _get_audit_log_model(): """Deferred import for AdminAuditLog model (lives in tenancy, consumed by monitoring).""" from app.modules.tenancy.models import AdminAuditLog return AdminAuditLog class AdminAuditService: """Service for admin audit logging.""" def log_action( self, db: Session, admin_user_id: int, action: str, target_type: str, target_id: str, details: dict[str, Any] | None = None, ip_address: str | None = None, user_agent: str | None = None, request_id: str | None = None, ) -> AdminAuditLog | None: """ Log an admin action to the audit trail. Args: db: Database session admin_user_id: ID of the admin performing the action action: Action performed (e.g., 'create_store', 'delete_user') target_type: Type of target (e.g., 'store', 'user') target_id: ID of the target entity details: Additional context about the action ip_address: IP address of the admin user_agent: User agent string request_id: Request ID for correlation Returns: Created AdminAuditLog instance """ AdminAuditLog = _get_audit_log_model() try: audit_log = AdminAuditLog( admin_user_id=admin_user_id, action=action, target_type=target_type, target_id=str(target_id), details=details or {}, ip_address=ip_address, user_agent=user_agent, request_id=request_id, ) db.add(audit_log) db.flush() db.refresh(audit_log) logger.info( f"Admin action logged: {action} on {target_type}:{target_id} " f"by admin {admin_user_id}" ) return audit_log except SQLAlchemyError as e: logger.error(f"Failed to log admin action: {str(e)}") # Don't raise exception - audit logging should not break operations return None def get_audit_logs( self, db: Session, filters: AdminAuditLogFilters ) -> list[AdminAuditLogResponse]: """ Get filtered admin audit logs with pagination. Args: db: Database session filters: Filter criteria for audit logs Returns: List of audit log responses """ AdminAuditLog = _get_audit_log_model() try: from sqlalchemy.orm import joinedload query = db.query(AdminAuditLog).options( joinedload(AdminAuditLog.admin_user) ) # Apply filters conditions = [] if filters.admin_user_id: conditions.append(AdminAuditLog.admin_user_id == filters.admin_user_id) if filters.action: conditions.append(AdminAuditLog.action.ilike(f"%{filters.action}%")) if filters.target_type: conditions.append(AdminAuditLog.target_type == filters.target_type) if filters.date_from: conditions.append(AdminAuditLog.created_at >= filters.date_from) if filters.date_to: conditions.append(AdminAuditLog.created_at <= filters.date_to) if conditions: query = query.filter(and_(*conditions)) # Execute query with pagination logs = ( query.order_by(AdminAuditLog.created_at.desc()) .offset(filters.skip) .limit(filters.limit) .all() ) # Convert to response models return [ AdminAuditLogResponse( id=log.id, admin_user_id=log.admin_user_id, admin_username=log.admin_user.username if log.admin_user else None, action=log.action, target_type=log.target_type, target_id=log.target_id, details=log.details, ip_address=log.ip_address, user_agent=log.user_agent, request_id=log.request_id, created_at=log.created_at, ) for log in logs ] except SQLAlchemyError as e: logger.error(f"Failed to retrieve audit logs: {str(e)}") raise AdminOperationException( operation="get_audit_logs", reason="Database query failed" ) def get_audit_logs_count(self, db: Session, filters: AdminAuditLogFilters) -> int: """Get total count of audit logs matching filters.""" AdminAuditLog = _get_audit_log_model() try: query = db.query(AdminAuditLog) # Apply same filters as get_audit_logs conditions = [] if filters.admin_user_id: conditions.append(AdminAuditLog.admin_user_id == filters.admin_user_id) if filters.action: conditions.append(AdminAuditLog.action.ilike(f"%{filters.action}%")) if filters.target_type: conditions.append(AdminAuditLog.target_type == filters.target_type) if filters.date_from: conditions.append(AdminAuditLog.created_at >= filters.date_from) if filters.date_to: conditions.append(AdminAuditLog.created_at <= filters.date_to) if conditions: query = query.filter(and_(*conditions)) return query.count() except SQLAlchemyError as e: logger.error(f"Failed to count audit logs: {str(e)}") return 0 def get_recent_actions_by_admin( self, db: Session, admin_user_id: int, limit: int = 10 ) -> list[AdminAuditLogResponse]: """Get recent actions by a specific admin.""" filters = AdminAuditLogFilters(admin_user_id=admin_user_id, limit=limit) return self.get_audit_logs(db, filters) def get_actions_by_target( self, db: Session, target_type: str, target_id: str, limit: int = 50 ) -> list[AdminAuditLogResponse]: """Get all actions performed on a specific target.""" AdminAuditLog = _get_audit_log_model() try: logs = ( db.query(AdminAuditLog) .filter( and_( AdminAuditLog.target_type == target_type, AdminAuditLog.target_id == str(target_id), ) ) .order_by(AdminAuditLog.created_at.desc()) .limit(limit) .all() ) return [ AdminAuditLogResponse( id=log.id, admin_user_id=log.admin_user_id, admin_username=log.admin_user.username if log.admin_user else None, action=log.action, target_type=log.target_type, target_id=log.target_id, details=log.details, ip_address=log.ip_address, created_at=log.created_at, ) for log in logs ] except SQLAlchemyError as e: logger.error(f"Failed to get actions by target: {str(e)}") return [] # Create service instance admin_audit_service = AdminAuditService()