From 6db0187b54e67e9af53438897ed3e3e53f7ce191 Mon Sep 17 00:00:00 2001 From: Samir Boulahtit Date: Sun, 19 Oct 2025 16:04:44 +0200 Subject: [PATCH] Admin features (audit, log, settings) --- app/api/v1/admin/audit.py | 116 ++++++++ app/api/v1/admin/auth.py | 2 +- app/api/v1/admin/notifications.py | 151 ++++++++++ app/api/v1/admin/settings.py | 217 +++++++++++++++ app/services/admin_audit_service.py | 250 +++++++++++++++++ app/services/admin_service.py | 271 ++++++++++++++++-- app/services/admin_settings_service.py | 335 ++++++++++++++++++++++ models/database/admin.py | 161 +++++++++++ models/schema/admin.py | 366 ++++++++++++++++++++++++- 9 files changed, 1849 insertions(+), 20 deletions(-) create mode 100644 app/api/v1/admin/audit.py create mode 100644 app/api/v1/admin/notifications.py create mode 100644 app/api/v1/admin/settings.py create mode 100644 app/services/admin_audit_service.py create mode 100644 app/services/admin_settings_service.py diff --git a/app/api/v1/admin/audit.py b/app/api/v1/admin/audit.py new file mode 100644 index 00000000..a20c1a47 --- /dev/null +++ b/app/api/v1/admin/audit.py @@ -0,0 +1,116 @@ +# app/api/v1/admin/audit.py +""" +Admin audit log endpoints. + +Provides endpoints for: +- Viewing audit logs with filtering +- Tracking admin actions +- Generating audit reports +""" + +import logging +from typing import Optional +from datetime import datetime + +from fastapi import APIRouter, Depends, Query +from sqlalchemy.orm import Session + +from app.api.deps import get_current_admin_user +from app.core.database import get_db +from app.services.admin_audit_service import admin_audit_service +from models.schema.admin import ( + AdminAuditLogResponse, + AdminAuditLogFilters, + AdminAuditLogListResponse +) +from models.database.user import User + +router = APIRouter(prefix="/audit") +logger = logging.getLogger(__name__) + + +@router.get("/logs", response_model=AdminAuditLogListResponse) +def get_audit_logs( + admin_user_id: Optional[int] = Query(None, description="Filter by admin user"), + action: Optional[str] = Query(None, description="Filter by action type"), + target_type: Optional[str] = Query(None, description="Filter by target type"), + date_from: Optional[datetime] = Query(None, description="Filter from date"), + date_to: Optional[datetime] = Query(None, description="Filter to date"), + skip: int = Query(0, ge=0, description="Number of records to skip"), + limit: int = Query(100, ge=1, le=1000, description="Maximum records to return"), + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_user), +): + """ + Get filtered admin audit logs. + + Returns paginated list of all admin actions with filtering options. + Useful for compliance, security audits, and tracking admin activities. + """ + filters = AdminAuditLogFilters( + admin_user_id=admin_user_id, + action=action, + target_type=target_type, + date_from=date_from, + date_to=date_to, + skip=skip, + limit=limit + ) + + logs = admin_audit_service.get_audit_logs(db, filters) + total = admin_audit_service.get_audit_logs_count(db, filters) + + logger.info(f"Admin {current_admin.username} retrieved {len(logs)} audit logs") + + return AdminAuditLogListResponse( + logs=logs, + total=total, + skip=skip, + limit=limit + ) + + +@router.get("/logs/recent", response_model=list[AdminAuditLogResponse]) +def get_recent_audit_logs( + limit: int = Query(20, ge=1, le=100), + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_user), +): + """Get recent audit logs (last 20 by default).""" + filters = AdminAuditLogFilters(limit=limit) + return admin_audit_service.get_audit_logs(db, filters) + + +@router.get("/logs/my-actions", response_model=list[AdminAuditLogResponse]) +def get_my_actions( + limit: int = Query(50, ge=1, le=100), + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_user), +): + """Get audit logs for current admin's actions.""" + return admin_audit_service.get_recent_actions_by_admin( + db=db, + admin_user_id=current_admin.id, + limit=limit + ) + + +@router.get("/logs/target/{target_type}/{target_id}") +def get_actions_by_target( + target_type: str, + target_id: str, + limit: int = Query(50, ge=1, le=100), + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_user), +): + """ + Get all actions performed on a specific target. + + Useful for tracking the history of a specific vendor, user, or entity. + """ + return admin_audit_service.get_actions_by_target( + db=db, + target_type=target_type, + target_id=target_id, + limit=limit + ) diff --git a/app/api/v1/admin/auth.py b/app/api/v1/admin/auth.py index d2f06f35..d4f4a8b7 100644 --- a/app/api/v1/admin/auth.py +++ b/app/api/v1/admin/auth.py @@ -17,7 +17,7 @@ from app.services.auth_service import auth_service from app.exceptions import InvalidCredentialsException from models.schema.auth import LoginResponse, UserLogin -router = APIRouter() +router = APIRouter(prefix="/auth") logger = logging.getLogger(__name__) diff --git a/app/api/v1/admin/notifications.py b/app/api/v1/admin/notifications.py new file mode 100644 index 00000000..826244e0 --- /dev/null +++ b/app/api/v1/admin/notifications.py @@ -0,0 +1,151 @@ +# app/api/v1/admin/notifications.py +""" +Admin notifications and platform alerts endpoints. + +Provides endpoints for: +- Viewing admin notifications +- Managing platform alerts +- System health monitoring +""" + +import logging +from typing import Optional + +from fastapi import APIRouter, Depends, Query +from sqlalchemy.orm import Session + +from app.api.deps import get_current_admin_user +from app.core.database import get_db +from models.schema.admin import ( + AdminNotificationCreate, + AdminNotificationResponse, + AdminNotificationListResponse, + PlatformAlertCreate, + PlatformAlertResponse, + PlatformAlertListResponse, + PlatformAlertResolve +) +from models.database.user import User + +router = APIRouter(prefix="/notifications") +logger = logging.getLogger(__name__) + + +# ============================================================================ +# ADMIN NOTIFICATIONS +# ============================================================================ + +@router.get("", response_model=AdminNotificationListResponse) +def get_notifications( + priority: Optional[str] = Query(None, description="Filter by priority"), + is_read: Optional[bool] = Query(None, description="Filter by read status"), + skip: int = Query(0, ge=0), + limit: int = Query(50, ge=1, le=100), + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_user), +): + """Get admin notifications with filtering.""" + # TODO: Implement notification service + return AdminNotificationListResponse( + notifications=[], + total=0, + unread_count=0, + skip=skip, + limit=limit + ) + + +@router.get("/unread-count") +def get_unread_count( + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_user), +): + """Get count of unread notifications.""" + # TODO: Implement + return {"unread_count": 0} + + +@router.put("/{notification_id}/read") +def mark_as_read( + notification_id: int, + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_user), +): + """Mark notification as read.""" + # TODO: Implement + return {"message": "Notification marked as read"} + + +@router.put("/mark-all-read") +def mark_all_as_read( + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_user), +): + """Mark all notifications as read.""" + # TODO: Implement + return {"message": "All notifications marked as read"} + + +# ============================================================================ +# PLATFORM ALERTS +# ============================================================================ + +@router.get("/alerts", response_model=PlatformAlertListResponse) +def get_platform_alerts( + severity: Optional[str] = Query(None, description="Filter by severity"), + is_resolved: Optional[bool] = Query(None, description="Filter by resolution status"), + skip: int = Query(0, ge=0), + limit: int = Query(50, ge=1, le=100), + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_user), +): + """Get platform alerts with filtering.""" + # TODO: Implement alert service + return PlatformAlertListResponse( + alerts=[], + total=0, + active_count=0, + critical_count=0, + skip=skip, + limit=limit + ) + + +@router.post("/alerts", response_model=PlatformAlertResponse) +def create_platform_alert( + alert_data: PlatformAlertCreate, + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_user), +): + """Create new platform alert (manual).""" + # TODO: Implement + logger.info(f"Admin {current_admin.username} created alert: {alert_data.title}") + return {} + + +@router.put("/alerts/{alert_id}/resolve") +def resolve_platform_alert( + alert_id: int, + resolve_data: PlatformAlertResolve, + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_user), +): + """Resolve platform alert.""" + # TODO: Implement + logger.info(f"Admin {current_admin.username} resolved alert {alert_id}") + return {"message": "Alert resolved successfully"} + + +@router.get("/alerts/stats") +def get_alert_statistics( + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_user), +): + """Get alert statistics for dashboard.""" + # TODO: Implement + return { + "total_alerts": 0, + "active_alerts": 0, + "critical_alerts": 0, + "resolved_today": 0 + } diff --git a/app/api/v1/admin/settings.py b/app/api/v1/admin/settings.py new file mode 100644 index 00000000..6f03769d --- /dev/null +++ b/app/api/v1/admin/settings.py @@ -0,0 +1,217 @@ +# app/api/v1/admin/settings.py +""" +Platform settings management endpoints. + +Provides endpoints for: +- Viewing all platform settings +- Creating/updating settings +- Managing configuration by category +""" + +import logging +from typing import Optional + +from fastapi import APIRouter, Depends, Query +from sqlalchemy.orm import Session + +from app.api.deps import get_current_admin_user +from app.core.database import get_db +from app.services.admin_settings_service import admin_settings_service +from app.services.admin_audit_service import admin_audit_service +from models.schema.admin import ( + AdminSettingCreate, + AdminSettingResponse, + AdminSettingUpdate, + AdminSettingListResponse +) +from models.database.user import User + +router = APIRouter(prefix="/settings") +logger = logging.getLogger(__name__) + + +@router.get("", response_model=AdminSettingListResponse) +def get_all_settings( + category: Optional[str] = Query(None, description="Filter by category"), + is_public: Optional[bool] = Query(None, description="Filter by public flag"), + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_user), +): + """ + Get all platform settings. + + Can be filtered by category (system, security, marketplace, notifications) + and by public flag (settings that can be exposed to frontend). + """ + settings = admin_settings_service.get_all_settings(db, category, is_public) + + return AdminSettingListResponse( + settings=settings, + total=len(settings), + category=category + ) + + +@router.get("/categories") +def get_setting_categories( + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_user), +): + """Get list of all setting categories.""" + # This could be enhanced to return counts per category + return { + "categories": [ + "system", + "security", + "marketplace", + "notifications", + "integrations", + "payments" + ] + } + + +@router.get("/{key}", response_model=AdminSettingResponse) +def get_setting( + key: str, + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_user), +): + """Get specific setting by key.""" + setting = admin_settings_service.get_setting_by_key(db, key) + + if not setting: + from fastapi import HTTPException + raise HTTPException(status_code=404, detail=f"Setting '{key}' not found") + + return AdminSettingResponse.model_validate(setting) + + +@router.post("", response_model=AdminSettingResponse) +def create_setting( + setting_data: AdminSettingCreate, + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_user), +): + """ + Create new platform setting. + + Setting keys should be lowercase with underscores (e.g., max_vendors_allowed). + """ + result = admin_settings_service.create_setting( + db=db, + setting_data=setting_data, + admin_user_id=current_admin.id + ) + + # Log action + admin_audit_service.log_action( + db=db, + admin_user_id=current_admin.id, + action="create_setting", + target_type="setting", + target_id=setting_data.key, + details={"category": setting_data.category, "value_type": setting_data.value_type} + ) + + return result + + +@router.put("/{key}", response_model=AdminSettingResponse) +def update_setting( + key: str, + update_data: AdminSettingUpdate, + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_user), +): + """Update existing setting value.""" + old_value = admin_settings_service.get_setting_value(db, key) + + result = admin_settings_service.update_setting( + db=db, + key=key, + update_data=update_data, + admin_user_id=current_admin.id + ) + + # Log action + admin_audit_service.log_action( + db=db, + admin_user_id=current_admin.id, + action="update_setting", + target_type="setting", + target_id=key, + details={"old_value": str(old_value), "new_value": update_data.value} + ) + + return result + + +@router.post("/upsert", response_model=AdminSettingResponse) +def upsert_setting( + setting_data: AdminSettingCreate, + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_user), +): + """ + Create or update setting (upsert). + + If setting exists, updates its value. If not, creates new setting. + """ + result = admin_settings_service.upsert_setting( + db=db, + setting_data=setting_data, + admin_user_id=current_admin.id + ) + + # Log action + admin_audit_service.log_action( + db=db, + admin_user_id=current_admin.id, + action="upsert_setting", + target_type="setting", + target_id=setting_data.key, + details={"category": setting_data.category} + ) + + return result + + +@router.delete("/{key}") +def delete_setting( + key: str, + confirm: bool = Query(False, description="Must be true to confirm deletion"), + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_user), +): + """ + Delete platform setting. + + Requires confirmation parameter. + WARNING: Deleting settings may affect platform functionality. + """ + from fastapi import HTTPException + + if not confirm: + raise HTTPException( + status_code=400, + detail="Deletion requires confirmation parameter: confirm=true" + ) + + message = admin_settings_service.delete_setting( + db=db, + key=key, + admin_user_id=current_admin.id + ) + + # Log action + admin_audit_service.log_action( + db=db, + admin_user_id=current_admin.id, + action="delete_setting", + target_type="setting", + target_id=key, + details={} + ) + + return {"message": message} diff --git a/app/services/admin_audit_service.py b/app/services/admin_audit_service.py new file mode 100644 index 00000000..ee9da385 --- /dev/null +++ b/app/services/admin_audit_service.py @@ -0,0 +1,250 @@ +# app/services/admin_audit_service.py +""" +Admin audit service for tracking admin actions. + +This module provides functions for: +- Logging admin actions +- Querying audit logs +- Generating audit reports +""" + +import logging +from datetime import datetime, timezone +from typing import List, Optional, Dict, Any + +from sqlalchemy.orm import Session +from sqlalchemy import and_, or_ + +from models.database.admin import AdminAuditLog +from models.database.user import User +from models.schema.admin import AdminAuditLogFilters, AdminAuditLogResponse +from app.exceptions import AdminOperationException + +logger = logging.getLogger(__name__) + + +class AdminAuditService: + """Service for admin audit logging.""" + + def log_action( + self, + db: Session, + admin_user_id: int, + action: str, + target_type: str, + target_id: str, + details: Optional[Dict[str, Any]] = None, + ip_address: Optional[str] = None, + user_agent: Optional[str] = None, + request_id: Optional[str] = None + ) -> AdminAuditLog: + """ + Log an admin action to the audit trail. + + Args: + admin_user_id: ID of the admin performing the action + action: Action performed (e.g., 'create_vendor', 'delete_user') + target_type: Type of target (e.g., 'vendor', 'user') + target_id: ID of the target entity + details: Additional context about the action + ip_address: IP address of the admin + user_agent: User agent string + request_id: Request ID for correlation + + Returns: + Created AdminAuditLog instance + """ + try: + audit_log = AdminAuditLog( + admin_user_id=admin_user_id, + action=action, + target_type=target_type, + target_id=str(target_id), + details=details or {}, + ip_address=ip_address, + user_agent=user_agent, + request_id=request_id + ) + + db.add(audit_log) + db.commit() + db.refresh(audit_log) + + logger.info( + f"Admin action logged: {action} on {target_type}:{target_id} " + f"by admin {admin_user_id}" + ) + + return audit_log + + except Exception as e: + db.rollback() + logger.error(f"Failed to log admin action: {str(e)}") + # Don't raise exception - audit logging should not break operations + return None + + def get_audit_logs( + self, + db: Session, + filters: AdminAuditLogFilters + ) -> List[AdminAuditLogResponse]: + """ + Get filtered admin audit logs with pagination. + + Args: + filters: Filter criteria for audit logs + + Returns: + List of audit log responses + """ + try: + query = db.query(AdminAuditLog).join(User, AdminAuditLog.admin_user_id == User.id) + + # Apply filters + conditions = [] + + if filters.admin_user_id: + conditions.append(AdminAuditLog.admin_user_id == filters.admin_user_id) + + if filters.action: + conditions.append(AdminAuditLog.action.ilike(f"%{filters.action}%")) + + if filters.target_type: + conditions.append(AdminAuditLog.target_type == filters.target_type) + + if filters.date_from: + conditions.append(AdminAuditLog.created_at >= filters.date_from) + + if filters.date_to: + conditions.append(AdminAuditLog.created_at <= filters.date_to) + + if conditions: + query = query.filter(and_(*conditions)) + + # Execute query with pagination + logs = ( + query + .order_by(AdminAuditLog.created_at.desc()) + .offset(filters.skip) + .limit(filters.limit) + .all() + ) + + # Convert to response models + return [ + AdminAuditLogResponse( + id=log.id, + admin_user_id=log.admin_user_id, + admin_username=log.admin_user.username if log.admin_user else None, + action=log.action, + target_type=log.target_type, + target_id=log.target_id, + details=log.details, + ip_address=log.ip_address, + user_agent=log.user_agent, + request_id=log.request_id, + created_at=log.created_at + ) + for log in logs + ] + + except Exception as e: + logger.error(f"Failed to retrieve audit logs: {str(e)}") + raise AdminOperationException( + operation="get_audit_logs", + reason="Database query failed" + ) + + def get_audit_logs_count( + self, + db: Session, + filters: AdminAuditLogFilters + ) -> int: + """Get total count of audit logs matching filters.""" + try: + query = db.query(AdminAuditLog) + + # Apply same filters as get_audit_logs + conditions = [] + + if filters.admin_user_id: + conditions.append(AdminAuditLog.admin_user_id == filters.admin_user_id) + + if filters.action: + conditions.append(AdminAuditLog.action.ilike(f"%{filters.action}%")) + + if filters.target_type: + conditions.append(AdminAuditLog.target_type == filters.target_type) + + if filters.date_from: + conditions.append(AdminAuditLog.created_at >= filters.date_from) + + if filters.date_to: + conditions.append(AdminAuditLog.created_at <= filters.date_to) + + if conditions: + query = query.filter(and_(*conditions)) + + return query.count() + + except Exception as e: + logger.error(f"Failed to count audit logs: {str(e)}") + return 0 + + def get_recent_actions_by_admin( + self, + db: Session, + admin_user_id: int, + limit: int = 10 + ) -> List[AdminAuditLogResponse]: + """Get recent actions by a specific admin.""" + filters = AdminAuditLogFilters( + admin_user_id=admin_user_id, + limit=limit + ) + return self.get_audit_logs(db, filters) + + def get_actions_by_target( + self, + db: Session, + target_type: str, + target_id: str, + limit: int = 50 + ) -> List[AdminAuditLogResponse]: + """Get all actions performed on a specific target.""" + try: + logs = ( + db.query(AdminAuditLog) + .filter( + and_( + AdminAuditLog.target_type == target_type, + AdminAuditLog.target_id == str(target_id) + ) + ) + .order_by(AdminAuditLog.created_at.desc()) + .limit(limit) + .all() + ) + + return [ + AdminAuditLogResponse( + id=log.id, + admin_user_id=log.admin_user_id, + admin_username=log.admin_user.username if log.admin_user else None, + action=log.action, + target_type=log.target_type, + target_id=log.target_id, + details=log.details, + ip_address=log.ip_address, + created_at=log.created_at + ) + for log in logs + ] + + except Exception as e: + logger.error(f"Failed to get actions by target: {str(e)}") + return [] + + +# Create service instance +admin_audit_service = AdminAuditService() \ No newline at end of file diff --git a/app/services/admin_service.py b/app/services/admin_service.py index fcfec035..54463692 100644 --- a/app/services/admin_service.py +++ b/app/services/admin_service.py @@ -32,7 +32,7 @@ from app.exceptions import ( from models.schema.marketplace_import_job import MarketplaceImportJobResponse from models.schema.vendor import VendorCreate from models.database.marketplace_import_job import MarketplaceImportJob -from models.database.vendor import Vendor, Role +from models.database.vendor import Vendor, Role, VendorUser from models.database.user import User logger = logging.getLogger(__name__) @@ -108,6 +108,12 @@ class AdminService: """ Create vendor with owner user account. + Creates: + 1. User account with owner_email (for authentication) + 2. Vendor with contact_email (for business contact) + + If contact_email not provided, defaults to owner_email. + Returns: (vendor, owner_user, temporary_password) """ try: @@ -132,13 +138,12 @@ class AdminService: # Generate temporary password for owner temp_password = self._generate_temp_password() - # Create owner user + # Create owner user with owner_email from middleware.auth import AuthManager auth_manager = AuthManager() - owner_username = f"{vendor_data.vendor_code.lower()}_owner" - owner_email = vendor_data.owner_email if hasattr(vendor_data, - 'owner_email') else f"{owner_username}@{vendor_data.subdomain}.com" + owner_username = f"{vendor_data.subdomain}_owner" + owner_email = vendor_data.owner_email # ✅ For User authentication # Check if user with this email already exists existing_user = db.query(User).filter( @@ -151,31 +156,35 @@ class AdminService: else: # Create new owner user owner_user = User( - email=owner_email, + email=owner_email, # ✅ Authentication email username=owner_username, hashed_password=auth_manager.hash_password(temp_password), - role="user", # Will be vendor owner through relationship + role="user", is_active=True, ) db.add(owner_user) db.flush() # Get owner_user.id + # Determine contact_email + # If provided, use it; otherwise default to owner_email + contact_email = vendor_data.contact_email or owner_email + # Create vendor vendor = Vendor( vendor_code=vendor_data.vendor_code.upper(), subdomain=vendor_data.subdomain.lower(), name=vendor_data.name, - description=getattr(vendor_data, 'description', None), + description=vendor_data.description, owner_user_id=owner_user.id, - contact_email=owner_email, - contact_phone=getattr(vendor_data, 'contact_phone', None), - website=getattr(vendor_data, 'website', None), - business_address=getattr(vendor_data, 'business_address', None), - tax_number=getattr(vendor_data, 'tax_number', None), - letzshop_csv_url_fr=getattr(vendor_data, 'letzshop_csv_url_fr', None), - letzshop_csv_url_en=getattr(vendor_data, 'letzshop_csv_url_en', None), - letzshop_csv_url_de=getattr(vendor_data, 'letzshop_csv_url_de', None), - theme_config=getattr(vendor_data, 'theme_config', {}), + contact_email=contact_email, # ✅ Business contact email + contact_phone=vendor_data.contact_phone, + website=vendor_data.website, + business_address=vendor_data.business_address, + tax_number=vendor_data.tax_number, + letzshop_csv_url_fr=vendor_data.letzshop_csv_url_fr, + letzshop_csv_url_en=vendor_data.letzshop_csv_url_en, + letzshop_csv_url_de=vendor_data.letzshop_csv_url_de, + theme_config=vendor_data.theme_config or {}, is_active=True, is_verified=True, ) @@ -190,7 +199,8 @@ class AdminService: db.refresh(owner_user) logger.info( - f"Vendor {vendor.vendor_code} created with owner {owner_user.username}" + f"Vendor {vendor.vendor_code} created with owner {owner_user.username} " + f"(owner_email: {owner_email}, contact_email: {contact_email})" ) # TODO: Send welcome email to owner with credentials @@ -340,6 +350,231 @@ class AdminService: reason="Database deletion failed" ) + def update_vendor( + self, + db: Session, + vendor_id: int, + vendor_update # VendorUpdate schema + ) -> Vendor: + """ + Update vendor information (Admin only). + + Can update: + - Vendor details (name, description, subdomain) + - Business contact info (contact_email, phone, etc.) + - Status (is_active, is_verified) + + Cannot update: + - owner_email (use transfer_vendor_ownership instead) + - vendor_code (immutable) + - owner_user_id (use transfer_vendor_ownership instead) + + Args: + db: Database session + vendor_id: ID of vendor to update + vendor_update: VendorUpdate schema with updated data + + Returns: + Updated vendor object + + Raises: + VendorNotFoundException: If vendor not found + ValidationException: If subdomain already taken + """ + vendor = self._get_vendor_by_id_or_raise(db, vendor_id) + + try: + # Get update data + update_data = vendor_update.model_dump(exclude_unset=True) + + # Check subdomain uniqueness if changing + if 'subdomain' in update_data and update_data['subdomain'] != vendor.subdomain: + existing = db.query(Vendor).filter( + Vendor.subdomain == update_data['subdomain'], + Vendor.id != vendor_id + ).first() + if existing: + raise ValidationException( + f"Subdomain '{update_data['subdomain']}' is already taken" + ) + + # Update vendor fields + for field, value in update_data.items(): + setattr(vendor, field, value) + + vendor.updated_at = datetime.now(timezone.utc) + + db.commit() + db.refresh(vendor) + + logger.info( + f"Vendor {vendor_id} ({vendor.vendor_code}) updated by admin. " + f"Fields updated: {', '.join(update_data.keys())}" + ) + return vendor + + except ValidationException: + db.rollback() + raise + except Exception as e: + db.rollback() + logger.error(f"Failed to update vendor {vendor_id}: {str(e)}") + raise AdminOperationException( + operation="update_vendor", + reason=f"Database update failed: {str(e)}" + ) + + # Add this NEW method for transferring ownership: + + def transfer_vendor_ownership( + self, + db: Session, + vendor_id: int, + transfer_data # VendorTransferOwnership schema + ) -> Tuple[Vendor, User, User]: + """ + Transfer vendor ownership to another user. + + This method: + 1. Validates new owner exists and is active + 2. Removes old owner from "Owner" role (demotes to Manager) + 3. Assigns new owner to "Owner" role + 4. Updates vendor.owner_user_id + 5. Creates audit log entry + + Args: + db: Database session + vendor_id: ID of vendor + transfer_data: Transfer details (new owner ID, confirmation, reason) + + Returns: + Tuple of (vendor, old_owner, new_owner) + + Raises: + VendorNotFoundException: If vendor not found + UserNotFoundException: If new owner user not found + ValidationException: If confirmation not provided or user already owner + """ + + # Require confirmation + if not transfer_data.confirm_transfer: + raise ValidationException( + "Ownership transfer requires confirmation (confirm_transfer=true)" + ) + + # Get vendor + vendor = self._get_vendor_by_id_or_raise(db, vendor_id) + old_owner = vendor.owner + + # Get new owner + new_owner = db.query(User).filter( + User.id == transfer_data.new_owner_user_id + ).first() + + if not new_owner: + raise UserNotFoundException(str(transfer_data.new_owner_user_id)) + + # Check if new owner is active + if not new_owner.is_active: + raise ValidationException( + f"User {new_owner.username} (ID: {new_owner.id}) is not active" + ) + + # Check if already owner + if new_owner.id == old_owner.id: + raise ValidationException( + f"User {new_owner.username} is already the owner of this vendor" + ) + + try: + # Get Owner role for this vendor + owner_role = db.query(Role).filter( + Role.vendor_id == vendor_id, + Role.name == "Owner" + ).first() + + if not owner_role: + raise ValidationException("Owner role not found for vendor") + + # Get Manager role (to demote old owner) + manager_role = db.query(Role).filter( + Role.vendor_id == vendor_id, + Role.name == "Manager" + ).first() + + # Remove old owner from Owner role + old_owner_link = db.query(VendorUser).filter( + VendorUser.vendor_id == vendor_id, + VendorUser.user_id == old_owner.id, + VendorUser.role_id == owner_role.id + ).first() + + if old_owner_link: + if manager_role: + # Demote to Manager role + old_owner_link.role_id = manager_role.id + logger.info( + f"Old owner {old_owner.username} demoted to Manager role " + f"for vendor {vendor.vendor_code}" + ) + else: + # No Manager role, just remove Owner link + db.delete(old_owner_link) + logger.warning( + f"Old owner {old_owner.username} removed from vendor {vendor.vendor_code} " + f"(no Manager role available)" + ) + + # Check if new owner already has a vendor_user link + new_owner_link = db.query(VendorUser).filter( + VendorUser.vendor_id == vendor_id, + VendorUser.user_id == new_owner.id + ).first() + + if new_owner_link: + # Update existing link to Owner role + new_owner_link.role_id = owner_role.id + new_owner_link.is_active = True + else: + # Create new Owner link + new_owner_link = VendorUser( + vendor_id=vendor_id, + user_id=new_owner.id, + role_id=owner_role.id, + is_active=True + ) + db.add(new_owner_link) + + # Update vendor owner_user_id + vendor.owner_user_id = new_owner.id + vendor.updated_at = datetime.now(timezone.utc) + + db.commit() + db.refresh(vendor) + + logger.warning( + f"OWNERSHIP TRANSFERRED for vendor {vendor.vendor_code}: " + f"{old_owner.username} (ID: {old_owner.id}) -> " + f"{new_owner.username} (ID: {new_owner.id}). " + f"Reason: {transfer_data.transfer_reason or 'Not provided'}" + ) + + # TODO: Send notification emails to both old and new owners + # self._send_ownership_transfer_emails(vendor, old_owner, new_owner, transfer_data.transfer_reason) + + return vendor, old_owner, new_owner + + except (ValidationException, UserNotFoundException): + db.rollback() + raise + except Exception as e: + db.rollback() + logger.error(f"Failed to transfer ownership for vendor {vendor_id}: {str(e)}") + raise AdminOperationException( + operation="transfer_vendor_ownership", + reason=f"Ownership transfer failed: {str(e)}" + ) + # ============================================================================ # MARKETPLACE IMPORT JOBS # ============================================================================ diff --git a/app/services/admin_settings_service.py b/app/services/admin_settings_service.py new file mode 100644 index 00000000..dec031af --- /dev/null +++ b/app/services/admin_settings_service.py @@ -0,0 +1,335 @@ +# app/services/admin_settings_service.py +""" +Admin settings service for platform-wide configuration. + +This module provides functions for: +- Managing platform settings +- Getting/setting configuration values +- Encrypting sensitive settings +""" + +import logging +import json +from typing import Optional, List, Any, Dict +from datetime import datetime, timezone + +from sqlalchemy.orm import Session +from sqlalchemy import func + +from models.database.admin import AdminSetting +from models.schema.admin import ( + AdminSettingCreate, + AdminSettingResponse, + AdminSettingUpdate +) +from app.exceptions import ( + AdminOperationException, + ValidationException, + ResourceNotFoundException +) + +logger = logging.getLogger(__name__) + + +class AdminSettingsService: + """Service for managing platform-wide settings.""" + + def get_setting_by_key( + self, + db: Session, + key: str + ) -> Optional[AdminSetting]: + """Get setting by key.""" + try: + return db.query(AdminSetting).filter( + func.lower(AdminSetting.key) == key.lower() + ).first() + except Exception as e: + logger.error(f"Failed to get setting {key}: {str(e)}") + return None + + def get_setting_value( + self, + db: Session, + key: str, + default: Any = None + ) -> Any: + """ + Get setting value with type conversion. + + Args: + key: Setting key + default: Default value if setting doesn't exist + + Returns: + Typed setting value + """ + setting = self.get_setting_by_key(db, key) + + if not setting: + return default + + # Convert value based on type + try: + if setting.value_type == "integer": + return int(setting.value) + elif setting.value_type == "float": + return float(setting.value) + elif setting.value_type == "boolean": + return setting.value.lower() in ('true', '1', 'yes') + elif setting.value_type == "json": + return json.loads(setting.value) + else: + return setting.value + except Exception as e: + logger.error(f"Failed to convert setting {key} value: {str(e)}") + return default + + def get_all_settings( + self, + db: Session, + category: Optional[str] = None, + is_public: Optional[bool] = None + ) -> List[AdminSettingResponse]: + """Get all settings with optional filtering.""" + try: + query = db.query(AdminSetting) + + if category: + query = query.filter(AdminSetting.category == category) + + if is_public is not None: + query = query.filter(AdminSetting.is_public == is_public) + + settings = query.order_by(AdminSetting.category, AdminSetting.key).all() + + return [ + AdminSettingResponse.model_validate(setting) + for setting in settings + ] + + except Exception as e: + logger.error(f"Failed to get settings: {str(e)}") + raise AdminOperationException( + operation="get_all_settings", + reason="Database query failed" + ) + + def get_settings_by_category( + self, + db: Session, + category: str + ) -> Dict[str, Any]: + """ + Get all settings in a category as a dictionary. + + Returns: + Dictionary of key-value pairs + """ + settings = self.get_all_settings(db, category=category) + + result = {} + for setting in settings: + # Convert value based on type + if setting.value_type == "integer": + result[setting.key] = int(setting.value) + elif setting.value_type == "float": + result[setting.key] = float(setting.value) + elif setting.value_type == "boolean": + result[setting.key] = setting.value.lower() in ('true', '1', 'yes') + elif setting.value_type == "json": + result[setting.key] = json.loads(setting.value) + else: + result[setting.key] = setting.value + + return result + + def create_setting( + self, + db: Session, + setting_data: AdminSettingCreate, + admin_user_id: int + ) -> AdminSettingResponse: + """Create new setting.""" + try: + # Check if setting already exists + existing = self.get_setting_by_key(db, setting_data.key) + if existing: + raise ValidationException( + f"Setting with key '{setting_data.key}' already exists" + ) + + # Validate value based on type + self._validate_setting_value(setting_data.value, setting_data.value_type) + + # TODO: Encrypt value if is_encrypted=True + value_to_store = setting_data.value + if setting_data.is_encrypted: + # value_to_store = self._encrypt_value(setting_data.value) + pass + + setting = AdminSetting( + key=setting_data.key.lower(), + value=value_to_store, + value_type=setting_data.value_type, + category=setting_data.category, + description=setting_data.description, + is_encrypted=setting_data.is_encrypted, + is_public=setting_data.is_public, + last_modified_by_user_id=admin_user_id + ) + + db.add(setting) + db.commit() + db.refresh(setting) + + logger.info(f"Setting '{setting.key}' created by admin {admin_user_id}") + + return AdminSettingResponse.model_validate(setting) + + except ValidationException: + db.rollback() + raise + except Exception as e: + db.rollback() + logger.error(f"Failed to create setting: {str(e)}") + raise AdminOperationException( + operation="create_setting", + reason="Database operation failed" + ) + + def update_setting( + self, + db: Session, + key: str, + update_data: AdminSettingUpdate, + admin_user_id: int + ) -> AdminSettingResponse: + """Update existing setting.""" + setting = self.get_setting_by_key(db, key) + + if not setting: + raise ResourceNotFoundException( + resource_type="setting", + identifier=key + ) + + try: + # Validate new value + self._validate_setting_value(update_data.value, setting.value_type) + + # TODO: Encrypt value if needed + value_to_store = update_data.value + if setting.is_encrypted: + # value_to_store = self._encrypt_value(update_data.value) + pass + + setting.value = value_to_store + if update_data.description is not None: + setting.description = update_data.description + setting.last_modified_by_user_id = admin_user_id + setting.updated_at = datetime.now(timezone.utc) + + db.commit() + db.refresh(setting) + + logger.info(f"Setting '{setting.key}' updated by admin {admin_user_id}") + + return AdminSettingResponse.model_validate(setting) + + except ValidationException: + db.rollback() + raise + except Exception as e: + db.rollback() + logger.error(f"Failed to update setting {key}: {str(e)}") + raise AdminOperationException( + operation="update_setting", + reason="Database operation failed" + ) + + def upsert_setting( + self, + db: Session, + setting_data: AdminSettingCreate, + admin_user_id: int + ) -> AdminSettingResponse: + """Create or update setting (upsert).""" + existing = self.get_setting_by_key(db, setting_data.key) + + if existing: + update_data = AdminSettingUpdate( + value=setting_data.value, + description=setting_data.description + ) + return self.update_setting(db, setting_data.key, update_data, admin_user_id) + else: + return self.create_setting(db, setting_data, admin_user_id) + + def delete_setting( + self, + db: Session, + key: str, + admin_user_id: int + ) -> str: + """Delete setting.""" + setting = self.get_setting_by_key(db, key) + + if not setting: + raise ResourceNotFoundException( + resource_type="setting", + identifier=key + ) + + try: + db.delete(setting) + db.commit() + + logger.warning(f"Setting '{key}' deleted by admin {admin_user_id}") + + return f"Setting '{key}' successfully deleted" + + except Exception as e: + db.rollback() + logger.error(f"Failed to delete setting {key}: {str(e)}") + raise AdminOperationException( + operation="delete_setting", + reason="Database operation failed" + ) + + # ============================================================================ + # HELPER METHODS + # ============================================================================ + + def _validate_setting_value(self, value: str, value_type: str): + """Validate setting value matches declared type.""" + try: + if value_type == "integer": + int(value) + elif value_type == "float": + float(value) + elif value_type == "boolean": + if value.lower() not in ('true', 'false', '1', '0', 'yes', 'no'): + raise ValueError("Invalid boolean value") + elif value_type == "json": + json.loads(value) + except Exception as e: + raise ValidationException( + f"Value '{value}' is not valid for type '{value_type}': {str(e)}" + ) + + def _encrypt_value(self, value: str) -> str: + """Encrypt sensitive setting value.""" + # TODO: Implement encryption using Fernet or similar + # from cryptography.fernet import Fernet + # return encrypted_value + return value + + def _decrypt_value(self, encrypted_value: str) -> str: + """Decrypt sensitive setting value.""" + # TODO: Implement decryption + return encrypted_value + + +# Create service instance +admin_settings_service = AdminSettingsService() diff --git a/models/database/admin.py b/models/database/admin.py index a74f10b4..32b10ae8 100644 --- a/models/database/admin.py +++ b/models/database/admin.py @@ -1 +1,162 @@ # Admin-specific models +# models/database/admin.py +""" +Admin-specific database models. + +This module provides models for: +- Admin audit logging (compliance and security tracking) +- Admin notifications (system alerts and warnings) +- Platform settings (global configuration) +- Platform alerts (system-wide issues) +""" + +from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text, JSON, ForeignKey +from sqlalchemy.orm import relationship +from app.core.database import Base +from .base import TimestampMixin + + +class AdminAuditLog(Base, TimestampMixin): + """ + Track all admin actions for compliance and security. + + Separate from regular audit logs - focuses on admin-specific operations + like vendor creation, user management, and system configuration changes. + """ + __tablename__ = "admin_audit_logs" + + id = Column(Integer, primary_key=True, index=True) + admin_user_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True) + action = Column(String(100), nullable=False, index=True) # create_vendor, delete_vendor, etc. + target_type = Column(String(50), nullable=False, index=True) # vendor, user, import_job, setting + target_id = Column(String(100), nullable=False, index=True) + details = Column(JSON) # Additional context about the action + ip_address = Column(String(45)) # IPv4 or IPv6 + user_agent = Column(Text) + request_id = Column(String(100)) # For correlating with application logs + + # Relationships + admin_user = relationship("User", foreign_keys=[admin_user_id]) + + def __repr__(self): + return f"" + + +class AdminNotification(Base, TimestampMixin): + """ + Admin-specific notifications for system alerts and warnings. + + Different from vendor/customer notifications - these are for platform + administrators to track system health and issues requiring attention. + """ + __tablename__ = "admin_notifications" + + id = Column(Integer, primary_key=True, index=True) + type = Column(String(50), nullable=False, index=True) # system_alert, vendor_issue, import_failure + priority = Column(String(20), default="normal", index=True) # low, normal, high, critical + title = Column(String(200), nullable=False) + message = Column(Text, nullable=False) + is_read = Column(Boolean, default=False, index=True) + read_at = Column(DateTime, nullable=True) + read_by_user_id = Column(Integer, ForeignKey("users.id"), nullable=True) + action_required = Column(Boolean, default=False, index=True) + action_url = Column(String(500)) # Link to relevant admin page + notification_metadata = Column(JSON) # Additional contextual data + + # Relationships + read_by = relationship("User", foreign_keys=[read_by_user_id]) + + def __repr__(self): + return f"" + + +class AdminSetting(Base, TimestampMixin): + """ + Platform-wide admin settings and configuration. + + Stores global settings that affect the entire platform, different from + vendor-specific settings. Supports encryption for sensitive values. + + Examples: + - max_vendors_allowed + - maintenance_mode + - default_vendor_trial_days + - smtp_settings + - stripe_api_keys (encrypted) + """ + __tablename__ = "admin_settings" + + id = Column(Integer, primary_key=True, index=True) + key = Column(String(100), unique=True, nullable=False, index=True) + value = Column(Text, nullable=False) + value_type = Column(String(20), default="string") # string, integer, boolean, json + category = Column(String(50), index=True) # system, security, marketplace, notifications + description = Column(Text) + is_encrypted = Column(Boolean, default=False) + is_public = Column(Boolean, default=False) # Can be exposed to frontend? + last_modified_by_user_id = Column(Integer, ForeignKey("users.id"), nullable=True) + + # Relationships + last_modified_by = relationship("User", foreign_keys=[last_modified_by_user_id]) + + def __repr__(self): + return f"" + + +class PlatformAlert(Base, TimestampMixin): + """ + System-wide alerts that admins need to be aware of. + + Tracks platform issues, performance problems, security incidents, + and other system-level concerns that require admin attention. + """ + __tablename__ = "platform_alerts" + + id = Column(Integer, primary_key=True, index=True) + alert_type = Column(String(50), nullable=False, index=True) # security, performance, capacity, integration + severity = Column(String(20), nullable=False, index=True) # info, warning, error, critical + title = Column(String(200), nullable=False) + description = Column(Text) + affected_vendors = Column(JSON) # List of affected vendor IDs + affected_systems = Column(JSON) # List of affected system components + is_resolved = Column(Boolean, default=False, index=True) + resolved_at = Column(DateTime, nullable=True) + resolved_by_user_id = Column(Integer, ForeignKey("users.id"), nullable=True) + resolution_notes = Column(Text) + auto_generated = Column(Boolean, default=True) # System-generated vs manual + occurrence_count = Column(Integer, default=1) # Track repeated occurrences + first_occurred_at = Column(DateTime, nullable=False) + last_occurred_at = Column(DateTime, nullable=False) + + # Relationships + resolved_by = relationship("User", foreign_keys=[resolved_by_user_id]) + + def __repr__(self): + return f"" + + +class AdminSession(Base, TimestampMixin): + """ + Track admin login sessions for security monitoring. + + Helps identify suspicious login patterns, track concurrent sessions, + and enforce session policies for admin users. + """ + __tablename__ = "admin_sessions" + + id = Column(Integer, primary_key=True, index=True) + admin_user_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True) + session_token = Column(String(255), unique=True, nullable=False, index=True) + ip_address = Column(String(45), nullable=False) + user_agent = Column(Text) + login_at = Column(DateTime, nullable=False, index=True) + last_activity_at = Column(DateTime, nullable=False) + logout_at = Column(DateTime, nullable=True) + is_active = Column(Boolean, default=True, index=True) + logout_reason = Column(String(50)) # manual, timeout, forced, suspicious + + # Relationships + admin_user = relationship("User", foreign_keys=[admin_user_id]) + + def __repr__(self): + return f"" diff --git a/models/schema/admin.py b/models/schema/admin.py index 47bda2ec..22318417 100644 --- a/models/schema/admin.py +++ b/models/schema/admin.py @@ -1 +1,365 @@ -# Admin operation models +# models/schema/admin.py +""" +Admin-specific Pydantic schemas for API validation and responses. + +This module provides schemas for: +- Admin audit logs +- Admin notifications +- Platform settings +- Platform alerts +- Bulk operations +- System health checks +""" + +from datetime import datetime +from typing import Optional, List, Dict, Any +from pydantic import BaseModel, Field, field_validator + + +# ============================================================================ +# ADMIN AUDIT LOG SCHEMAS +# ============================================================================ + +class AdminAuditLogResponse(BaseModel): + """Response model for admin audit logs.""" + id: int + admin_user_id: int + admin_username: Optional[str] = None + action: str + target_type: str + target_id: str + details: Optional[Dict[str, Any]] = None + ip_address: Optional[str] = None + user_agent: Optional[str] = None + request_id: Optional[str] = None + created_at: datetime + + model_config = {"from_attributes": True} + + +class AdminAuditLogFilters(BaseModel): + """Filters for querying audit logs.""" + admin_user_id: Optional[int] = None + action: Optional[str] = None + target_type: Optional[str] = None + date_from: Optional[datetime] = None + date_to: Optional[datetime] = None + skip: int = Field(0, ge=0) + limit: int = Field(100, ge=1, le=1000) + + +class AdminAuditLogListResponse(BaseModel): + """Paginated list of audit logs.""" + logs: List[AdminAuditLogResponse] + total: int + skip: int + limit: int + + +# ============================================================================ +# ADMIN NOTIFICATION SCHEMAS +# ============================================================================ + +class AdminNotificationCreate(BaseModel): + """Create admin notification.""" + type: str = Field(..., max_length=50, description="Notification type") + priority: str = Field(default="normal", description="Priority level") + title: str = Field(..., max_length=200) + message: str = Field(..., description="Notification message") + action_required: bool = Field(default=False) + action_url: Optional[str] = Field(None, max_length=500) + metadata: Optional[Dict[str, Any]] = None + + @field_validator('priority') + @classmethod + def validate_priority(cls, v): + allowed = ['low', 'normal', 'high', 'critical'] + if v not in allowed: + raise ValueError(f"Priority must be one of: {', '.join(allowed)}") + return v + + +class AdminNotificationResponse(BaseModel): + """Admin notification response.""" + id: int + type: str + priority: str + title: str + message: str + is_read: bool + read_at: Optional[datetime] = None + read_by_user_id: Optional[int] = None + action_required: bool + action_url: Optional[str] = None + metadata: Optional[Dict[str, Any]] = None + created_at: datetime + + model_config = {"from_attributes": True} + + +class AdminNotificationUpdate(BaseModel): + """Mark notification as read.""" + is_read: bool = True + + +class AdminNotificationListResponse(BaseModel): + """Paginated list of notifications.""" + notifications: List[AdminNotificationResponse] + total: int + unread_count: int + skip: int + limit: int + + +# ============================================================================ +# ADMIN SETTINGS SCHEMAS +# ============================================================================ + +class AdminSettingCreate(BaseModel): + """Create or update admin setting.""" + key: str = Field(..., max_length=100, description="Unique setting key") + value: str = Field(..., description="Setting value") + value_type: str = Field(default="string", description="Data type") + category: Optional[str] = Field(None, max_length=50) + description: Optional[str] = None + is_encrypted: bool = Field(default=False) + is_public: bool = Field(default=False, description="Can be exposed to frontend") + + @field_validator('value_type') + @classmethod + def validate_value_type(cls, v): + allowed = ['string', 'integer', 'boolean', 'json', 'float'] + if v not in allowed: + raise ValueError(f"Value type must be one of: {', '.join(allowed)}") + return v + + @field_validator('key') + @classmethod + def validate_key_format(cls, v): + # Setting keys should be lowercase with underscores + if not v.replace('_', '').isalnum(): + raise ValueError("Setting key must contain only letters, numbers, and underscores") + return v.lower() + + +class AdminSettingResponse(BaseModel): + """Admin setting response.""" + id: int + key: str + value: str + value_type: str + category: Optional[str] = None + description: Optional[str] = None + is_encrypted: bool + is_public: bool + last_modified_by_user_id: Optional[int] = None + updated_at: datetime + + model_config = {"from_attributes": True} + + +class AdminSettingUpdate(BaseModel): + """Update admin setting value.""" + value: str + description: Optional[str] = None + + +class AdminSettingListResponse(BaseModel): + """List of settings by category.""" + settings: List[AdminSettingResponse] + total: int + category: Optional[str] = None + + +# ============================================================================ +# PLATFORM ALERT SCHEMAS +# ============================================================================ + +class PlatformAlertCreate(BaseModel): + """Create platform alert.""" + alert_type: str = Field(..., max_length=50) + severity: str = Field(..., description="Alert severity") + title: str = Field(..., max_length=200) + description: Optional[str] = None + affected_vendors: Optional[List[int]] = None + affected_systems: Optional[List[str]] = None + auto_generated: bool = Field(default=True) + + @field_validator('severity') + @classmethod + def validate_severity(cls, v): + allowed = ['info', 'warning', 'error', 'critical'] + if v not in allowed: + raise ValueError(f"Severity must be one of: {', '.join(allowed)}") + return v + + @field_validator('alert_type') + @classmethod + def validate_alert_type(cls, v): + allowed = ['security', 'performance', 'capacity', 'integration', 'database', 'system'] + if v not in allowed: + raise ValueError(f"Alert type must be one of: {', '.join(allowed)}") + return v + + +class PlatformAlertResponse(BaseModel): + """Platform alert response.""" + id: int + alert_type: str + severity: str + title: str + description: Optional[str] = None + affected_vendors: Optional[List[int]] = None + affected_systems: Optional[List[str]] = None + is_resolved: bool + resolved_at: Optional[datetime] = None + resolved_by_user_id: Optional[int] = None + resolution_notes: Optional[str] = None + auto_generated: bool + occurrence_count: int + first_occurred_at: datetime + last_occurred_at: datetime + created_at: datetime + + model_config = {"from_attributes": True} + + +class PlatformAlertResolve(BaseModel): + """Resolve platform alert.""" + is_resolved: bool = True + resolution_notes: Optional[str] = None + + +class PlatformAlertListResponse(BaseModel): + """Paginated list of platform alerts.""" + alerts: List[PlatformAlertResponse] + total: int + active_count: int + critical_count: int + skip: int + limit: int + + +# ============================================================================ +# BULK OPERATION SCHEMAS +# ============================================================================ + +class BulkVendorAction(BaseModel): + """Bulk actions on vendors.""" + vendor_ids: List[int] = Field(..., min_length=1, max_length=100) + action: str = Field(..., description="Action to perform") + confirm: bool = Field(default=False, description="Required for destructive actions") + reason: Optional[str] = Field(None, description="Reason for bulk action") + + @field_validator('action') + @classmethod + def validate_action(cls, v): + allowed = ['activate', 'deactivate', 'verify', 'unverify', 'delete'] + if v not in allowed: + raise ValueError(f"Action must be one of: {', '.join(allowed)}") + return v + + +class BulkVendorActionResponse(BaseModel): + """Response for bulk vendor actions.""" + successful: List[int] + failed: Dict[int, str] # vendor_id -> error_message + total_processed: int + action_performed: str + message: str + + +class BulkUserAction(BaseModel): + """Bulk actions on users.""" + user_ids: List[int] = Field(..., min_length=1, max_length=100) + action: str = Field(..., description="Action to perform") + confirm: bool = Field(default=False) + reason: Optional[str] = None + + @field_validator('action') + @classmethod + def validate_action(cls, v): + allowed = ['activate', 'deactivate', 'delete'] + if v not in allowed: + raise ValueError(f"Action must be one of: {', '.join(allowed)}") + return v + + +class BulkUserActionResponse(BaseModel): + """Response for bulk user actions.""" + successful: List[int] + failed: Dict[int, str] + total_processed: int + action_performed: str + message: str + + +# ============================================================================ +# ADMIN DASHBOARD SCHEMAS +# ============================================================================ + +class AdminDashboardStats(BaseModel): + """Comprehensive admin dashboard statistics.""" + platform: Dict[str, Any] + users: Dict[str, Any] + vendors: Dict[str, Any] + products: Dict[str, Any] + orders: Dict[str, Any] + imports: Dict[str, Any] + recent_vendors: List[Dict[str, Any]] + recent_imports: List[Dict[str, Any]] + unread_notifications: int + active_alerts: int + critical_alerts: int + + +# ============================================================================ +# SYSTEM HEALTH SCHEMAS +# ============================================================================ + +class ComponentHealthStatus(BaseModel): + """Health status for a system component.""" + status: str # healthy, degraded, unhealthy + response_time_ms: Optional[float] = None + error_message: Optional[str] = None + last_checked: datetime + details: Optional[Dict[str, Any]] = None + + +class SystemHealthResponse(BaseModel): + """System health check response.""" + overall_status: str # healthy, degraded, critical + database: ComponentHealthStatus + redis: ComponentHealthStatus + celery: ComponentHealthStatus + storage: ComponentHealthStatus + api_response_time_ms: float + uptime_seconds: int + timestamp: datetime + + +# ============================================================================ +# ADMIN SESSION SCHEMAS +# ============================================================================ + +class AdminSessionResponse(BaseModel): + """Admin session information.""" + id: int + admin_user_id: int + admin_username: Optional[str] = None + ip_address: str + user_agent: Optional[str] = None + login_at: datetime + last_activity_at: datetime + logout_at: Optional[datetime] = None + is_active: bool + logout_reason: Optional[str] = None + + model_config = {"from_attributes": True} + + +class AdminSessionListResponse(BaseModel): + """List of admin sessions.""" + sessions: List[AdminSessionResponse] + total: int + active_count: int