Files
orion/models/database/admin.py
Samir Boulahtit 238c1ec9b8 refactor: modernize code quality tooling with Ruff
- Replace black, isort, and flake8 with Ruff (all-in-one linter and formatter)
- Add comprehensive pyproject.toml configuration
- Simplify Makefile code quality targets
- Configure exclusions for venv/.venv in pyproject.toml
- Auto-fix 1,359 linting issues across codebase

Benefits:
- Much faster builds (Ruff is written in Rust)
- Single tool replaces multiple tools
- More comprehensive rule set (UP, B, C4, SIM, PIE, RET, Q)
- All configuration centralized in pyproject.toml
- Better import sorting and formatting consistency

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-28 19:37:38 +01:00

193 lines
6.8 KiB
Python

# Admin-specific models
# models/database/admin.py
"""
Admin-specific database models.
This module provides models for:
- Admin audit logging (compliance and security tracking)
- Admin notifications (system alerts and warnings)
- Platform settings (global configuration)
- Platform alerts (system-wide issues)
"""
from sqlalchemy import (
JSON,
Boolean,
Column,
DateTime,
ForeignKey,
Integer,
String,
Text,
)
from sqlalchemy.orm import relationship
from app.core.database import Base
from .base import TimestampMixin
class AdminAuditLog(Base, TimestampMixin):
"""
Track all admin actions for compliance and security.
Separate from regular audit logs - focuses on admin-specific operations
like vendor creation, user management, and system configuration changes.
"""
__tablename__ = "admin_audit_logs"
id = Column(Integer, primary_key=True, index=True)
admin_user_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True)
action = Column(
String(100), nullable=False, index=True
) # create_vendor, delete_vendor, etc.
target_type = Column(
String(50), nullable=False, index=True
) # vendor, user, import_job, setting
target_id = Column(String(100), nullable=False, index=True)
details = Column(JSON) # Additional context about the action
ip_address = Column(String(45)) # IPv4 or IPv6
user_agent = Column(Text)
request_id = Column(String(100)) # For correlating with application logs
# Relationships
admin_user = relationship("User", foreign_keys=[admin_user_id])
def __repr__(self):
return f"<AdminAuditLog(id={self.id}, action='{self.action}', target={self.target_type}:{self.target_id})>"
class AdminNotification(Base, TimestampMixin):
"""
Admin-specific notifications for system alerts and warnings.
Different from vendor/customer notifications - these are for platform
administrators to track system health and issues requiring attention.
"""
__tablename__ = "admin_notifications"
id = Column(Integer, primary_key=True, index=True)
type = Column(
String(50), nullable=False, index=True
) # system_alert, vendor_issue, import_failure
priority = Column(
String(20), default="normal", index=True
) # low, normal, high, critical
title = Column(String(200), nullable=False)
message = Column(Text, nullable=False)
is_read = Column(Boolean, default=False, index=True)
read_at = Column(DateTime, nullable=True)
read_by_user_id = Column(Integer, ForeignKey("users.id"), nullable=True)
action_required = Column(Boolean, default=False, index=True)
action_url = Column(String(500)) # Link to relevant admin page
notification_metadata = Column(JSON) # Additional contextual data
# Relationships
read_by = relationship("User", foreign_keys=[read_by_user_id])
def __repr__(self):
return f"<AdminNotification(id={self.id}, type='{self.type}', priority='{self.priority}')>"
class AdminSetting(Base, TimestampMixin):
"""
Platform-wide admin settings and configuration.
Stores global settings that affect the entire platform, different from
vendor-specific settings. Supports encryption for sensitive values.
Examples:
- max_vendors_allowed
- maintenance_mode
- default_vendor_trial_days
- smtp_settings
- stripe_api_keys (encrypted)
"""
__tablename__ = "admin_settings"
id = Column(Integer, primary_key=True, index=True)
key = Column(String(100), unique=True, nullable=False, index=True)
value = Column(Text, nullable=False)
value_type = Column(String(20), default="string") # string, integer, boolean, json
category = Column(
String(50), index=True
) # system, security, marketplace, notifications
description = Column(Text)
is_encrypted = Column(Boolean, default=False)
is_public = Column(Boolean, default=False) # Can be exposed to frontend?
last_modified_by_user_id = Column(Integer, ForeignKey("users.id"), nullable=True)
# Relationships
last_modified_by = relationship("User", foreign_keys=[last_modified_by_user_id])
def __repr__(self):
return f"<AdminSetting(key='{self.key}', category='{self.category}')>"
class PlatformAlert(Base, TimestampMixin):
"""
System-wide alerts that admins need to be aware of.
Tracks platform issues, performance problems, security incidents,
and other system-level concerns that require admin attention.
"""
__tablename__ = "platform_alerts"
id = Column(Integer, primary_key=True, index=True)
alert_type = Column(
String(50), nullable=False, index=True
) # security, performance, capacity, integration
severity = Column(
String(20), nullable=False, index=True
) # info, warning, error, critical
title = Column(String(200), nullable=False)
description = Column(Text)
affected_vendors = Column(JSON) # List of affected vendor IDs
affected_systems = Column(JSON) # List of affected system components
is_resolved = Column(Boolean, default=False, index=True)
resolved_at = Column(DateTime, nullable=True)
resolved_by_user_id = Column(Integer, ForeignKey("users.id"), nullable=True)
resolution_notes = Column(Text)
auto_generated = Column(Boolean, default=True) # System-generated vs manual
occurrence_count = Column(Integer, default=1) # Track repeated occurrences
first_occurred_at = Column(DateTime, nullable=False)
last_occurred_at = Column(DateTime, nullable=False)
# Relationships
resolved_by = relationship("User", foreign_keys=[resolved_by_user_id])
def __repr__(self):
return f"<PlatformAlert(id={self.id}, type='{self.alert_type}', severity='{self.severity}')>"
class AdminSession(Base, TimestampMixin):
"""
Track admin login sessions for security monitoring.
Helps identify suspicious login patterns, track concurrent sessions,
and enforce session policies for admin users.
"""
__tablename__ = "admin_sessions"
id = Column(Integer, primary_key=True, index=True)
admin_user_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True)
session_token = Column(String(255), unique=True, nullable=False, index=True)
ip_address = Column(String(45), nullable=False)
user_agent = Column(Text)
login_at = Column(DateTime, nullable=False, index=True)
last_activity_at = Column(DateTime, nullable=False)
logout_at = Column(DateTime, nullable=True)
is_active = Column(Boolean, default=True, index=True)
logout_reason = Column(String(50)) # manual, timeout, forced, suspicious
# Relationships
admin_user = relationship("User", foreign_keys=[admin_user_id])
def __repr__(self):
return f"<AdminSession(id={self.id}, admin_user_id={self.admin_user_id}, is_active={self.is_active})>"