# models/schema/admin.py
"""
Admin-specific Pydantic schemas for API validation and responses.

This module provides schemas for:
- Admin audit logs
- Admin notifications
- Platform settings
- Platform alerts
- Bulk operations
- Admin dashboard statistics
- System health checks
- Admin sessions
"""

from datetime import datetime
from typing import Optional, List, Dict, Any
from pydantic import BaseModel, Field, field_validator


# ============================================================================
# ADMIN AUDIT LOG SCHEMAS
# ============================================================================

class AdminAuditLogResponse(BaseModel):
    """Response model for admin audit logs."""
    id: int
    admin_user_id: int
    admin_username: Optional[str] = None
    action: str
    target_type: str
    # target_id is a string here, unlike the integer ids above.
    target_id: str
    details: Optional[Dict[str, Any]] = None
    ip_address: Optional[str] = None
    user_agent: Optional[str] = None
    request_id: Optional[str] = None
    created_at: datetime

    # Allow the model to be built from attribute-bearing objects, not only dicts.
    model_config = {"from_attributes": True}


class AdminAuditLogFilters(BaseModel):
    """Filters for querying audit logs."""
    # Every filter is optional; None means the filter was not supplied.
    admin_user_id: Optional[int] = None
    action: Optional[str] = None
    target_type: Optional[str] = None
    # NOTE(review): nothing here checks that date_from <= date_to.
    date_from: Optional[datetime] = None
    date_to: Optional[datetime] = None
    # Pagination: skip must be >= 0, limit must be within 1..1000.
    skip: int = Field(0, ge=0)
    limit: int = Field(100, ge=1, le=1000)


class AdminAuditLogListResponse(BaseModel):
    """Paginated list of audit logs."""
    logs: List[AdminAuditLogResponse]
    total: int
    skip: int
    limit: int


# ============================================================================
# ADMIN NOTIFICATION SCHEMAS
# ============================================================================

class AdminNotificationCreate(BaseModel):
    """Create admin notification."""
    type: str = Field(..., max_length=50, description="Notification type")
    # Restricted to a fixed set of levels by validate_priority below.
    priority: str = Field(default="normal", description="Priority level")
    title: str = Field(..., max_length=200)
    message: str = Field(..., description="Notification message")
    action_required: bool = Field(default=False)
    action_url: Optional[str] = Field(None, max_length=500)
    metadata: Optional[Dict[str, Any]] = None

    @field_validator('priority')
    @classmethod
    def validate_priority(cls, v: str) -> str:
        """Return ``v`` unchanged if it is an allowed priority.

        Raises:
            ValueError: if ``v`` is not one of the allowed priority levels.
        """
        allowed = ['low', 'normal', 'high', 'critical']
        if v not in allowed:
            raise ValueError(f"Priority must be one of: {', '.join(allowed)}")
        return v


class AdminNotificationResponse(BaseModel):
    """Admin notification response."""
    id: int
    type: str
    priority: str
    title: str
    message: str
    is_read: bool
    read_at: Optional[datetime] = None
    read_by_user_id: Optional[int] = None
    action_required: bool
    action_url: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None
    created_at: datetime

    # Allow the model to be built from attribute-bearing objects, not only dicts.
    model_config = {"from_attributes": True}


class AdminNotificationUpdate(BaseModel):
    """Mark notification as read."""
    # Defaults to True, so an empty payload marks the notification as read.
    is_read: bool = True


class AdminNotificationListResponse(BaseModel):
    """Paginated list of notifications."""
    notifications: List[AdminNotificationResponse]
    total: int
    unread_count: int
    skip: int
    limit: int


# ============================================================================
# ADMIN SETTINGS SCHEMAS
# ============================================================================

