# Files
# orion/models/schema/admin.py
#
# 366 lines
# 11 KiB
# Python

# models/schema/admin.py
"""
Admin-specific Pydantic schemas for API validation and responses.

This module provides schemas for:
- Admin audit logs
- Admin notifications
- Platform settings
- Platform alerts
- Bulk operations
- Admin dashboard statistics
- System health checks
- Admin sessions
"""
from datetime import datetime
from typing import Optional, List, Dict, Any
from pydantic import BaseModel, Field, field_validator
# ============================================================================
# ADMIN AUDIT LOG SCHEMAS
# ============================================================================
class AdminAuditLogResponse(BaseModel):
    """Response model for admin audit logs."""

    # Lets pydantic populate the model by reading attributes off an object
    # (not only from a dict).
    model_config = {"from_attributes": True}

    # Required fields.
    id: int
    admin_user_id: int
    # Optional; defaults to None when not supplied.
    admin_username: Optional[str] = Field(default=None)
    action: str
    target_type: str
    target_id: str

    # Optional context; each defaults to None when absent.
    details: Optional[Dict[str, Any]] = Field(default=None)
    ip_address: Optional[str] = Field(default=None)
    user_agent: Optional[str] = Field(default=None)
    request_id: Optional[str] = Field(default=None)

    created_at: datetime
class AdminAuditLogFilters(BaseModel):
    """Filters for querying audit logs."""

    # Every filter is optional and defaults to None.
    admin_user_id: Optional[int] = Field(default=None)
    action: Optional[str] = Field(default=None)
    target_type: Optional[str] = Field(default=None)
    # NOTE(review): nothing in this schema checks that date_from <= date_to.
    date_from: Optional[datetime] = Field(default=None)
    date_to: Optional[datetime] = Field(default=None)

    # Must be >= 0; defaults to 0.
    skip: int = Field(default=0, ge=0)
    # Must be within 1..1000 inclusive; defaults to 100.
    limit: int = Field(default=100, ge=1, le=1000)
class AdminAuditLogListResponse(BaseModel):
    """Paginated list of audit logs."""

    # All four fields are required; none carries a default or constraint here.
    logs: List[AdminAuditLogResponse]
    total: int
    # presumably these echo the skip/limit of the originating query - confirm
    # against the endpoint that builds this response.
    skip: int
    limit: int
# ============================================================================
# ADMIN NOTIFICATION SCHEMAS
# ============================================================================
class AdminNotificationCreate(BaseModel):
    """Create admin notification."""

    # Required, at most 50 characters.
    type: str = Field(..., max_length=50, description="Notification type")
    # Defaults to "normal"; restricted to a fixed set by validate_priority.
    priority: str = Field("normal", description="Priority level")
    # Required, at most 200 characters.
    title: str = Field(..., max_length=200)
    message: str = Field(..., description="Notification message")
    action_required: bool = False
    # Optional, at most 500 characters when given.
    action_url: Optional[str] = Field(default=None, max_length=500)
    metadata: Optional[Dict[str, Any]] = Field(default=None)

    @field_validator("priority")
    @classmethod
    def validate_priority(cls, v):
        """Return ``v`` unchanged if it is a known priority, else raise ValueError."""
        choices = ('low', 'normal', 'high', 'critical')
        if v in choices:
            return v
        listing = ', '.join(choices)
        raise ValueError(f"Priority must be one of: {listing}")
class AdminNotificationResponse(BaseModel):
    """Admin notification response."""

    # Lets pydantic populate the model by reading attributes off an object
    # (not only from a dict).
    model_config = {"from_attributes": True}

    id: int
    type: str
    priority: str
    title: str
    message: str

    # Read state: the flag is required, the two detail fields default to None.
    is_read: bool
    read_at: Optional[datetime] = Field(default=None)
    read_by_user_id: Optional[int] = Field(default=None)

    action_required: bool
    action_url: Optional[str] = Field(default=None)
    # NOTE(review): `metadata` is a reserved attribute name on SQLAlchemy
    # declarative models; if this schema is built from one via
    # from_attributes, confirm which attribute actually feeds this field.
    metadata: Optional[Dict[str, Any]] = Field(default=None)
    created_at: datetime
