- Change icon from 'envelope' to 'mail' (envelope not in icons.js)
- Add default query param to GET /admin/settings/{key} endpoint
- Return AdminSettingDefaultResponse instead of 404 when default provided
- Update loadShippingSettings() to use default param for carrier settings
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
591 lines
16 KiB
Python
591 lines
16 KiB
Python
# models/schema/admin.py
|
|
"""
|
|
Admin-specific Pydantic schemas for API validation and responses.
|
|
|
|
This module provides schemas for:
|
|
- Admin audit logs
|
|
- Admin notifications
|
|
- Platform settings
|
|
- Platform alerts
|
|
- Bulk operations
|
|
- System health checks
|
|
"""
|
|
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
from pydantic import BaseModel, Field, field_validator
|
|
|
|
# ============================================================================
|
|
# ADMIN AUDIT LOG SCHEMAS
|
|
# ============================================================================
|
|
|
|
|
|
class AdminAuditLogResponse(BaseModel):
|
|
"""Response model for admin audit logs."""
|
|
|
|
id: int
|
|
admin_user_id: int
|
|
admin_username: str | None = None
|
|
action: str
|
|
target_type: str
|
|
target_id: str
|
|
details: dict[str, Any] | None = None
|
|
ip_address: str | None = None
|
|
user_agent: str | None = None
|
|
request_id: str | None = None
|
|
created_at: datetime
|
|
|
|
model_config = {"from_attributes": True}
|
|
|
|
|
|
class AdminAuditLogFilters(BaseModel):
|
|
"""Filters for querying audit logs."""
|
|
|
|
admin_user_id: int | None = None
|
|
action: str | None = None
|
|
target_type: str | None = None
|
|
date_from: datetime | None = None
|
|
date_to: datetime | None = None
|
|
skip: int = Field(0, ge=0)
|
|
limit: int = Field(100, ge=1, le=1000)
|
|
|
|
|
|
class AdminAuditLogListResponse(BaseModel):
|
|
"""Paginated list of audit logs."""
|
|
|
|
logs: list[AdminAuditLogResponse]
|
|
total: int
|
|
skip: int
|
|
limit: int
|
|
|
|
|
|
# ============================================================================
|
|
# ADMIN NOTIFICATION SCHEMAS
|
|
# ============================================================================
|
|
|
|
|
|
class AdminNotificationCreate(BaseModel):
|
|
"""Create admin notification."""
|
|
|
|
type: str = Field(..., max_length=50, description="Notification type")
|
|
priority: str = Field(default="normal", description="Priority level")
|
|
title: str = Field(..., max_length=200)
|
|
message: str = Field(..., description="Notification message")
|
|
action_required: bool = Field(default=False)
|
|
action_url: str | None = Field(None, max_length=500)
|
|
metadata: dict[str, Any] | None = None
|
|
|
|
@field_validator("priority")
|
|
@classmethod
|
|
def validate_priority(cls, v):
|
|
allowed = ["low", "normal", "high", "critical"]
|
|
if v not in allowed:
|
|
raise ValueError(f"Priority must be one of: {', '.join(allowed)}")
|
|
return v
|
|
|
|
|
|
class AdminNotificationResponse(BaseModel):
|
|
"""Admin notification response."""
|
|
|
|
id: int
|
|
type: str
|
|
priority: str
|
|
title: str
|
|
message: str
|
|
is_read: bool
|
|
read_at: datetime | None = None
|
|
read_by_user_id: int | None = None
|
|
action_required: bool
|
|
action_url: str | None = None
|
|
metadata: dict[str, Any] | None = None
|
|
created_at: datetime
|
|
|
|
model_config = {"from_attributes": True}
|
|
|
|
|
|
class AdminNotificationUpdate(BaseModel):
|
|
"""Mark notification as read."""
|
|
|
|
is_read: bool = True
|
|
|
|
|
|
class AdminNotificationListResponse(BaseModel):
|
|
"""Paginated list of notifications."""
|
|
|
|
notifications: list[AdminNotificationResponse]
|
|
total: int
|
|
unread_count: int
|
|
skip: int
|
|
limit: int
|
|
|
|
|
|
# ============================================================================
|
|
# ADMIN SETTINGS SCHEMAS
|
|
# ============================================================================
|
|
|
|
|
|
class AdminSettingCreate(BaseModel):
|
|
"""Create or update admin setting."""
|
|
|
|
key: str = Field(..., max_length=100, description="Unique setting key")
|
|
value: str = Field(..., description="Setting value")
|
|
value_type: str = Field(default="string", description="Data type")
|
|
category: str | None = Field(None, max_length=50)
|
|
description: str | None = None
|
|
is_encrypted: bool = Field(default=False)
|
|
is_public: bool = Field(default=False, description="Can be exposed to frontend")
|
|
|
|
@field_validator("value_type")
|
|
@classmethod
|
|
def validate_value_type(cls, v):
|
|
allowed = ["string", "integer", "boolean", "json", "float"]
|
|
if v not in allowed:
|
|
raise ValueError(f"Value type must be one of: {', '.join(allowed)}")
|
|
return v
|
|
|
|
@field_validator("key")
|
|
@classmethod
|
|
def validate_key_format(cls, v):
|
|
# Setting keys should be lowercase with underscores
|
|
if not v.replace("_", "").isalnum():
|
|
raise ValueError(
|
|
"Setting key must contain only letters, numbers, and underscores"
|
|
)
|
|
return v.lower()
|
|
|
|
|
|
class AdminSettingResponse(BaseModel):
|
|
"""Admin setting response."""
|
|
|
|
id: int
|
|
key: str
|
|
value: str
|
|
value_type: str
|
|
category: str | None = None
|
|
description: str | None = None
|
|
is_encrypted: bool
|
|
is_public: bool
|
|
last_modified_by_user_id: int | None = None
|
|
updated_at: datetime
|
|
|
|
model_config = {"from_attributes": True}
|
|
|
|
|
|
class AdminSettingDefaultResponse(BaseModel):
|
|
"""Response when returning a default value for non-existent setting."""
|
|
|
|
key: str
|
|
value: str
|
|
exists: bool = False
|
|
|
|
|
|
class AdminSettingUpdate(BaseModel):
|
|
"""Update admin setting value."""
|
|
|
|
value: str
|
|
description: str | None = None
|
|
|
|
|
|
class AdminSettingListResponse(BaseModel):
|
|
"""List of settings by category."""
|
|
|
|
settings: list[AdminSettingResponse]
|
|
total: int
|
|
category: str | None = None
|
|
|
|
|
|
# ============================================================================
|
|
# DISPLAY SETTINGS SCHEMAS
|
|
# ============================================================================
|
|
|
|
|
|
class RowsPerPageResponse(BaseModel):
|
|
"""Response for rows per page setting."""
|
|
|
|
rows_per_page: int
|
|
|
|
|
|
class RowsPerPageUpdateResponse(BaseModel):
|
|
"""Response after updating rows per page."""
|
|
|
|
rows_per_page: int
|
|
message: str
|
|
|
|
|
|
class PublicDisplaySettingsResponse(BaseModel):
|
|
"""Public display settings (no auth required)."""
|
|
|
|
rows_per_page: int
|
|
|
|
|
|
# ============================================================================
|
|
# PLATFORM ALERT SCHEMAS
|
|
# ============================================================================
|
|
|
|
|
|
class PlatformAlertCreate(BaseModel):
|
|
"""Create platform alert."""
|
|
|
|
alert_type: str = Field(..., max_length=50)
|
|
severity: str = Field(..., description="Alert severity")
|
|
title: str = Field(..., max_length=200)
|
|
description: str | None = None
|
|
affected_vendors: list[int] | None = None
|
|
affected_systems: list[str] | None = None
|
|
auto_generated: bool = Field(default=True)
|
|
|
|
@field_validator("severity")
|
|
@classmethod
|
|
def validate_severity(cls, v):
|
|
allowed = ["info", "warning", "error", "critical"]
|
|
if v not in allowed:
|
|
raise ValueError(f"Severity must be one of: {', '.join(allowed)}")
|
|
return v
|
|
|
|
@field_validator("alert_type")
|
|
@classmethod
|
|
def validate_alert_type(cls, v):
|
|
allowed = [
|
|
"security",
|
|
"performance",
|
|
"capacity",
|
|
"integration",
|
|
"database",
|
|
"system",
|
|
]
|
|
if v not in allowed:
|
|
raise ValueError(f"Alert type must be one of: {', '.join(allowed)}")
|
|
return v
|
|
|
|
|
|
class PlatformAlertResponse(BaseModel):
|
|
"""Platform alert response."""
|
|
|
|
id: int
|
|
alert_type: str
|
|
severity: str
|
|
title: str
|
|
description: str | None = None
|
|
affected_vendors: list[int] | None = None
|
|
affected_systems: list[str] | None = None
|
|
is_resolved: bool
|
|
resolved_at: datetime | None = None
|
|
resolved_by_user_id: int | None = None
|
|
resolution_notes: str | None = None
|
|
auto_generated: bool
|
|
occurrence_count: int
|
|
first_occurred_at: datetime
|
|
last_occurred_at: datetime
|
|
created_at: datetime
|
|
|
|
model_config = {"from_attributes": True}
|
|
|
|
|
|
class PlatformAlertResolve(BaseModel):
|
|
"""Resolve platform alert."""
|
|
|
|
is_resolved: bool = True
|
|
resolution_notes: str | None = None
|
|
|
|
|
|
class PlatformAlertListResponse(BaseModel):
|
|
"""Paginated list of platform alerts."""
|
|
|
|
alerts: list[PlatformAlertResponse]
|
|
total: int
|
|
active_count: int
|
|
critical_count: int
|
|
skip: int
|
|
limit: int
|
|
|
|
|
|
# ============================================================================
|
|
# BULK OPERATION SCHEMAS
|
|
# ============================================================================
|
|
|
|
|
|
class BulkVendorAction(BaseModel):
|
|
"""Bulk actions on vendors."""
|
|
|
|
vendor_ids: list[int] = Field(..., min_length=1, max_length=100)
|
|
action: str = Field(..., description="Action to perform")
|
|
confirm: bool = Field(default=False, description="Required for destructive actions")
|
|
reason: str | None = Field(None, description="Reason for bulk action")
|
|
|
|
@field_validator("action")
|
|
@classmethod
|
|
def validate_action(cls, v):
|
|
allowed = ["activate", "deactivate", "verify", "unverify", "delete"]
|
|
if v not in allowed:
|
|
raise ValueError(f"Action must be one of: {', '.join(allowed)}")
|
|
return v
|
|
|
|
|
|
class BulkVendorActionResponse(BaseModel):
|
|
"""Response for bulk vendor actions."""
|
|
|
|
successful: list[int]
|
|
failed: dict[int, str] # vendor_id -> error_message
|
|
total_processed: int
|
|
action_performed: str
|
|
message: str
|
|
|
|
|
|
class BulkUserAction(BaseModel):
|
|
"""Bulk actions on users."""
|
|
|
|
user_ids: list[int] = Field(..., min_length=1, max_length=100)
|
|
action: str = Field(..., description="Action to perform")
|
|
confirm: bool = Field(default=False)
|
|
reason: str | None = None
|
|
|
|
@field_validator("action")
|
|
@classmethod
|
|
def validate_action(cls, v):
|
|
allowed = ["activate", "deactivate", "delete"]
|
|
if v not in allowed:
|
|
raise ValueError(f"Action must be one of: {', '.join(allowed)}")
|
|
return v
|
|
|
|
|
|
class BulkUserActionResponse(BaseModel):
|
|
"""Response for bulk user actions."""
|
|
|
|
successful: list[int]
|
|
failed: dict[int, str]
|
|
total_processed: int
|
|
action_performed: str
|
|
message: str
|
|
|
|
|
|
# ============================================================================
|
|
# ADMIN DASHBOARD SCHEMAS
|
|
# ============================================================================
|
|
|
|
|
|
class AdminDashboardStats(BaseModel):
|
|
"""Comprehensive admin dashboard statistics."""
|
|
|
|
platform: dict[str, Any]
|
|
users: dict[str, Any]
|
|
vendors: dict[str, Any]
|
|
products: dict[str, Any]
|
|
orders: dict[str, Any]
|
|
imports: dict[str, Any]
|
|
recent_vendors: list[dict[str, Any]]
|
|
recent_imports: list[dict[str, Any]]
|
|
unread_notifications: int
|
|
active_alerts: int
|
|
critical_alerts: int
|
|
|
|
|
|
# ============================================================================
|
|
# SYSTEM HEALTH SCHEMAS
|
|
# ============================================================================
|
|
|
|
|
|
class ComponentHealthStatus(BaseModel):
|
|
"""Health status for a system component."""
|
|
|
|
status: str # healthy, degraded, unhealthy
|
|
response_time_ms: float | None = None
|
|
error_message: str | None = None
|
|
last_checked: datetime
|
|
details: dict[str, Any] | None = None
|
|
|
|
|
|
class SystemHealthResponse(BaseModel):
|
|
"""System health check response."""
|
|
|
|
overall_status: str # healthy, degraded, critical
|
|
database: ComponentHealthStatus
|
|
redis: ComponentHealthStatus
|
|
celery: ComponentHealthStatus
|
|
storage: ComponentHealthStatus
|
|
api_response_time_ms: float
|
|
uptime_seconds: int
|
|
timestamp: datetime
|
|
|
|
|
|
# ============================================================================
|
|
# ADMIN SESSION SCHEMAS
|
|
# ============================================================================
|
|
|
|
|
|
class AdminSessionResponse(BaseModel):
|
|
"""Admin session information."""
|
|
|
|
id: int
|
|
admin_user_id: int
|
|
admin_username: str | None = None
|
|
ip_address: str
|
|
user_agent: str | None = None
|
|
login_at: datetime
|
|
last_activity_at: datetime
|
|
logout_at: datetime | None = None
|
|
is_active: bool
|
|
logout_reason: str | None = None
|
|
|
|
model_config = {"from_attributes": True}
|
|
|
|
|
|
class AdminSessionListResponse(BaseModel):
|
|
"""List of admin sessions."""
|
|
|
|
sessions: list[AdminSessionResponse]
|
|
total: int
|
|
active_count: int
|
|
|
|
|
|
# ============================================================================
|
|
# APPLICATION LOGS SCHEMAS
|
|
# ============================================================================
|
|
|
|
|
|
class ApplicationLogResponse(BaseModel):
|
|
"""Application log entry response."""
|
|
|
|
id: int
|
|
timestamp: datetime
|
|
level: str
|
|
logger_name: str
|
|
module: str | None = None
|
|
function_name: str | None = None
|
|
line_number: int | None = None
|
|
message: str
|
|
exception_type: str | None = None
|
|
exception_message: str | None = None
|
|
stack_trace: str | None = None
|
|
request_id: str | None = None
|
|
user_id: int | None = None
|
|
vendor_id: int | None = None
|
|
context: dict[str, Any] | None = None
|
|
created_at: datetime
|
|
|
|
model_config = {"from_attributes": True}
|
|
|
|
|
|
class ApplicationLogFilters(BaseModel):
|
|
"""Filters for querying application logs."""
|
|
|
|
level: str | None = Field(None, description="Filter by log level")
|
|
logger_name: str | None = Field(None, description="Filter by logger name")
|
|
module: str | None = Field(None, description="Filter by module")
|
|
user_id: int | None = Field(None, description="Filter by user ID")
|
|
vendor_id: int | None = Field(None, description="Filter by vendor ID")
|
|
date_from: datetime | None = Field(None, description="Start date")
|
|
date_to: datetime | None = Field(None, description="End date")
|
|
search: str | None = Field(None, description="Search in message")
|
|
skip: int = Field(0, ge=0)
|
|
limit: int = Field(100, ge=1, le=1000)
|
|
|
|
|
|
class ApplicationLogListResponse(BaseModel):
|
|
"""Paginated list of application logs."""
|
|
|
|
logs: list[ApplicationLogResponse]
|
|
total: int
|
|
skip: int
|
|
limit: int
|
|
|
|
|
|
class LogStatistics(BaseModel):
|
|
"""Statistics about application logs."""
|
|
|
|
total_count: int
|
|
warning_count: int
|
|
error_count: int
|
|
critical_count: int
|
|
by_level: dict[str, int]
|
|
by_module: dict[str, int]
|
|
recent_errors: list[ApplicationLogResponse]
|
|
|
|
|
|
# ============================================================================
|
|
# LOG SETTINGS SCHEMAS
|
|
# ============================================================================
|
|
|
|
|
|
class LogSettingsResponse(BaseModel):
|
|
"""Log configuration settings."""
|
|
|
|
log_level: str
|
|
log_file_max_size_mb: int
|
|
log_file_backup_count: int
|
|
db_log_retention_days: int
|
|
file_logging_enabled: bool
|
|
db_logging_enabled: bool
|
|
|
|
|
|
class LogSettingsUpdate(BaseModel):
|
|
"""Update log settings."""
|
|
|
|
log_level: str | None = Field(
|
|
None, description="Log level: DEBUG, INFO, WARNING, ERROR, CRITICAL"
|
|
)
|
|
log_file_max_size_mb: int | None = Field(
|
|
None, ge=1, le=1000, description="Max log file size in MB"
|
|
)
|
|
log_file_backup_count: int | None = Field(
|
|
None, ge=0, le=50, description="Number of backup files to keep"
|
|
)
|
|
db_log_retention_days: int | None = Field(
|
|
None, ge=1, le=365, description="Days to retain logs in database"
|
|
)
|
|
|
|
@field_validator("log_level")
|
|
@classmethod
|
|
def validate_log_level(cls, v):
|
|
if v is not None:
|
|
allowed = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
|
|
if v.upper() not in allowed:
|
|
raise ValueError(f"Log level must be one of: {', '.join(allowed)}")
|
|
return v.upper()
|
|
return v
|
|
|
|
|
|
class FileLogResponse(BaseModel):
|
|
"""File log content response."""
|
|
|
|
filename: str
|
|
size_bytes: int
|
|
last_modified: datetime
|
|
lines: list[str]
|
|
total_lines: int
|
|
|
|
|
|
class LogFileInfo(BaseModel):
|
|
"""Log file info for listing."""
|
|
|
|
filename: str
|
|
size_bytes: int
|
|
last_modified: datetime
|
|
|
|
|
|
class LogFileListResponse(BaseModel):
|
|
"""Response for listing log files."""
|
|
|
|
files: list[LogFileInfo]
|
|
|
|
|
|
class LogDeleteResponse(BaseModel):
|
|
"""Response for log deletion."""
|
|
|
|
message: str
|
|
|
|
|
|
class LogCleanupResponse(BaseModel):
|
|
"""Response for log cleanup operation."""
|
|
|
|
message: str
|
|
deleted_count: int
|
|
|
|
|
|
class LogSettingsUpdateResponse(BaseModel):
|
|
"""Response for log settings update."""
|
|
|
|
message: str
|
|
updated_fields: list[str]
|
|
note: str | None = None
|