# Source listing: orion/app/services/admin_settings_service.py
# (original file: 336 lines, 10 KiB, Python)
# app/services/admin_settings_service.py
"""
Admin settings service for platform-wide configuration.
This module provides functions for:
- Managing platform settings
- Getting/setting configuration values
- Encrypting sensitive settings (placeholder only: the encrypt/decrypt
  helpers currently return their input unchanged)
"""
import logging
import json
from typing import Optional, List, Any, Dict
from datetime import datetime, timezone
from sqlalchemy.orm import Session
from sqlalchemy import func
from models.database.admin import AdminSetting
from models.schema.admin import (
AdminSettingCreate,
AdminSettingResponse,
AdminSettingUpdate
)
from app.exceptions import (
AdminOperationException,
ValidationException,
ResourceNotFoundException
)
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class AdminSettingsService:
    """Service for managing platform-wide settings.

    Settings are stored as strings together with a ``value_type`` tag
    ("integer", "float", "boolean", "json"; anything else is treated as a
    plain string). Read helpers convert the stored string back to the
    tagged Python type; write helpers validate the string against the tag
    before persisting it.
    """

    # Lower-cased spellings recognised for boolean settings. Validation
    # accepts both sets; conversion treats anything outside _TRUE_VALUES
    # as False.
    _TRUE_VALUES = frozenset({"true", "1", "yes"})
    _FALSE_VALUES = frozenset({"false", "0", "no"})
    _BOOLEAN_VALUES = _TRUE_VALUES | _FALSE_VALUES

    def get_setting_by_key(
        self,
        db: Session,
        key: str
    ) -> Optional[AdminSetting]:
        """Get setting by key (case-insensitive match).

        Args:
            db: Database session used for the query.
            key: Setting key; compared lower-cased against the stored key.

        Returns:
            The matching AdminSetting, or None when there is no match.
            None is also returned (after logging) when the lookup itself
            raises, so callers cannot tell "missing" from "lookup failed".
        """
        try:
            return db.query(AdminSetting).filter(
                func.lower(AdminSetting.key) == key.lower()
            ).first()
        except Exception as e:
            logger.error("Failed to get setting %s: %s", key, e)
            return None

    def get_setting_value(
        self,
        db: Session,
        key: str,
        default: Any = None
    ) -> Any:
        """
        Get setting value with type conversion.

        Args:
            db: Database session used for the lookup.
            key: Setting key
            default: Default value if setting doesn't exist

        Returns:
            Typed setting value, or ``default`` when the setting is missing
            or its stored value cannot be converted to the declared type.
        """
        setting = self.get_setting_by_key(db, key)
        if not setting:
            return default

        try:
            raw_value = setting.value
            if setting.is_encrypted:
                # Currently a pass-through; see _decrypt_value.
                raw_value = self._decrypt_value(raw_value)
            return self._convert_value(raw_value, setting.value_type)
        except Exception as e:
            logger.error("Failed to convert setting %s value: %s", key, e)
            return default

    def get_all_settings(
        self,
        db: Session,
        category: Optional[str] = None,
        is_public: Optional[bool] = None
    ) -> List[AdminSettingResponse]:
        """Get all settings with optional filtering.

        Args:
            db: Database session used for the query.
            category: When truthy, only settings in this category.
            is_public: When not None, only settings whose ``is_public``
                flag equals this value.

        Returns:
            Response models ordered by category, then key.

        Raises:
            AdminOperationException: If the query or model validation fails.
        """
        try:
            query = db.query(AdminSetting)
            if category:
                query = query.filter(AdminSetting.category == category)
            if is_public is not None:
                query = query.filter(AdminSetting.is_public == is_public)

            settings = query.order_by(
                AdminSetting.category, AdminSetting.key
            ).all()
            return [
                AdminSettingResponse.model_validate(setting)
                for setting in settings
            ]
        except Exception as e:
            logger.error("Failed to get settings: %s", e)
            raise AdminOperationException(
                operation="get_all_settings",
                reason="Database query failed"
            ) from e

    def get_settings_by_category(
        self,
        db: Session,
        category: str
    ) -> Dict[str, Any]:
        """
        Get all settings in a category as a dictionary.

        A setting whose stored value cannot be converted to its declared
        type is logged and left out of the result instead of aborting the
        whole call, so ``result.get(key, default)`` behaves like
        ``get_setting_value(db, key, default)``.

        Args:
            db: Database session used for the query.
            category: Category to load.

        Returns:
            Dictionary of key-value pairs with typed values.

        Raises:
            AdminOperationException: If loading the settings fails.
        """
        result: Dict[str, Any] = {}
        for setting in self.get_all_settings(db, category=category):
            try:
                raw_value = setting.value
                # NOTE(review): the response schema is not visible here, so
                # a missing ``is_encrypted`` attribute is treated as "not
                # encrypted" - confirm against AdminSettingResponse.
                if getattr(setting, "is_encrypted", False):
                    raw_value = self._decrypt_value(raw_value)
                result[setting.key] = self._convert_value(
                    raw_value, setting.value_type
                )
            except Exception as e:
                logger.error(
                    "Failed to convert setting %s value: %s", setting.key, e
                )
        return result

    def create_setting(
        self,
        db: Session,
        setting_data: AdminSettingCreate,
        admin_user_id: int
    ) -> AdminSettingResponse:
        """Create new setting.

        The key is stored lower-cased. The value is validated against
        ``setting_data.value_type`` before it is persisted.

        Args:
            db: Database session; committed on success, rolled back on error.
            setting_data: Payload describing the new setting.
            admin_user_id: Id recorded as the last modifier.

        Returns:
            Response model of the stored setting.

        Raises:
            ValidationException: If the key already exists or the value does
                not match the declared type.
            AdminOperationException: If anything else fails.
        """
        try:
            if self.get_setting_by_key(db, setting_data.key):
                raise ValidationException(
                    f"Setting with key '{setting_data.key}' already exists"
                )

            self._validate_setting_value(
                setting_data.value, setting_data.value_type
            )

            # Validation runs on the plain value; only the stored form is
            # passed through the (currently pass-through) encryption hook.
            value_to_store = setting_data.value
            if setting_data.is_encrypted:
                value_to_store = self._encrypt_value(setting_data.value)

            setting = AdminSetting(
                key=setting_data.key.lower(),
                value=value_to_store,
                value_type=setting_data.value_type,
                category=setting_data.category,
                description=setting_data.description,
                is_encrypted=setting_data.is_encrypted,
                is_public=setting_data.is_public,
                last_modified_by_user_id=admin_user_id
            )
            db.add(setting)
            db.commit()
            db.refresh(setting)

            logger.info(
                "Setting '%s' created by admin %s", setting.key, admin_user_id
            )
            return AdminSettingResponse.model_validate(setting)
        except ValidationException:
            db.rollback()
            raise
        except Exception as e:
            db.rollback()
            logger.error("Failed to create setting: %s", e)
            raise AdminOperationException(
                operation="create_setting",
                reason="Database operation failed"
            ) from e

    def update_setting(
        self,
        db: Session,
        key: str,
        update_data: AdminSettingUpdate,
        admin_user_id: int
    ) -> AdminSettingResponse:
        """Update existing setting.

        Only the value and (when provided) the description are changed; the
        stored ``value_type`` is kept and the new value is validated
        against it.

        Args:
            db: Database session; committed on success, rolled back on error.
            key: Key of the setting to update (case-insensitive).
            update_data: New value and optional description.
            admin_user_id: Id recorded as the last modifier.

        Returns:
            Response model of the updated setting.

        Raises:
            ResourceNotFoundException: If no setting matches ``key``.
            ValidationException: If the new value does not match the
                setting's type.
            AdminOperationException: If anything else fails.
        """
        setting = self.get_setting_by_key(db, key)
        if not setting:
            raise ResourceNotFoundException(
                resource_type="setting",
                identifier=key
            )

        try:
            self._validate_setting_value(update_data.value, setting.value_type)

            value_to_store = update_data.value
            if setting.is_encrypted:
                # Currently a pass-through; see _encrypt_value.
                value_to_store = self._encrypt_value(update_data.value)

            setting.value = value_to_store
            if update_data.description is not None:
                setting.description = update_data.description
            setting.last_modified_by_user_id = admin_user_id
            setting.updated_at = datetime.now(timezone.utc)

            db.commit()
            db.refresh(setting)

            logger.info(
                "Setting '%s' updated by admin %s", setting.key, admin_user_id
            )
            return AdminSettingResponse.model_validate(setting)
        except ValidationException:
            db.rollback()
            raise
        except Exception as e:
            db.rollback()
            logger.error("Failed to update setting %s: %s", key, e)
            raise AdminOperationException(
                operation="update_setting",
                reason="Database operation failed"
            ) from e

    def upsert_setting(
        self,
        db: Session,
        setting_data: AdminSettingCreate,
        admin_user_id: int
    ) -> AdminSettingResponse:
        """Create or update setting (upsert).

        When the key already exists only its value and description are
        updated; the other fields of ``setting_data`` are ignored.

        Args:
            db: Database session.
            setting_data: Payload describing the setting.
            admin_user_id: Id recorded as the last modifier.

        Returns:
            Response model of the created or updated setting.
        """
        if self.get_setting_by_key(db, setting_data.key):
            update_data = AdminSettingUpdate(
                value=setting_data.value,
                description=setting_data.description
            )
            return self.update_setting(
                db, setting_data.key, update_data, admin_user_id
            )
        return self.create_setting(db, setting_data, admin_user_id)

    def delete_setting(
        self,
        db: Session,
        key: str,
        admin_user_id: int
    ) -> str:
        """Delete setting.

        Args:
            db: Database session; committed on success, rolled back on error.
            key: Key of the setting to delete (case-insensitive).
            admin_user_id: Id of the admin performing the deletion (logged).

        Returns:
            Confirmation message.

        Raises:
            ResourceNotFoundException: If no setting matches ``key``.
            AdminOperationException: If the delete or commit fails.
        """
        setting = self.get_setting_by_key(db, key)
        if not setting:
            raise ResourceNotFoundException(
                resource_type="setting",
                identifier=key
            )

        try:
            db.delete(setting)
            db.commit()
            logger.warning(
                "Setting '%s' deleted by admin %s", key, admin_user_id
            )
            return f"Setting '{key}' successfully deleted"
        except Exception as e:
            db.rollback()
            logger.error("Failed to delete setting %s: %s", key, e)
            raise AdminOperationException(
                operation="delete_setting",
                reason="Database operation failed"
            ) from e

    # ============================================================================
    # HELPER METHODS
    # ============================================================================

    @classmethod
    def _convert_value(cls, value: str, value_type: str) -> Any:
        """Convert a stored string to the Python type named by ``value_type``.

        Unknown types are returned unchanged. Conversion errors propagate
        to the caller, which decides how to fall back.
        """
        if value_type == "integer":
            return int(value)
        if value_type == "float":
            return float(value)
        if value_type == "boolean":
            return value.lower() in cls._TRUE_VALUES
        if value_type == "json":
            return json.loads(value)
        return value

    def _validate_setting_value(self, value: str, value_type: str) -> None:
        """Validate setting value matches declared type.

        Types other than integer, float, boolean and json are not checked.

        Raises:
            ValidationException: If ``value`` cannot be parsed as
                ``value_type``.
        """
        try:
            if value_type == "integer":
                int(value)
            elif value_type == "float":
                float(value)
            elif value_type == "boolean":
                if value.lower() not in self._BOOLEAN_VALUES:
                    raise ValueError("Invalid boolean value")
            elif value_type == "json":
                json.loads(value)
        except Exception as e:
            raise ValidationException(
                f"Value '{value}' is not valid for type '{value_type}': {str(e)}"
            ) from e

    def _encrypt_value(self, value: str) -> str:
        """Encrypt sensitive setting value.

        Placeholder: returns ``value`` unchanged until encryption is
        implemented.
        """
        # TODO: Implement encryption using Fernet or similar
        return value

    def _decrypt_value(self, encrypted_value: str) -> str:
        """Decrypt sensitive setting value.

        Placeholder: returns ``encrypted_value`` unchanged until decryption
        is implemented.
        """
        # TODO: Implement decryption
        return encrypted_value
# Shared module-level service instance, created when this module is imported.
# AdminSettingsService defines no __init__, so it is constructed without arguments.
admin_settings_service = AdminSettingsService()