class AdminNotificationUpdate(BaseModel):
    """Mark notification as read."""

    # Defaults to True, so an empty payload marks the notification as read.
    is_read: bool = Field(default=True)
class AdminNotificationListResponse(BaseModel):
    """Paginated list of notifications."""

    # All fields are required; none carries a default or constraint here.
    notifications: List[AdminNotificationResponse]
    total: int
    unread_count: int
    # presumably these echo the skip/limit of the originating query - confirm
    # against the endpoint that builds this response.
    skip: int
    limit: int
# ============================================================================
# ADMIN SETTINGS SCHEMAS
# ============================================================================
class AdminSettingCreate(BaseModel):
    """Create or update admin setting."""

    # Required, at most 100 characters; normalised by validate_key_format.
    key: str = Field(..., max_length=100, description="Unique setting key")
    # Always carried as a string; value_type names the intended data type.
    value: str = Field(..., description="Setting value")
    # Defaults to "string"; restricted to a fixed set by validate_value_type.
    value_type: str = Field(default="string", description="Data type")
    # Optional, at most 50 characters when given.
    category: Optional[str] = Field(None, max_length=50)
    description: Optional[str] = None
    is_encrypted: bool = Field(default=False)
    is_public: bool = Field(default=False, description="Can be exposed to frontend")

    @field_validator('value_type')
    @classmethod
    def validate_value_type(cls, v: str) -> str:
        """Return ``v`` unchanged if it is a supported value type.

        Raises:
            ValueError: if ``v`` is not one of the allowed type names.
        """
        allowed = ['string', 'integer', 'boolean', 'json', 'float']
        if v not in allowed:
            raise ValueError(f"Value type must be one of: {', '.join(allowed)}")
        return v

    @field_validator('key')
    @classmethod
    def validate_key_format(cls, v: str) -> str:
        """Validate the key's character set and return it lower-cased.

        Accepts only ASCII letters, ASCII digits and underscores, with at
        least one letter or digit present.

        Raises:
            ValueError: if ``v`` is empty, consists only of underscores, or
                contains any other character.
        """
        # str.isalnum() is Unicode-aware, so on its own it accepts non-ASCII
        # letters and numerals. Lower-casing such text can also change its
        # length (e.g. "\u0130".lower() is two code points), which would let a
        # key exceed max_length after the length check has already run.
        # Requiring ASCII closes both gaps.
        if not (v.isascii() and v.replace('_', '').isalnum()):
            raise ValueError("Setting key must contain only letters, numbers, and underscores")
        return v.lower()
class AdminSettingResponse(BaseModel):
    """Admin setting response."""

    # Lets pydantic populate the model by reading attributes off an object
    # (not only from a dict).
    model_config = {"from_attributes": True}

    id: int
    key: str
    value: str
    value_type: str
    # Optional descriptive fields; default to None.
    category: Optional[str] = Field(default=None)
    description: Optional[str] = Field(default=None)

    is_encrypted: bool
    is_public: bool

    last_modified_by_user_id: Optional[int] = Field(default=None)
    updated_at: datetime
class AdminSettingUpdate(BaseModel):
    """Update admin setting value."""

    # The new value is required; the description is optional (default None).
    value: str
    description: Optional[str] = Field(default=None)
class AdminSettingListResponse(BaseModel):
    """List of settings by category."""

    settings: List[AdminSettingResponse]
    total: int
    # Optional; defaults to None when not supplied.
    category: Optional[str] = Field(default=None)
