# Files: orion/app/services/admin_audit_service.py
# Original listing: 250 lines, 7.7 KiB, Python

# app/services/admin_audit_service.py
"""
Admin audit service for tracking admin actions.
This module provides functions for:
- Logging admin actions
- Querying audit logs
- Generating audit reports
"""
import logging
from datetime import datetime, timezone
from typing import List, Optional, Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from models.database.admin import AdminAuditLog
from models.database.user import User
from models.schema.admin import AdminAuditLogFilters, AdminAuditLogResponse
from app.exceptions import AdminOperationException
# Module-level logger named after this module (``__name__``).
logger = logging.getLogger(__name__)
class AdminAuditService:
    """Service for recording and querying the admin audit trail.

    ``log_action`` writes one ``AdminAuditLog`` row per admin action. The
    ``get_*`` methods read rows back as ``AdminAuditLogResponse`` objects,
    ordered by ``created_at`` descending (newest first).
    """

    @staticmethod
    def _build_filter_conditions(filters: AdminAuditLogFilters) -> List[Any]:
        """Translate ``filters`` into a list of SQLAlchemy criteria.

        Shared by ``get_audit_logs`` and ``get_audit_logs_count`` so the page
        contents and the total count are always computed from the same
        criteria.

        Args:
            filters: Filter criteria; unset fields add no condition.

        Returns:
            Criteria to be AND-ed together (possibly empty).
        """
        conditions: List[Any] = []
        # ``is not None`` rather than truthiness so an id of 0 is still
        # treated as a real filter value.
        if filters.admin_user_id is not None:
            conditions.append(
                AdminAuditLog.admin_user_id == filters.admin_user_id
            )
        # Empty strings are treated as "no filter", hence truthiness here.
        if filters.action:
            # Case-insensitive substring match on the action name.
            conditions.append(AdminAuditLog.action.ilike(f"%{filters.action}%"))
        if filters.target_type:
            conditions.append(AdminAuditLog.target_type == filters.target_type)
        if filters.date_from is not None:
            conditions.append(AdminAuditLog.created_at >= filters.date_from)
        if filters.date_to is not None:
            conditions.append(AdminAuditLog.created_at <= filters.date_to)
        return conditions

    @staticmethod
    def _to_response(log: AdminAuditLog) -> AdminAuditLogResponse:
        """Convert one ``AdminAuditLog`` row into its response model.

        ``admin_username`` is ``None`` when the row has no associated
        ``admin_user``.
        """
        # NOTE(review): ``admin_user`` is presumably an ORM relationship that
        # may be loaded lazily per row - confirm, and consider eager loading
        # if this shows up as an N+1 query pattern.
        admin_user = log.admin_user
        return AdminAuditLogResponse(
            id=log.id,
            admin_user_id=log.admin_user_id,
            admin_username=admin_user.username if admin_user else None,
            action=log.action,
            target_type=log.target_type,
            target_id=log.target_id,
            details=log.details,
            ip_address=log.ip_address,
            user_agent=log.user_agent,
            request_id=log.request_id,
            created_at=log.created_at,
        )

    def log_action(
        self,
        db: Session,
        admin_user_id: int,
        action: str,
        target_type: str,
        target_id: str,
        details: Optional[Dict[str, Any]] = None,
        ip_address: Optional[str] = None,
        user_agent: Optional[str] = None,
        request_id: Optional[str] = None
    ) -> Optional[AdminAuditLog]:
        """
        Log an admin action to the audit trail.

        Exceptions raised while writing the entry are caught: the session is
        rolled back, the error is logged, and ``None`` is returned, so a
        failed audit write does not break the operation being audited.

        NOTE(review): this calls ``db.commit()`` / ``db.rollback()`` on the
        session passed in, which also affects any other pending work on that
        session - confirm callers expect this.

        Args:
            db: Database session used to persist the entry
            admin_user_id: ID of the admin performing the action
            action: Action performed (e.g., 'create_vendor', 'delete_user')
            target_type: Type of target (e.g., 'vendor', 'user')
            target_id: ID of the target entity (stored as a string)
            details: Additional context about the action; stored as ``{}``
                when omitted or empty
            ip_address: IP address of the admin
            user_agent: User agent string
            request_id: Request ID for correlation

        Returns:
            Created AdminAuditLog instance, or None if the write failed
        """
        try:
            audit_log = AdminAuditLog(
                admin_user_id=admin_user_id,
                action=action,
                target_type=target_type,
                target_id=str(target_id),
                details=details or {},
                ip_address=ip_address,
                user_agent=user_agent,
                request_id=request_id
            )
            db.add(audit_log)
            db.commit()
            # Reload the row so database-populated attributes are available.
            db.refresh(audit_log)
            logger.info(
                f"Admin action logged: {action} on {target_type}:{target_id} "
                f"by admin {admin_user_id}"
            )
            return audit_log
        except Exception as e:
            db.rollback()
            logger.error(f"Failed to log admin action: {str(e)}")
            # Don't raise exception - audit logging should not break operations
            return None

    def get_audit_logs(
        self,
        db: Session,
        filters: AdminAuditLogFilters
    ) -> List[AdminAuditLogResponse]:
        """
        Get filtered admin audit logs with pagination.

        Args:
            db: Database session to query
            filters: Filter criteria plus ``skip`` / ``limit`` pagination

        Returns:
            List of audit log responses, newest first

        Raises:
            AdminOperationException: If the query or the conversion to
                response models fails (original error chained as the cause)
        """
        try:
            # LEFT OUTER JOIN (not INNER) so that log rows whose admin user
            # is missing are still listed. This keeps the page consistent
            # with get_audit_logs_count(), which does not join User at all.
            query = db.query(AdminAuditLog).outerjoin(
                User, AdminAuditLog.admin_user_id == User.id
            )

            conditions = self._build_filter_conditions(filters)
            if conditions:
                query = query.filter(and_(*conditions))

            # Execute query with pagination
            logs = (
                query
                .order_by(AdminAuditLog.created_at.desc())
                .offset(filters.skip)
                .limit(filters.limit)
                .all()
            )

            # Convert to response models
            return [self._to_response(log) for log in logs]
        except Exception as e:
            logger.error(f"Failed to retrieve audit logs: {str(e)}")
            raise AdminOperationException(
                operation="get_audit_logs",
                reason="Database query failed"
            ) from e

    def get_audit_logs_count(
        self,
        db: Session,
        filters: AdminAuditLogFilters
    ) -> int:
        """
        Get total count of audit logs matching filters.

        Uses the same criteria as ``get_audit_logs`` (pagination fields are
        ignored).

        Args:
            db: Database session to query
            filters: Filter criteria for audit logs

        Returns:
            Number of matching rows, or 0 if the query fails (the failure is
            logged, not raised)
        """
        try:
            query = db.query(AdminAuditLog)
            conditions = self._build_filter_conditions(filters)
            if conditions:
                query = query.filter(and_(*conditions))
            return query.count()
        except Exception as e:
            logger.error(f"Failed to count audit logs: {str(e)}")
            return 0

    def get_recent_actions_by_admin(
        self,
        db: Session,
        admin_user_id: int,
        limit: int = 10
    ) -> List[AdminAuditLogResponse]:
        """
        Get recent actions by a specific admin.

        Thin wrapper around ``get_audit_logs`` filtered by ``admin_user_id``.

        Args:
            db: Database session to query
            admin_user_id: ID of the admin whose actions are wanted
            limit: Maximum number of entries to return

        Returns:
            List of audit log responses, newest first

        Raises:
            AdminOperationException: Propagated from ``get_audit_logs``
        """
        filters = AdminAuditLogFilters(
            admin_user_id=admin_user_id,
            limit=limit
        )
        return self.get_audit_logs(db, filters)

    def get_actions_by_target(
        self,
        db: Session,
        target_type: str,
        target_id: str,
        limit: int = 50
    ) -> List[AdminAuditLogResponse]:
        """
        Get the most recent actions performed on a specific target.

        Args:
            db: Database session to query
            target_type: Type of target (e.g., 'vendor', 'user')
            target_id: ID of the target entity (compared as a string)
            limit: Maximum number of entries to return

        Returns:
            List of audit log responses, newest first; an empty list if the
            query fails (the failure is logged, not raised)
        """
        try:
            logs = (
                db.query(AdminAuditLog)
                .filter(
                    and_(
                        AdminAuditLog.target_type == target_type,
                        AdminAuditLog.target_id == str(target_id)
                    )
                )
                .order_by(AdminAuditLog.created_at.desc())
                .limit(limit)
                .all()
            )
            # Same conversion as get_audit_logs so both read paths return
            # identically populated responses (incl. user_agent/request_id).
            return [self._to_response(log) for log in logs]
        except Exception as e:
            logger.error(f"Failed to get actions by target: {str(e)}")
            return []
# Module-level service instance. AdminAuditService defines no __init__ and
# stores nothing on ``self``, so this single object can be imported and shared.
admin_audit_service = AdminAuditService()