class AdminSettingCreate(BaseModel):
    """Create or update admin setting."""
    # Normalised to lowercase by validate_key_format below.
    key: str = Field(..., max_length=100, description="Unique setting key")
    # The value is always carried as a string; value_type names its data type.
    value: str = Field(..., description="Setting value")
    value_type: str = Field(default="string", description="Data type")
    category: Optional[str] = Field(None, max_length=50)
    description: Optional[str] = None
    is_encrypted: bool = Field(default=False)
    is_public: bool = Field(default=False, description="Can be exposed to frontend")

    @field_validator('value_type')
    @classmethod
    def validate_value_type(cls, v: str) -> str:
        """Return ``v`` unchanged if it names an allowed value type.

        Raises:
            ValueError: if ``v`` is not one of the allowed type names.
        """
        allowed = ['string', 'integer', 'boolean', 'json', 'float']
        if v not in allowed:
            raise ValueError(f"Value type must be one of: {', '.join(allowed)}")
        return v

    @field_validator('key')
    @classmethod
    def validate_key_format(cls, v: str) -> str:
        """Validate the setting key and return it lowercased.

        A key is accepted when, with underscores removed, it is a non-empty
        run of ASCII letters and digits.

        Raises:
            ValueError: if the key is empty, consists only of underscores, or
                contains any other character.
        """
        # Setting keys should be lowercase with underscores.
        # str.isalnum() alone is also true for non-ASCII letters and digits
        # (e.g. "é", "²"), and lower() can turn such input into a string that
        # contains combining marks (e.g. "İ"), so the normalised key would no
        # longer satisfy this rule. Requiring ASCII closes that gap.
        if not (v.isascii() and v.replace('_', '').isalnum()):
            raise ValueError("Setting key must contain only letters, numbers, and underscores")
        return v.lower()


class AdminSettingResponse(BaseModel):
    """Admin setting response."""
    id: int
    key: str
    value: str
    value_type: str
    category: Optional[str] = None
    description: Optional[str] = None
    is_encrypted: bool
    is_public: bool
    last_modified_by_user_id: Optional[int] = None
    updated_at: datetime

    # Allow the model to be built from attribute-bearing objects, not only dicts.
    model_config = {"from_attributes": True}


class AdminSettingUpdate(BaseModel):
    """Update admin setting value."""
    value: str
    description: Optional[str] = None


class AdminSettingListResponse(BaseModel):
    """List of settings by category."""
    settings: List[AdminSettingResponse]
    total: int
    category: Optional[str] = None


# ============================================================================
# PLATFORM ALERT SCHEMAS
# ============================================================================

class PlatformAlertCreate(BaseModel):
    """Create platform alert."""
    # Both alert_type and severity are restricted by the validators below.
    alert_type: str = Field(..., max_length=50)
    severity: str = Field(..., description="Alert severity")
    title: str = Field(..., max_length=200)
    description: Optional[str] = None
    affected_vendors: Optional[List[int]] = None
    affected_systems: Optional[List[str]] = None
    auto_generated: bool = Field(default=True)

    @field_validator('severity')
    @classmethod
    def validate_severity(cls, v: str) -> str:
        """Return ``v`` unchanged if it is an allowed severity.

        Raises:
            ValueError: if ``v`` is not one of the allowed severities.
        """
        allowed = ['info', 'warning', 'error', 'critical']
        if v not in allowed:
            raise ValueError(f"Severity must be one of: {', '.join(allowed)}")
        return v

    @field_validator('alert_type')
    @classmethod
    def validate_alert_type(cls, v: str) -> str:
        """Return ``v`` unchanged if it is an allowed alert type.

        Raises:
            ValueError: if ``v`` is not one of the allowed alert types.
        """
        allowed = ['security', 'performance', 'capacity', 'integration', 'database', 'system']
        if v not in allowed:
            raise ValueError(f"Alert type must be one of: {', '.join(allowed)}")
        return v


class PlatformAlertResponse(BaseModel):
    """Platform alert response."""
    id: int
    alert_type: str
    severity: str
    title: str
    description: Optional[str] = None
    affected_vendors: Optional[List[int]] = None
    affected_systems: Optional[List[str]] = None
    is_resolved: bool
    resolved_at: Optional[datetime] = None
    resolved_by_user_id: Optional[int] = None
    resolution_notes: Optional[str] = None
    auto_generated: bool
    occurrence_count: int
    first_occurred_at: datetime
    last_occurred_at: datetime
    created_at: datetime

    # Allow the model to be built from attribute-bearing objects, not only dicts.
    model_config = {"from_attributes": True}


class PlatformAlertResolve(BaseModel):
    """Resolve platform alert."""
    # Defaults to True, so an empty payload resolves the alert.
    is_resolved: bool = True
    resolution_notes: Optional[str] = None


class PlatformAlertListResponse(BaseModel):
    """Paginated list of platform alerts."""
    alerts: List[PlatformAlertResponse]
    total: int
    active_count: int
    critical_count: int
    skip: int
    limit: int


# ============================================================================
# BULK OPERATION SCHEMAS
# ============================================================================