# ============================================================================
# PLATFORM ALERT SCHEMAS
# ============================================================================
class PlatformAlertCreate(BaseModel):
    """Create platform alert."""

    # Required, at most 50 characters; restricted further by validate_alert_type.
    alert_type: str = Field(..., max_length=50)
    # Required; restricted to a fixed set by validate_severity.
    severity: str = Field(..., description="Alert severity")
    # Required, at most 200 characters.
    title: str = Field(..., max_length=200)
    description: Optional[str] = Field(default=None)
    affected_vendors: Optional[List[int]] = Field(default=None)
    affected_systems: Optional[List[str]] = Field(default=None)
    # Defaults to True.
    auto_generated: bool = True

    @field_validator("severity")
    @classmethod
    def validate_severity(cls, v):
        """Return ``v`` unchanged if it is a known severity, else raise ValueError."""
        choices = ('info', 'warning', 'error', 'critical')
        if v in choices:
            return v
        listing = ', '.join(choices)
        raise ValueError(f"Severity must be one of: {listing}")

    @field_validator("alert_type")
    @classmethod
    def validate_alert_type(cls, v):
        """Return ``v`` unchanged if it is a known alert type, else raise ValueError."""
        choices = (
            'security',
            'performance',
            'capacity',
            'integration',
            'database',
            'system',
        )
        if v in choices:
            return v
        listing = ', '.join(choices)
        raise ValueError(f"Alert type must be one of: {listing}")
class PlatformAlertResponse(BaseModel):
    """Platform alert response."""

    # Lets pydantic populate the model by reading attributes off an object
    # (not only from a dict).
    model_config = {"from_attributes": True}

    id: int
    alert_type: str
    severity: str
    title: str
    description: Optional[str] = Field(default=None)
    affected_vendors: Optional[List[int]] = Field(default=None)
    affected_systems: Optional[List[str]] = Field(default=None)

    # Resolution state: the flag is required, the details default to None.
    is_resolved: bool
    resolved_at: Optional[datetime] = Field(default=None)
    resolved_by_user_id: Optional[int] = Field(default=None)
    resolution_notes: Optional[str] = Field(default=None)

    auto_generated: bool
    occurrence_count: int

    # Timestamps; all required.
    first_occurred_at: datetime
    last_occurred_at: datetime
    created_at: datetime
class PlatformAlertResolve(BaseModel):
    """Resolve platform alert."""

    # Defaults to True, so an empty payload resolves the alert.
    is_resolved: bool = Field(default=True)
    resolution_notes: Optional[str] = Field(default=None)
class PlatformAlertListResponse(BaseModel):
    """Paginated list of platform alerts."""

    # All fields are required; none carries a default or constraint here.
    alerts: List[PlatformAlertResponse]
    total: int
    active_count: int
    critical_count: int
    # presumably these echo the skip/limit of the originating query - confirm
    # against the endpoint that builds this response.
    skip: int
    limit: int
# ============================================================================
# BULK OPERATION SCHEMAS
# ============================================================================
class BulkVendorAction(BaseModel):
    """Bulk actions on vendors."""

    # Required; between 1 and 100 ids per request.
    vendor_ids: List[int] = Field(..., min_length=1, max_length=100)
    # Required; restricted to a fixed set by validate_action.
    action: str = Field(..., description="Action to perform")
    # NOTE(review): this schema only declares the flag; no validator here ties
    # it to any particular action, so enforcement must live elsewhere.
    confirm: bool = Field(False, description="Required for destructive actions")
    reason: Optional[str] = Field(default=None, description="Reason for bulk action")

    @field_validator("action")
    @classmethod
    def validate_action(cls, v):
        """Return ``v`` unchanged if it is a supported vendor action, else raise ValueError."""
        choices = ('activate', 'deactivate', 'verify', 'unverify', 'delete')
        if v in choices:
            return v
        listing = ', '.join(choices)
        raise ValueError(f"Action must be one of: {listing}")
