Files
orion/app/api/v1/admin/settings.py
Samir Boulahtit cad862f469 refactor(api): introduce UserContext schema for API dependency injection
Replace direct User database model imports in API endpoints with UserContext
schema, following the architecture principle that API routes should not import
database models directly.

Changes:
- Create UserContext schema in models/schema/auth.py with from_user() factory
- Update app/api/deps.py to return UserContext from all auth dependencies
- Add _get_user_model() helper for functions needing User model access
- Update 58 API endpoint files to use UserContext instead of User
- Add noqa comments for 4 legitimate edge cases (enums, internal helpers)

Architecture validation: 0 errors (down from 61), 11 warnings remain

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-30 20:47:33 +01:00

716 lines
24 KiB
Python

# app/api/v1/admin/settings.py
"""
Platform settings management endpoints.
Provides endpoints for:
- Viewing all platform settings
- Creating/updating settings
- Managing configuration by category
- Email configuration status and testing
"""
import logging
from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel, EmailStr
from sqlalchemy.orm import Session
from app.api.deps import get_current_admin_api
from app.core.config import settings as app_settings
from app.core.database import get_db
from app.exceptions import ConfirmationRequiredException, ResourceNotFoundException
from app.services.admin_audit_service import admin_audit_service
from app.services.admin_settings_service import admin_settings_service
from models.schema.auth import UserContext
from models.schema.admin import (
AdminSettingCreate,
AdminSettingDefaultResponse,
AdminSettingListResponse,
AdminSettingResponse,
AdminSettingUpdate,
PublicDisplaySettingsResponse,
RowsPerPageResponse,
RowsPerPageUpdateResponse,
)
router = APIRouter(prefix="/settings")
logger = logging.getLogger(__name__)
@router.get("", response_model=AdminSettingListResponse)
def get_all_settings(
category: str | None = Query(None, description="Filter by category"),
is_public: bool | None = Query(None, description="Filter by public flag"),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Get all platform settings.
Can be filtered by category (system, security, marketplace, notifications)
and by public flag (settings that can be exposed to frontend).
"""
settings = admin_settings_service.get_all_settings(db, category, is_public)
return AdminSettingListResponse(
settings=settings, total=len(settings), category=category
)
@router.get("/categories")
def get_setting_categories(
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Get list of all setting categories."""
# This could be enhanced to return counts per category
return {
"categories": [
"system",
"security",
"marketplace",
"notifications",
"integrations",
"payments",
]
}
@router.get("/{key}", response_model=AdminSettingResponse | AdminSettingDefaultResponse)
def get_setting(
key: str,
default: str | None = Query(None, description="Default value if setting not found"),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
) -> AdminSettingResponse | AdminSettingDefaultResponse:
"""Get specific setting by key.
If `default` is provided and the setting doesn't exist, returns a response
with the default value instead of 404.
"""
setting = admin_settings_service.get_setting_by_key(db, key)
if not setting:
if default is not None:
# Return default value without creating the setting
return AdminSettingDefaultResponse(key=key, value=default, exists=False)
raise ResourceNotFoundException(resource_type="Setting", identifier=key)
return AdminSettingResponse.model_validate(setting)
@router.post("", response_model=AdminSettingResponse)
def create_setting(
setting_data: AdminSettingCreate,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Create new platform setting.
Setting keys should be lowercase with underscores (e.g., max_vendors_allowed).
"""
result = admin_settings_service.create_setting(
db=db, setting_data=setting_data, admin_user_id=current_admin.id
)
# Log action
admin_audit_service.log_action(
db=db,
admin_user_id=current_admin.id,
action="create_setting",
target_type="setting",
target_id=setting_data.key,
details={
"category": setting_data.category,
"value_type": setting_data.value_type,
},
)
db.commit()
return result
@router.put("/{key}", response_model=AdminSettingResponse)
def update_setting(
key: str,
update_data: AdminSettingUpdate,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Update existing setting value."""
old_value = admin_settings_service.get_setting_value(db, key)
result = admin_settings_service.update_setting(
db=db, key=key, update_data=update_data, admin_user_id=current_admin.id
)
# Log action
admin_audit_service.log_action(
db=db,
admin_user_id=current_admin.id,
action="update_setting",
target_type="setting",
target_id=key,
details={"old_value": str(old_value), "new_value": update_data.value},
)
db.commit()
return result
@router.post("/upsert", response_model=AdminSettingResponse)
def upsert_setting(
setting_data: AdminSettingCreate,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Create or update setting (upsert).
If setting exists, updates its value. If not, creates new setting.
"""
result = admin_settings_service.upsert_setting(
db=db, setting_data=setting_data, admin_user_id=current_admin.id
)
# Log action
admin_audit_service.log_action(
db=db,
admin_user_id=current_admin.id,
action="upsert_setting",
target_type="setting",
target_id=setting_data.key,
details={"category": setting_data.category},
)
db.commit()
return result
# ============================================================================
# CONVENIENCE ENDPOINTS FOR COMMON SETTINGS
# ============================================================================
@router.get("/display/rows-per-page", response_model=RowsPerPageResponse)
def get_rows_per_page(
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
) -> RowsPerPageResponse:
"""Get the platform-wide rows per page setting."""
value = admin_settings_service.get_setting_value(db, "rows_per_page", default="20")
return RowsPerPageResponse(rows_per_page=int(value))
@router.put("/display/rows-per-page", response_model=RowsPerPageUpdateResponse)
def set_rows_per_page(
rows: int = Query(..., ge=10, le=100, description="Rows per page (10-100)"),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
) -> RowsPerPageUpdateResponse:
"""
Set the platform-wide rows per page setting.
Valid values: 10, 20, 50, 100
"""
valid_values = [10, 20, 50, 100]
if rows not in valid_values:
# Round to nearest valid value
rows = min(valid_values, key=lambda x: abs(x - rows))
setting_data = AdminSettingCreate(
key="rows_per_page",
value=str(rows),
value_type="integer",
category="display",
description="Default number of rows per page in admin tables",
is_public=True,
)
admin_settings_service.upsert_setting(
db=db, setting_data=setting_data, admin_user_id=current_admin.id
)
admin_audit_service.log_action(
db=db,
admin_user_id=current_admin.id,
action="update_setting",
target_type="setting",
target_id="rows_per_page",
details={"value": rows},
)
db.commit()
return RowsPerPageUpdateResponse(
rows_per_page=rows, message="Rows per page setting updated"
)
@router.get("/display/public", response_model=PublicDisplaySettingsResponse)
def get_public_display_settings(
db: Session = Depends(get_db),
) -> PublicDisplaySettingsResponse:
"""
Get public display settings (no auth required).
Returns settings that can be used by frontend without admin auth.
"""
rows_per_page = admin_settings_service.get_setting_value(
db, "rows_per_page", default="20"
)
return PublicDisplaySettingsResponse(rows_per_page=int(rows_per_page))
@router.delete("/{key}")
def delete_setting(
key: str,
confirm: bool = Query(False, description="Must be true to confirm deletion"),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Delete platform setting.
Requires confirmation parameter.
WARNING: Deleting settings may affect platform functionality.
"""
if not confirm:
raise ConfirmationRequiredException(
operation="delete_setting",
message="Deletion requires confirmation parameter: confirm=true",
)
message = admin_settings_service.delete_setting(
db=db, key=key, admin_user_id=current_admin.id
)
# Log action
admin_audit_service.log_action(
db=db,
admin_user_id=current_admin.id,
action="delete_setting",
target_type="setting",
target_id=key,
details={},
)
db.commit()
return {"message": message}
# ============================================================================
# EMAIL CONFIGURATION ENDPOINTS
# ============================================================================
# Email setting keys stored in admin_settings table
EMAIL_SETTING_KEYS = {
"email_provider": "smtp",
"email_from_address": "",
"email_from_name": "",
"email_reply_to": "",
"smtp_host": "",
"smtp_port": "587",
"smtp_user": "",
"smtp_password": "",
"smtp_use_tls": "true",
"smtp_use_ssl": "false",
"sendgrid_api_key": "",
"mailgun_api_key": "",
"mailgun_domain": "",
"aws_access_key_id": "",
"aws_secret_access_key": "",
"aws_region": "eu-west-1",
"email_enabled": "true",
"email_debug": "false",
}
def get_email_setting(db: Session, key: str) -> str | None:
"""Get email setting from database, returns None if not set."""
setting = admin_settings_service.get_setting_by_key(db, key)
return setting.value if setting else None
def get_effective_email_config(db: Session) -> dict:
"""
Get effective email configuration.
Priority: Database settings > Environment variables
"""
config = {}
# Provider
db_provider = get_email_setting(db, "email_provider")
config["provider"] = db_provider if db_provider else app_settings.email_provider
# From settings
db_from_email = get_email_setting(db, "email_from_address")
config["from_email"] = db_from_email if db_from_email else app_settings.email_from_address
db_from_name = get_email_setting(db, "email_from_name")
config["from_name"] = db_from_name if db_from_name else app_settings.email_from_name
db_reply_to = get_email_setting(db, "email_reply_to")
config["reply_to"] = db_reply_to if db_reply_to else app_settings.email_reply_to
# SMTP settings
db_smtp_host = get_email_setting(db, "smtp_host")
config["smtp_host"] = db_smtp_host if db_smtp_host else app_settings.smtp_host
db_smtp_port = get_email_setting(db, "smtp_port")
config["smtp_port"] = int(db_smtp_port) if db_smtp_port else app_settings.smtp_port
db_smtp_user = get_email_setting(db, "smtp_user")
config["smtp_user"] = db_smtp_user if db_smtp_user else app_settings.smtp_user
db_smtp_password = get_email_setting(db, "smtp_password")
config["smtp_password"] = db_smtp_password if db_smtp_password else app_settings.smtp_password
db_smtp_use_tls = get_email_setting(db, "smtp_use_tls")
config["smtp_use_tls"] = db_smtp_use_tls.lower() in ("true", "1", "yes") if db_smtp_use_tls else app_settings.smtp_use_tls
db_smtp_use_ssl = get_email_setting(db, "smtp_use_ssl")
config["smtp_use_ssl"] = db_smtp_use_ssl.lower() in ("true", "1", "yes") if db_smtp_use_ssl else app_settings.smtp_use_ssl
# SendGrid
db_sendgrid_key = get_email_setting(db, "sendgrid_api_key")
config["sendgrid_api_key"] = db_sendgrid_key if db_sendgrid_key else app_settings.sendgrid_api_key
# Mailgun
db_mailgun_key = get_email_setting(db, "mailgun_api_key")
config["mailgun_api_key"] = db_mailgun_key if db_mailgun_key else app_settings.mailgun_api_key
db_mailgun_domain = get_email_setting(db, "mailgun_domain")
config["mailgun_domain"] = db_mailgun_domain if db_mailgun_domain else app_settings.mailgun_domain
# AWS SES
db_aws_key = get_email_setting(db, "aws_access_key_id")
config["aws_access_key_id"] = db_aws_key if db_aws_key else app_settings.aws_access_key_id
db_aws_secret = get_email_setting(db, "aws_secret_access_key")
config["aws_secret_access_key"] = db_aws_secret if db_aws_secret else app_settings.aws_secret_access_key
db_aws_region = get_email_setting(db, "aws_region")
config["aws_region"] = db_aws_region if db_aws_region else app_settings.aws_region
# Behavior
db_enabled = get_email_setting(db, "email_enabled")
config["enabled"] = db_enabled.lower() in ("true", "1", "yes") if db_enabled else app_settings.email_enabled
db_debug = get_email_setting(db, "email_debug")
config["debug"] = db_debug.lower() in ("true", "1", "yes") if db_debug else app_settings.email_debug
# Track source for each field (DB override or .env)
config["_sources"] = {}
for key in ["provider", "from_email", "from_name", "smtp_host", "smtp_port"]:
db_key = "email_provider" if key == "provider" else ("email_from_address" if key == "from_email" else ("email_from_name" if key == "from_name" else key))
config["_sources"][key] = "database" if get_email_setting(db, db_key) else "env"
return config
class EmailStatusResponse(BaseModel):
"""Platform email configuration status."""
provider: str
from_email: str
from_name: str
reply_to: str | None = None
smtp_host: str | None = None
smtp_port: int | None = None
smtp_user: str | None = None
mailgun_domain: str | None = None
aws_region: str | None = None
debug: bool
enabled: bool
is_configured: bool
has_db_overrides: bool = False
class EmailSettingsUpdate(BaseModel):
"""Update email settings."""
provider: str | None = None
from_email: EmailStr | None = None
from_name: str | None = None
reply_to: EmailStr | None = None
# SMTP
smtp_host: str | None = None
smtp_port: int | None = None
smtp_user: str | None = None
smtp_password: str | None = None
smtp_use_tls: bool | None = None
smtp_use_ssl: bool | None = None
# SendGrid
sendgrid_api_key: str | None = None
# Mailgun
mailgun_api_key: str | None = None
mailgun_domain: str | None = None
# AWS SES
aws_access_key_id: str | None = None
aws_secret_access_key: str | None = None
aws_region: str | None = None
# Behavior
enabled: bool | None = None
debug: bool | None = None
class TestEmailRequest(BaseModel):
"""Request body for test email."""
to_email: EmailStr
class TestEmailResponse(BaseModel):
"""Response for test email."""
success: bool
message: str
@router.get("/email/status", response_model=EmailStatusResponse)
def get_email_status(
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
) -> EmailStatusResponse:
"""
Get platform email configuration status.
Returns the effective email configuration (DB overrides > .env).
Sensitive values (passwords, API keys) are NOT exposed.
"""
config = get_effective_email_config(db)
provider = config["provider"].lower()
# Determine if email is configured based on provider
is_configured = False
if provider == "smtp":
is_configured = bool(config["smtp_host"] and config["smtp_host"] != "localhost")
elif provider == "sendgrid":
is_configured = bool(config["sendgrid_api_key"])
elif provider == "mailgun":
is_configured = bool(config["mailgun_api_key"] and config["mailgun_domain"])
elif provider == "ses":
is_configured = bool(config["aws_access_key_id"] and config["aws_secret_access_key"])
# Check if any DB overrides exist
has_db_overrides = any(v == "database" for v in config["_sources"].values())
return EmailStatusResponse(
provider=provider,
from_email=config["from_email"],
from_name=config["from_name"],
reply_to=config["reply_to"] or None,
smtp_host=config["smtp_host"] if provider == "smtp" else None,
smtp_port=config["smtp_port"] if provider == "smtp" else None,
smtp_user=config["smtp_user"] if provider == "smtp" else None,
mailgun_domain=config["mailgun_domain"] if provider == "mailgun" else None,
aws_region=config["aws_region"] if provider == "ses" else None,
debug=config["debug"],
enabled=config["enabled"],
is_configured=is_configured,
has_db_overrides=has_db_overrides,
)
@router.put("/email/settings")
def update_email_settings(
settings_update: EmailSettingsUpdate,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Update platform email settings.
Settings are stored in the database and override .env values.
Only non-null values are updated.
"""
from models.schema.admin import AdminSettingCreate
updated_keys = []
# Map request fields to database keys
field_mappings = {
"provider": ("email_provider", "string"),
"from_email": ("email_from_address", "string"),
"from_name": ("email_from_name", "string"),
"reply_to": ("email_reply_to", "string"),
"smtp_host": ("smtp_host", "string"),
"smtp_port": ("smtp_port", "integer"),
"smtp_user": ("smtp_user", "string"),
"smtp_password": ("smtp_password", "string"),
"smtp_use_tls": ("smtp_use_tls", "boolean"),
"smtp_use_ssl": ("smtp_use_ssl", "boolean"),
"sendgrid_api_key": ("sendgrid_api_key", "string"),
"mailgun_api_key": ("mailgun_api_key", "string"),
"mailgun_domain": ("mailgun_domain", "string"),
"aws_access_key_id": ("aws_access_key_id", "string"),
"aws_secret_access_key": ("aws_secret_access_key", "string"),
"aws_region": ("aws_region", "string"),
"enabled": ("email_enabled", "boolean"),
"debug": ("email_debug", "boolean"),
}
# Sensitive fields that should be marked as encrypted
sensitive_keys = {
"smtp_password", "sendgrid_api_key", "mailgun_api_key",
"aws_access_key_id", "aws_secret_access_key"
}
for field, (db_key, value_type) in field_mappings.items():
value = getattr(settings_update, field, None)
if value is not None:
# Convert value to string for storage
if value_type == "boolean":
str_value = "true" if value else "false"
elif value_type == "integer":
str_value = str(value)
else:
str_value = str(value)
# Create or update setting
setting_data = AdminSettingCreate(
key=db_key,
value=str_value,
value_type=value_type,
category="email",
description=f"Email setting: {field}",
is_encrypted=db_key in sensitive_keys,
is_public=False,
)
admin_settings_service.upsert_setting(db, setting_data, current_admin.id)
updated_keys.append(field)
# Log action
admin_audit_service.log_action(
db=db,
admin_user_id=current_admin.id,
action="update_email_settings",
target_type="email_settings",
target_id="platform",
details={"updated_keys": updated_keys},
)
db.commit()
logger.info(f"Email settings updated by admin {current_admin.id}: {updated_keys}")
return {
"success": True,
"message": f"Updated {len(updated_keys)} email setting(s)",
"updated_keys": updated_keys,
}
@router.delete("/email/settings")
def reset_email_settings(
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Reset email settings to use .env values.
Deletes all email settings from the database, reverting to .env configuration.
"""
deleted_count = 0
for key in EMAIL_SETTING_KEYS:
setting = admin_settings_service.get_setting_by_key(db, key)
if setting:
# Use service method for deletion (API-002 compliance)
admin_settings_service.delete_setting(db, key, current_admin.id)
deleted_count += 1
# Log action
admin_audit_service.log_action(
db=db,
admin_user_id=current_admin.id,
action="reset_email_settings",
target_type="email_settings",
target_id="platform",
details={"deleted_count": deleted_count},
)
db.commit()
logger.info(f"Email settings reset by admin {current_admin.id}, deleted {deleted_count} settings")
return {
"success": True,
"message": f"Reset {deleted_count} email setting(s) to .env defaults",
}
@router.post("/email/test", response_model=TestEmailResponse)
def send_test_email(
request: TestEmailRequest,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
) -> TestEmailResponse:
"""
Send a test email using the platform email configuration.
This tests the email provider configuration from environment variables.
"""
from app.services.email_service import EmailService
try:
email_service = EmailService(db)
# Send test email using platform configuration
email_log = email_service.send_raw(
to_email=request.to_email,
to_name=None,
subject="Wizamart Platform - Test Email",
body_html="""
<html>
<body style="font-family: Arial, sans-serif; padding: 20px;">
<h2 style="color: #6b46c1;">Test Email from Wizamart</h2>
<p>This is a test email to verify your platform email configuration.</p>
<p>If you received this email, your email settings are working correctly!</p>
<hr style="border: none; border-top: 1px solid #e5e7eb; margin: 20px 0;">
<p style="color: #6b7280; font-size: 12px;">
Provider: {provider}<br>
From: {from_email}
</p>
</body>
</html>
""".format(
provider=app_settings.email_provider,
from_email=app_settings.email_from_address,
),
body_text=f"Test email from Wizamart platform.\n\nProvider: {app_settings.email_provider}\nFrom: {app_settings.email_from_address}",
is_platform_email=True,
)
# Check if email was actually sent (send_raw returns EmailLog, not boolean)
if email_log.status == "sent":
# Log action
admin_audit_service.log_action(
db=db,
admin_user_id=current_admin.id,
action="send_test_email",
target_type="email",
target_id=request.to_email,
details={"provider": app_settings.email_provider},
)
db.commit()
return TestEmailResponse(
success=True,
message=f"Test email sent to {request.to_email}",
)
else:
return TestEmailResponse(
success=False,
message=email_log.error_message or "Failed to send test email. Check server logs for details.",
)
except Exception as e:
logger.error(f"Failed to send test email: {e}")
return TestEmailResponse(
success=False,
message=f"Error sending test email: {str(e)}",
)