class BulkVendorAction(BaseModel):
    """Bulk actions on vendors."""
    # Between 1 and 100 vendor ids per request.
    vendor_ids: List[int] = Field(..., min_length=1, max_length=100)
    action: str = Field(..., description="Action to perform")
    # NOTE(review): this schema does not enforce `confirm` for any action;
    # whoever consumes the model has to check it.
    confirm: bool = Field(default=False, description="Required for destructive actions")
    reason: Optional[str] = Field(None, description="Reason for bulk action")

    @field_validator('action')
    @classmethod
    def validate_action(cls, v: str) -> str:
        """Return ``v`` unchanged if it is an allowed vendor action.

        Raises:
            ValueError: if ``v`` is not one of the allowed actions.
        """
        allowed = ['activate', 'deactivate', 'verify', 'unverify', 'delete']
        if v not in allowed:
            raise ValueError(f"Action must be one of: {', '.join(allowed)}")
        return v


class BulkVendorActionResponse(BaseModel):
    """Response for bulk vendor actions."""
    successful: List[int]
    failed: Dict[int, str]  # vendor_id -> error_message
    total_processed: int
    action_performed: str
    message: str


class BulkUserAction(BaseModel):
    """Bulk actions on users."""
    # Between 1 and 100 user ids per request.
    user_ids: List[int] = Field(..., min_length=1, max_length=100)
    action: str = Field(..., description="Action to perform")
    # NOTE(review): this schema does not enforce `confirm` for any action.
    confirm: bool = Field(default=False)
    reason: Optional[str] = None

    @field_validator('action')
    @classmethod
    def validate_action(cls, v: str) -> str:
        """Return ``v`` unchanged if it is an allowed user action.

        Raises:
            ValueError: if ``v`` is not one of the allowed actions.
        """
        allowed = ['activate', 'deactivate', 'delete']
        if v not in allowed:
            raise ValueError(f"Action must be one of: {', '.join(allowed)}")
        return v


class BulkUserActionResponse(BaseModel):
    """Response for bulk user actions."""
    successful: List[int]
    failed: Dict[int, str]
    total_processed: int
    action_performed: str
    message: str


# ============================================================================
# ADMIN DASHBOARD SCHEMAS
# ============================================================================

class AdminDashboardStats(BaseModel):
    """Comprehensive admin dashboard statistics."""
    # Each section is an untyped mapping; its keys are not defined in this module.
    platform: Dict[str, Any]
    users: Dict[str, Any]
    vendors: Dict[str, Any]
    products: Dict[str, Any]
    orders: Dict[str, Any]
    imports: Dict[str, Any]
    recent_vendors: List[Dict[str, Any]]
    recent_imports: List[Dict[str, Any]]
    unread_notifications: int
    active_alerts: int
    critical_alerts: int


# ============================================================================
# SYSTEM HEALTH SCHEMAS
# ============================================================================

class ComponentHealthStatus(BaseModel):
    """Health status for a system component."""
    # Plain str: the values listed in the trailing comment are not validated here.
    status: str  # healthy, degraded, unhealthy
    response_time_ms: Optional[float] = None
    error_message: Optional[str] = None
    last_checked: datetime
    details: Optional[Dict[str, Any]] = None


class SystemHealthResponse(BaseModel):
    """System health check response."""
    # Plain str: the values listed in the trailing comment are not validated here.
    overall_status: str  # healthy, degraded, critical
    database: ComponentHealthStatus
    redis: ComponentHealthStatus
    celery: ComponentHealthStatus
    storage: ComponentHealthStatus
    api_response_time_ms: float
    uptime_seconds: int
    timestamp: datetime


# ============================================================================
# ADMIN SESSION SCHEMAS
# ============================================================================

class AdminSessionResponse(BaseModel):
    """Admin session information."""
    id: int
    admin_user_id: int
    admin_username: Optional[str] = None
    # Required here, unlike the optional ip_address on AdminAuditLogResponse.
    ip_address: str
    user_agent: Optional[str] = None
    login_at: datetime
    last_activity_at: datetime
    logout_at: Optional[datetime] = None
    is_active: bool
    logout_reason: Optional[str] = None

    # Allow the model to be built from attribute-bearing objects, not only dicts.
    model_config = {"from_attributes": True}


class AdminSessionListResponse(BaseModel):
    """List of admin sessions."""
    sessions: List[AdminSessionResponse]
    total: int
    active_count: int