class BulkVendorActionResponse(BaseModel):
    """Response for bulk vendor actions."""

    # All fields are required.
    successful: List[int]
    # vendor_id -> error_message
    failed: Dict[int, str]
    total_processed: int
    action_performed: str
    message: str
class BulkUserAction(BaseModel):
    """Bulk actions on users."""

    # Required; between 1 and 100 ids per request.
    user_ids: List[int] = Field(..., min_length=1, max_length=100)
    # Required; restricted to a fixed set by validate_action.
    action: str = Field(..., description="Action to perform")
    # NOTE(review): this schema only declares the flag; no validator here ties
    # it to any particular action.
    confirm: bool = False
    reason: Optional[str] = Field(default=None)

    @field_validator("action")
    @classmethod
    def validate_action(cls, v):
        """Return ``v`` unchanged if it is a supported user action, else raise ValueError."""
        choices = ('activate', 'deactivate', 'delete')
        if v in choices:
            return v
        listing = ', '.join(choices)
        raise ValueError(f"Action must be one of: {listing}")
class BulkUserActionResponse(BaseModel):
    """Response for bulk user actions."""

    # All fields are required.
    successful: List[int]
    # presumably user_id -> error message, mirroring the vendor response -
    # confirm against the code that builds this.
    failed: Dict[int, str]
    total_processed: int
    action_performed: str
    message: str
# ============================================================================
# ADMIN DASHBOARD SCHEMAS
# ============================================================================
class AdminDashboardStats(BaseModel):
    """Comprehensive admin dashboard statistics."""

    # Free-form per-area statistics: string keys, arbitrary values. The inner
    # key set is not defined by this schema.
    platform: Dict[str, Any]
    users: Dict[str, Any]
    vendors: Dict[str, Any]
    products: Dict[str, Any]
    orders: Dict[str, Any]
    imports: Dict[str, Any]

    # Lists of free-form records; item shape is not defined by this schema.
    recent_vendors: List[Dict[str, Any]]
    recent_imports: List[Dict[str, Any]]

    # Integer counters; all required.
    unread_notifications: int
    active_alerts: int
    critical_alerts: int
# ============================================================================
# SYSTEM HEALTH SCHEMAS
# ============================================================================
class ComponentHealthStatus(BaseModel):
    """Health status for a system component."""

    # healthy, degraded, unhealthy
    # (plain str: the value set above is not enforced by this schema)
    status: str
    response_time_ms: Optional[float] = Field(default=None)
    error_message: Optional[str] = Field(default=None)
    last_checked: datetime
    details: Optional[Dict[str, Any]] = Field(default=None)
class SystemHealthResponse(BaseModel):
    """System health check response."""

    # healthy, degraded, critical
    # (plain str: the value set above is not enforced by this schema)
    overall_status: str

    # One required status entry per component.
    database: ComponentHealthStatus
    redis: ComponentHealthStatus
    celery: ComponentHealthStatus
    storage: ComponentHealthStatus

    api_response_time_ms: float
    uptime_seconds: int
    timestamp: datetime
# ============================================================================
# ADMIN SESSION SCHEMAS
# ============================================================================
class AdminSessionResponse(BaseModel):
    """Admin session information."""

    # Lets pydantic populate the model by reading attributes off an object
    # (not only from a dict).
    model_config = {"from_attributes": True}

    id: int
    admin_user_id: int
    admin_username: Optional[str] = Field(default=None)
    # Required here, unlike the optional user_agent below.
    ip_address: str
    user_agent: Optional[str] = Field(default=None)

    # Session timeline: login and last activity are required, logout is optional.
    login_at: datetime
    last_activity_at: datetime
    logout_at: Optional[datetime] = Field(default=None)

    is_active: bool
    logout_reason: Optional[str] = Field(default=None)
class AdminSessionListResponse(BaseModel):
    """List of admin sessions."""

    # All fields are required; unlike the paginated list responses in this
    # module, this one declares no skip/limit fields.
    sessions: List[AdminSessionResponse]
    total: int
    active_count: int