- Auto-fixed 4,496 lint issues (import sorting, modern syntax, etc.) - Added ignore rules for patterns intentional in this codebase: E402 (late imports), E712 (SQLAlchemy filters), B904 (raise from), SIM108/SIM105/SIM117 (readability preferences) - Added per-file ignores for tests and scripts - Excluded broken scripts/rename_terminology.py (has curly quotes) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
237 lines
7.6 KiB
Python
237 lines
7.6 KiB
Python
# app/modules/monitoring/services/admin_audit_service.py
|
|
"""
|
|
Admin audit service for tracking admin actions.
|
|
|
|
This module provides functions for:
|
|
- Logging admin actions
|
|
- Querying audit logs
|
|
- Generating audit reports
|
|
"""
|
|
|
|
import logging
|
|
from typing import Any
|
|
|
|
from sqlalchemy import and_
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.modules.tenancy.exceptions import AdminOperationException
|
|
from app.modules.tenancy.models import AdminAuditLog, User
|
|
from app.modules.tenancy.schemas.admin import (
|
|
AdminAuditLogFilters,
|
|
AdminAuditLogResponse,
|
|
)
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AdminAuditService:
    """Service for writing to and reading from the admin audit trail.

    The class keeps no instance state; every public method receives the
    SQLAlchemy ``Session`` it should operate on.
    """

    @staticmethod
    def _build_filter_conditions(filters: AdminAuditLogFilters) -> list[Any]:
        """Translate the populated fields of ``filters`` into SQL conditions.

        Shared by ``get_audit_logs`` and ``get_audit_logs_count`` so that the
        listing and its count are always filtered identically.

        Args:
            filters: Filter criteria; falsy fields are ignored.

        Returns:
            List of SQLAlchemy filter expressions (possibly empty).
        """
        conditions: list[Any] = []

        if filters.admin_user_id:
            conditions.append(AdminAuditLog.admin_user_id == filters.admin_user_id)

        if filters.action:
            # Case-insensitive substring match on the action name.
            conditions.append(AdminAuditLog.action.ilike(f"%{filters.action}%"))

        if filters.target_type:
            conditions.append(AdminAuditLog.target_type == filters.target_type)

        if filters.date_from:
            conditions.append(AdminAuditLog.created_at >= filters.date_from)

        if filters.date_to:
            conditions.append(AdminAuditLog.created_at <= filters.date_to)

        return conditions

    @staticmethod
    def _to_response(log: AdminAuditLog) -> AdminAuditLogResponse:
        """Convert one ``AdminAuditLog`` row into its response model.

        Used by every query method so that all of them expose the same set
        of fields (including ``user_agent`` and ``request_id``).
        """
        return AdminAuditLogResponse(
            id=log.id,
            admin_user_id=log.admin_user_id,
            # The related user may be absent; fall back to None.
            admin_username=log.admin_user.username if log.admin_user else None,
            action=log.action,
            target_type=log.target_type,
            target_id=log.target_id,
            details=log.details,
            ip_address=log.ip_address,
            user_agent=log.user_agent,
            request_id=log.request_id,
            created_at=log.created_at,
        )

    def log_action(
        self,
        db: Session,
        admin_user_id: int,
        action: str,
        target_type: str,
        target_id: str,
        details: dict[str, Any] | None = None,
        ip_address: str | None = None,
        user_agent: str | None = None,
        request_id: str | None = None,
    ) -> AdminAuditLog | None:
        """
        Log an admin action to the audit trail.

        The entry is added and flushed on ``db`` but not committed here.

        Args:
            db: Database session
            admin_user_id: ID of the admin performing the action
            action: Action performed (e.g., 'create_store', 'delete_user')
            target_type: Type of target (e.g., 'store', 'user')
            target_id: ID of the target entity (stored as a string)
            details: Additional context about the action; stored as an
                empty dict when omitted
            ip_address: IP address of the admin
            user_agent: User agent string
            request_id: Request ID for correlation

        Returns:
            The created AdminAuditLog instance, or None if logging failed
            (failures are logged and never raised).
        """
        try:
            audit_log = AdminAuditLog(
                admin_user_id=admin_user_id,
                action=action,
                target_type=target_type,
                target_id=str(target_id),
                details=details or {},
                ip_address=ip_address,
                user_agent=user_agent,
                request_id=request_id,
            )

            db.add(audit_log)
            db.flush()
            db.refresh(audit_log)

            logger.info(
                "Admin action logged: %s on %s:%s by admin %s",
                action,
                target_type,
                target_id,
                admin_user_id,
            )

            return audit_log

        except Exception as e:
            # Don't raise exception - audit logging should not break operations.
            # NOTE(review): if the failure came from db.flush(), the session
            # may need a rollback before the caller can keep using it -
            # confirm how callers handle that.
            logger.exception("Failed to log admin action: %s", e)
            return None

    def get_audit_logs(
        self, db: Session, filters: AdminAuditLogFilters
    ) -> list[AdminAuditLogResponse]:
        """
        Get filtered admin audit logs with pagination.

        Results are ordered newest first and paged with ``filters.skip`` /
        ``filters.limit``.

        Args:
            db: Database session
            filters: Filter criteria for audit logs

        Returns:
            List of audit log responses

        Raises:
            AdminOperationException: If the query or the conversion to
                response models fails.
        """
        try:
            query = db.query(AdminAuditLog).join(
                User, AdminAuditLog.admin_user_id == User.id
            )

            conditions = self._build_filter_conditions(filters)
            if conditions:
                query = query.filter(and_(*conditions))

            # Execute query with pagination
            logs = (
                query.order_by(AdminAuditLog.created_at.desc())
                .offset(filters.skip)
                .limit(filters.limit)
                .all()
            )

            return [self._to_response(log) for log in logs]

        except Exception as e:
            logger.exception("Failed to retrieve audit logs: %s", e)
            raise AdminOperationException(
                operation="get_audit_logs", reason="Database query failed"
            ) from e

    def get_audit_logs_count(self, db: Session, filters: AdminAuditLogFilters) -> int:
        """Get total count of audit logs matching filters.

        Args:
            db: Database session
            filters: Same filter criteria as passed to ``get_audit_logs``

        Returns:
            Number of matching rows, or 0 if the query fails (the failure
            is logged, not raised).
        """
        try:
            # NOTE(review): unlike get_audit_logs, this query does not join
            # User, so the two can disagree if a log row references a user
            # that no longer exists - confirm whether that can happen.
            query = db.query(AdminAuditLog)

            conditions = self._build_filter_conditions(filters)
            if conditions:
                query = query.filter(and_(*conditions))

            return query.count()

        except Exception as e:
            logger.exception("Failed to count audit logs: %s", e)
            return 0

    def get_recent_actions_by_admin(
        self, db: Session, admin_user_id: int, limit: int = 10
    ) -> list[AdminAuditLogResponse]:
        """Get recent actions by a specific admin.

        Thin wrapper around ``get_audit_logs`` with only ``admin_user_id``
        and ``limit`` set on the filters.

        Raises:
            AdminOperationException: Propagated from ``get_audit_logs``.
        """
        filters = AdminAuditLogFilters(admin_user_id=admin_user_id, limit=limit)
        return self.get_audit_logs(db, filters)

    def get_actions_by_target(
        self, db: Session, target_type: str, target_id: str, limit: int = 50
    ) -> list[AdminAuditLogResponse]:
        """Get the most recent actions performed on a specific target.

        Args:
            db: Database session
            target_type: Type of target (e.g., 'store', 'user')
            target_id: ID of the target entity (compared as a string)
            limit: Maximum number of entries to return, newest first

        Returns:
            List of audit log responses, or an empty list if the query
            fails (the failure is logged, not raised).
        """
        try:
            logs = (
                db.query(AdminAuditLog)
                .filter(
                    and_(
                        AdminAuditLog.target_type == target_type,
                        AdminAuditLog.target_id == str(target_id),
                    )
                )
                .order_by(AdminAuditLog.created_at.desc())
                .limit(limit)
                .all()
            )

            return [self._to_response(log) for log in logs]

        except Exception as e:
            logger.exception("Failed to get actions by target: %s", e)
            return []
|
|
|
|
|
|
# Create service instance
|
|
# Module-level instance, constructed once when this module is imported.
admin_audit_service = AdminAuditService()
|