Files
orion/app/api/v1/admin/settings.py
Samir Boulahtit 84a523cd7b fix: resolve email settings architecture violations and add tests/docs
- Fix API-002 in admin/settings.py: use service layer for DB delete
- Fix API-001/API-003 in vendor/email_settings.py: add Pydantic response
  models, remove HTTPException raises
- Fix SVC-002/SVC-006 in vendor_email_settings_service.py: use domain
  exceptions, change db.commit() to db.flush()
- Add unit tests for VendorEmailSettingsService
- Add integration tests for vendor and admin email settings APIs
- Add user guide (docs/guides/email-settings.md)
- Add developer guide (docs/implementation/email-settings.md)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-05 22:38:10 +01:00

706 lines
23 KiB
Python

# app/api/v1/admin/settings.py
"""
Platform settings management endpoints.
Provides endpoints for:
- Viewing all platform settings
- Creating/updating settings
- Managing configuration by category
- Email configuration status and testing
"""
import logging
from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel, EmailStr
from sqlalchemy.orm import Session
from app.api.deps import get_current_admin_api
from app.core.config import settings as app_settings
from app.core.database import get_db
from app.exceptions import ConfirmationRequiredException, ResourceNotFoundException
from app.services.admin_audit_service import admin_audit_service
from app.services.admin_settings_service import admin_settings_service
from models.database.user import User
from models.schema.admin import (
AdminSettingCreate,
AdminSettingListResponse,
AdminSettingResponse,
AdminSettingUpdate,
PublicDisplaySettingsResponse,
RowsPerPageResponse,
RowsPerPageUpdateResponse,
)
router = APIRouter(prefix="/settings")
logger = logging.getLogger(__name__)
@router.get("", response_model=AdminSettingListResponse)
def get_all_settings(
category: str | None = Query(None, description="Filter by category"),
is_public: bool | None = Query(None, description="Filter by public flag"),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""
Get all platform settings.
Can be filtered by category (system, security, marketplace, notifications)
and by public flag (settings that can be exposed to frontend).
"""
settings = admin_settings_service.get_all_settings(db, category, is_public)
return AdminSettingListResponse(
settings=settings, total=len(settings), category=category
)
@router.get("/categories")
def get_setting_categories(
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""Get list of all setting categories."""
# This could be enhanced to return counts per category
return {
"categories": [
"system",
"security",
"marketplace",
"notifications",
"integrations",
"payments",
]
}
@router.get("/{key}", response_model=AdminSettingResponse)
def get_setting(
key: str,
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""Get specific setting by key."""
setting = admin_settings_service.get_setting_by_key(db, key)
if not setting:
raise ResourceNotFoundException(resource_type="Setting", identifier=key)
return AdminSettingResponse.model_validate(setting)
@router.post("", response_model=AdminSettingResponse)
def create_setting(
setting_data: AdminSettingCreate,
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""
Create new platform setting.
Setting keys should be lowercase with underscores (e.g., max_vendors_allowed).
"""
result = admin_settings_service.create_setting(
db=db, setting_data=setting_data, admin_user_id=current_admin.id
)
# Log action
admin_audit_service.log_action(
db=db,
admin_user_id=current_admin.id,
action="create_setting",
target_type="setting",
target_id=setting_data.key,
details={
"category": setting_data.category,
"value_type": setting_data.value_type,
},
)
db.commit()
return result
@router.put("/{key}", response_model=AdminSettingResponse)
def update_setting(
key: str,
update_data: AdminSettingUpdate,
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""Update existing setting value."""
old_value = admin_settings_service.get_setting_value(db, key)
result = admin_settings_service.update_setting(
db=db, key=key, update_data=update_data, admin_user_id=current_admin.id
)
# Log action
admin_audit_service.log_action(
db=db,
admin_user_id=current_admin.id,
action="update_setting",
target_type="setting",
target_id=key,
details={"old_value": str(old_value), "new_value": update_data.value},
)
db.commit()
return result
@router.post("/upsert", response_model=AdminSettingResponse)
def upsert_setting(
setting_data: AdminSettingCreate,
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""
Create or update setting (upsert).
If setting exists, updates its value. If not, creates new setting.
"""
result = admin_settings_service.upsert_setting(
db=db, setting_data=setting_data, admin_user_id=current_admin.id
)
# Log action
admin_audit_service.log_action(
db=db,
admin_user_id=current_admin.id,
action="upsert_setting",
target_type="setting",
target_id=setting_data.key,
details={"category": setting_data.category},
)
db.commit()
return result
# ============================================================================
# CONVENIENCE ENDPOINTS FOR COMMON SETTINGS
# ============================================================================
@router.get("/display/rows-per-page", response_model=RowsPerPageResponse)
def get_rows_per_page(
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
) -> RowsPerPageResponse:
"""Get the platform-wide rows per page setting."""
value = admin_settings_service.get_setting_value(db, "rows_per_page", default="20")
return RowsPerPageResponse(rows_per_page=int(value))
@router.put("/display/rows-per-page", response_model=RowsPerPageUpdateResponse)
def set_rows_per_page(
rows: int = Query(..., ge=10, le=100, description="Rows per page (10-100)"),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
) -> RowsPerPageUpdateResponse:
"""
Set the platform-wide rows per page setting.
Valid values: 10, 20, 50, 100
"""
valid_values = [10, 20, 50, 100]
if rows not in valid_values:
# Round to nearest valid value
rows = min(valid_values, key=lambda x: abs(x - rows))
setting_data = AdminSettingCreate(
key="rows_per_page",
value=str(rows),
value_type="integer",
category="display",
description="Default number of rows per page in admin tables",
is_public=True,
)
admin_settings_service.upsert_setting(
db=db, setting_data=setting_data, admin_user_id=current_admin.id
)
admin_audit_service.log_action(
db=db,
admin_user_id=current_admin.id,
action="update_setting",
target_type="setting",
target_id="rows_per_page",
details={"value": rows},
)
db.commit()
return RowsPerPageUpdateResponse(
rows_per_page=rows, message="Rows per page setting updated"
)
@router.get("/display/public", response_model=PublicDisplaySettingsResponse)
def get_public_display_settings(
db: Session = Depends(get_db),
) -> PublicDisplaySettingsResponse:
"""
Get public display settings (no auth required).
Returns settings that can be used by frontend without admin auth.
"""
rows_per_page = admin_settings_service.get_setting_value(
db, "rows_per_page", default="20"
)
return PublicDisplaySettingsResponse(rows_per_page=int(rows_per_page))
@router.delete("/{key}")
def delete_setting(
key: str,
confirm: bool = Query(False, description="Must be true to confirm deletion"),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""
Delete platform setting.
Requires confirmation parameter.
WARNING: Deleting settings may affect platform functionality.
"""
if not confirm:
raise ConfirmationRequiredException(
operation="delete_setting",
message="Deletion requires confirmation parameter: confirm=true",
)
message = admin_settings_service.delete_setting(
db=db, key=key, admin_user_id=current_admin.id
)
# Log action
admin_audit_service.log_action(
db=db,
admin_user_id=current_admin.id,
action="delete_setting",
target_type="setting",
target_id=key,
details={},
)
db.commit()
return {"message": message}
# ============================================================================
# EMAIL CONFIGURATION ENDPOINTS
# ============================================================================
# Email setting keys stored in admin_settings table
EMAIL_SETTING_KEYS = {
"email_provider": "smtp",
"email_from_address": "",
"email_from_name": "",
"email_reply_to": "",
"smtp_host": "",
"smtp_port": "587",
"smtp_user": "",
"smtp_password": "",
"smtp_use_tls": "true",
"smtp_use_ssl": "false",
"sendgrid_api_key": "",
"mailgun_api_key": "",
"mailgun_domain": "",
"aws_access_key_id": "",
"aws_secret_access_key": "",
"aws_region": "eu-west-1",
"email_enabled": "true",
"email_debug": "false",
}
def get_email_setting(db: Session, key: str) -> str | None:
"""Get email setting from database, returns None if not set."""
setting = admin_settings_service.get_setting_by_key(db, key)
return setting.value if setting else None
def get_effective_email_config(db: Session) -> dict:
"""
Get effective email configuration.
Priority: Database settings > Environment variables
"""
config = {}
# Provider
db_provider = get_email_setting(db, "email_provider")
config["provider"] = db_provider if db_provider else app_settings.email_provider
# From settings
db_from_email = get_email_setting(db, "email_from_address")
config["from_email"] = db_from_email if db_from_email else app_settings.email_from_address
db_from_name = get_email_setting(db, "email_from_name")
config["from_name"] = db_from_name if db_from_name else app_settings.email_from_name
db_reply_to = get_email_setting(db, "email_reply_to")
config["reply_to"] = db_reply_to if db_reply_to else app_settings.email_reply_to
# SMTP settings
db_smtp_host = get_email_setting(db, "smtp_host")
config["smtp_host"] = db_smtp_host if db_smtp_host else app_settings.smtp_host
db_smtp_port = get_email_setting(db, "smtp_port")
config["smtp_port"] = int(db_smtp_port) if db_smtp_port else app_settings.smtp_port
db_smtp_user = get_email_setting(db, "smtp_user")
config["smtp_user"] = db_smtp_user if db_smtp_user else app_settings.smtp_user
db_smtp_password = get_email_setting(db, "smtp_password")
config["smtp_password"] = db_smtp_password if db_smtp_password else app_settings.smtp_password
db_smtp_use_tls = get_email_setting(db, "smtp_use_tls")
config["smtp_use_tls"] = db_smtp_use_tls.lower() in ("true", "1", "yes") if db_smtp_use_tls else app_settings.smtp_use_tls
db_smtp_use_ssl = get_email_setting(db, "smtp_use_ssl")
config["smtp_use_ssl"] = db_smtp_use_ssl.lower() in ("true", "1", "yes") if db_smtp_use_ssl else app_settings.smtp_use_ssl
# SendGrid
db_sendgrid_key = get_email_setting(db, "sendgrid_api_key")
config["sendgrid_api_key"] = db_sendgrid_key if db_sendgrid_key else app_settings.sendgrid_api_key
# Mailgun
db_mailgun_key = get_email_setting(db, "mailgun_api_key")
config["mailgun_api_key"] = db_mailgun_key if db_mailgun_key else app_settings.mailgun_api_key
db_mailgun_domain = get_email_setting(db, "mailgun_domain")
config["mailgun_domain"] = db_mailgun_domain if db_mailgun_domain else app_settings.mailgun_domain
# AWS SES
db_aws_key = get_email_setting(db, "aws_access_key_id")
config["aws_access_key_id"] = db_aws_key if db_aws_key else app_settings.aws_access_key_id
db_aws_secret = get_email_setting(db, "aws_secret_access_key")
config["aws_secret_access_key"] = db_aws_secret if db_aws_secret else app_settings.aws_secret_access_key
db_aws_region = get_email_setting(db, "aws_region")
config["aws_region"] = db_aws_region if db_aws_region else app_settings.aws_region
# Behavior
db_enabled = get_email_setting(db, "email_enabled")
config["enabled"] = db_enabled.lower() in ("true", "1", "yes") if db_enabled else app_settings.email_enabled
db_debug = get_email_setting(db, "email_debug")
config["debug"] = db_debug.lower() in ("true", "1", "yes") if db_debug else app_settings.email_debug
# Track source for each field (DB override or .env)
config["_sources"] = {}
for key in ["provider", "from_email", "from_name", "smtp_host", "smtp_port"]:
db_key = "email_provider" if key == "provider" else ("email_from_address" if key == "from_email" else ("email_from_name" if key == "from_name" else key))
config["_sources"][key] = "database" if get_email_setting(db, db_key) else "env"
return config
class EmailStatusResponse(BaseModel):
"""Platform email configuration status."""
provider: str
from_email: str
from_name: str
reply_to: str | None = None
smtp_host: str | None = None
smtp_port: int | None = None
smtp_user: str | None = None
mailgun_domain: str | None = None
aws_region: str | None = None
debug: bool
enabled: bool
is_configured: bool
has_db_overrides: bool = False
class EmailSettingsUpdate(BaseModel):
"""Update email settings."""
provider: str | None = None
from_email: EmailStr | None = None
from_name: str | None = None
reply_to: EmailStr | None = None
# SMTP
smtp_host: str | None = None
smtp_port: int | None = None
smtp_user: str | None = None
smtp_password: str | None = None
smtp_use_tls: bool | None = None
smtp_use_ssl: bool | None = None
# SendGrid
sendgrid_api_key: str | None = None
# Mailgun
mailgun_api_key: str | None = None
mailgun_domain: str | None = None
# AWS SES
aws_access_key_id: str | None = None
aws_secret_access_key: str | None = None
aws_region: str | None = None
# Behavior
enabled: bool | None = None
debug: bool | None = None
class TestEmailRequest(BaseModel):
"""Request body for test email."""
to_email: EmailStr
class TestEmailResponse(BaseModel):
"""Response for test email."""
success: bool
message: str
@router.get("/email/status", response_model=EmailStatusResponse)
def get_email_status(
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
) -> EmailStatusResponse:
"""
Get platform email configuration status.
Returns the effective email configuration (DB overrides > .env).
Sensitive values (passwords, API keys) are NOT exposed.
"""
config = get_effective_email_config(db)
provider = config["provider"].lower()
# Determine if email is configured based on provider
is_configured = False
if provider == "smtp":
is_configured = bool(config["smtp_host"] and config["smtp_host"] != "localhost")
elif provider == "sendgrid":
is_configured = bool(config["sendgrid_api_key"])
elif provider == "mailgun":
is_configured = bool(config["mailgun_api_key"] and config["mailgun_domain"])
elif provider == "ses":
is_configured = bool(config["aws_access_key_id"] and config["aws_secret_access_key"])
# Check if any DB overrides exist
has_db_overrides = any(v == "database" for v in config["_sources"].values())
return EmailStatusResponse(
provider=provider,
from_email=config["from_email"],
from_name=config["from_name"],
reply_to=config["reply_to"] or None,
smtp_host=config["smtp_host"] if provider == "smtp" else None,
smtp_port=config["smtp_port"] if provider == "smtp" else None,
smtp_user=config["smtp_user"] if provider == "smtp" else None,
mailgun_domain=config["mailgun_domain"] if provider == "mailgun" else None,
aws_region=config["aws_region"] if provider == "ses" else None,
debug=config["debug"],
enabled=config["enabled"],
is_configured=is_configured,
has_db_overrides=has_db_overrides,
)
@router.put("/email/settings")
def update_email_settings(
settings_update: EmailSettingsUpdate,
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""
Update platform email settings.
Settings are stored in the database and override .env values.
Only non-null values are updated.
"""
from models.schema.admin import AdminSettingCreate
updated_keys = []
# Map request fields to database keys
field_mappings = {
"provider": ("email_provider", "string"),
"from_email": ("email_from_address", "string"),
"from_name": ("email_from_name", "string"),
"reply_to": ("email_reply_to", "string"),
"smtp_host": ("smtp_host", "string"),
"smtp_port": ("smtp_port", "integer"),
"smtp_user": ("smtp_user", "string"),
"smtp_password": ("smtp_password", "string"),
"smtp_use_tls": ("smtp_use_tls", "boolean"),
"smtp_use_ssl": ("smtp_use_ssl", "boolean"),
"sendgrid_api_key": ("sendgrid_api_key", "string"),
"mailgun_api_key": ("mailgun_api_key", "string"),
"mailgun_domain": ("mailgun_domain", "string"),
"aws_access_key_id": ("aws_access_key_id", "string"),
"aws_secret_access_key": ("aws_secret_access_key", "string"),
"aws_region": ("aws_region", "string"),
"enabled": ("email_enabled", "boolean"),
"debug": ("email_debug", "boolean"),
}
# Sensitive fields that should be marked as encrypted
sensitive_keys = {
"smtp_password", "sendgrid_api_key", "mailgun_api_key",
"aws_access_key_id", "aws_secret_access_key"
}
for field, (db_key, value_type) in field_mappings.items():
value = getattr(settings_update, field, None)
if value is not None:
# Convert value to string for storage
if value_type == "boolean":
str_value = "true" if value else "false"
elif value_type == "integer":
str_value = str(value)
else:
str_value = str(value)
# Create or update setting
setting_data = AdminSettingCreate(
key=db_key,
value=str_value,
value_type=value_type,
category="email",
description=f"Email setting: {field}",
is_encrypted=db_key in sensitive_keys,
is_public=False,
)
admin_settings_service.upsert_setting(db, setting_data, current_admin.id)
updated_keys.append(field)
# Log action
admin_audit_service.log_action(
db=db,
admin_user_id=current_admin.id,
action="update_email_settings",
target_type="email_settings",
target_id="platform",
details={"updated_keys": updated_keys},
)
db.commit()
logger.info(f"Email settings updated by admin {current_admin.id}: {updated_keys}")
return {
"success": True,
"message": f"Updated {len(updated_keys)} email setting(s)",
"updated_keys": updated_keys,
}
@router.delete("/email/settings")
def reset_email_settings(
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""
Reset email settings to use .env values.
Deletes all email settings from the database, reverting to .env configuration.
"""
deleted_count = 0
for key in EMAIL_SETTING_KEYS:
setting = admin_settings_service.get_setting_by_key(db, key)
if setting:
# Use service method for deletion (API-002 compliance)
admin_settings_service.delete_setting(db, key, current_admin.id)
deleted_count += 1
# Log action
admin_audit_service.log_action(
db=db,
admin_user_id=current_admin.id,
action="reset_email_settings",
target_type="email_settings",
target_id="platform",
details={"deleted_count": deleted_count},
)
db.commit()
logger.info(f"Email settings reset by admin {current_admin.id}, deleted {deleted_count} settings")
return {
"success": True,
"message": f"Reset {deleted_count} email setting(s) to .env defaults",
}
@router.post("/email/test", response_model=TestEmailResponse)
def send_test_email(
request: TestEmailRequest,
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
) -> TestEmailResponse:
"""
Send a test email using the platform email configuration.
This tests the email provider configuration from environment variables.
"""
from app.services.email_service import EmailService
try:
email_service = EmailService(db)
# Send test email using platform configuration
success = email_service.send_raw(
to_email=request.to_email,
to_name=None,
subject="Wizamart Platform - Test Email",
body_html="""
<html>
<body style="font-family: Arial, sans-serif; padding: 20px;">
<h2 style="color: #6b46c1;">Test Email from Wizamart</h2>
<p>This is a test email to verify your platform email configuration.</p>
<p>If you received this email, your email settings are working correctly!</p>
<hr style="border: none; border-top: 1px solid #e5e7eb; margin: 20px 0;">
<p style="color: #6b7280; font-size: 12px;">
Provider: {provider}<br>
From: {from_email}
</p>
</body>
</html>
""".format(
provider=app_settings.email_provider,
from_email=app_settings.email_from_address,
),
body_text=f"Test email from Wizamart platform.\n\nProvider: {app_settings.email_provider}\nFrom: {app_settings.email_from_address}",
is_platform_email=True,
)
if success:
# Log action
admin_audit_service.log_action(
db=db,
admin_user_id=current_admin.id,
action="send_test_email",
target_type="email",
target_id=request.to_email,
details={"provider": app_settings.email_provider},
)
db.commit()
return TestEmailResponse(
success=True,
message=f"Test email sent to {request.to_email}",
)
else:
return TestEmailResponse(
success=False,
message="Failed to send test email. Check server logs for details.",
)
except Exception as e:
logger.error(f"Failed to send test email: {e}")
return TestEmailResponse(
success=False,
message=f"Error sending test email: {str(e)}",
)