# app/services/admin_settings_service.py """ Admin settings service for platform-wide configuration. This module provides functions for: - Managing platform settings - Getting/setting configuration values - Encrypting sensitive settings """ import logging import json from typing import Optional, List, Any, Dict from datetime import datetime, timezone from sqlalchemy.orm import Session from sqlalchemy import func from models.database.admin import AdminSetting from models.schema.admin import ( AdminSettingCreate, AdminSettingResponse, AdminSettingUpdate ) from app.exceptions import ( AdminOperationException, ValidationException, ResourceNotFoundException ) logger = logging.getLogger(__name__) class AdminSettingsService: """Service for managing platform-wide settings.""" def get_setting_by_key( self, db: Session, key: str ) -> Optional[AdminSetting]: """Get setting by key.""" try: return db.query(AdminSetting).filter( func.lower(AdminSetting.key) == key.lower() ).first() except Exception as e: logger.error(f"Failed to get setting {key}: {str(e)}") return None def get_setting_value( self, db: Session, key: str, default: Any = None ) -> Any: """ Get setting value with type conversion. Args: key: Setting key default: Default value if setting doesn't exist Returns: Typed setting value """ setting = self.get_setting_by_key(db, key) if not setting: return default # Convert value based on type try: if setting.value_type == "integer": return int(setting.value) elif setting.value_type == "float": return float(setting.value) elif setting.value_type == "boolean": return setting.value.lower() in ('true', '1', 'yes') elif setting.value_type == "json": return json.loads(setting.value) else: return setting.value except Exception as e: logger.error(f"Failed to convert setting {key} value: {str(e)}") return default def get_all_settings( self, db: Session, category: Optional[str] = None, is_public: Optional[bool] = None ) -> List[AdminSettingResponse]: """Get all settings with optional filtering.""" try: query = db.query(AdminSetting) if category: query = query.filter(AdminSetting.category == category) if is_public is not None: query = query.filter(AdminSetting.is_public == is_public) settings = query.order_by(AdminSetting.category, AdminSetting.key).all() return [ AdminSettingResponse.model_validate(setting) for setting in settings ] except Exception as e: logger.error(f"Failed to get settings: {str(e)}") raise AdminOperationException( operation="get_all_settings", reason="Database query failed" ) def get_settings_by_category( self, db: Session, category: str ) -> Dict[str, Any]: """ Get all settings in a category as a dictionary. Returns: Dictionary of key-value pairs """ settings = self.get_all_settings(db, category=category) result = {} for setting in settings: # Convert value based on type if setting.value_type == "integer": result[setting.key] = int(setting.value) elif setting.value_type == "float": result[setting.key] = float(setting.value) elif setting.value_type == "boolean": result[setting.key] = setting.value.lower() in ('true', '1', 'yes') elif setting.value_type == "json": result[setting.key] = json.loads(setting.value) else: result[setting.key] = setting.value return result def create_setting( self, db: Session, setting_data: AdminSettingCreate, admin_user_id: int ) -> AdminSettingResponse: """Create new setting.""" try: # Check if setting already exists existing = self.get_setting_by_key(db, setting_data.key) if existing: raise ValidationException( f"Setting with key '{setting_data.key}' already exists" ) # Validate value based on type self._validate_setting_value(setting_data.value, setting_data.value_type) # TODO: Encrypt value if is_encrypted=True value_to_store = setting_data.value if setting_data.is_encrypted: # value_to_store = self._encrypt_value(setting_data.value) pass setting = AdminSetting( key=setting_data.key.lower(), value=value_to_store, value_type=setting_data.value_type, category=setting_data.category, description=setting_data.description, is_encrypted=setting_data.is_encrypted, is_public=setting_data.is_public, last_modified_by_user_id=admin_user_id ) db.add(setting) db.commit() db.refresh(setting) logger.info(f"Setting '{setting.key}' created by admin {admin_user_id}") return AdminSettingResponse.model_validate(setting) except ValidationException: db.rollback() raise except Exception as e: db.rollback() logger.error(f"Failed to create setting: {str(e)}") raise AdminOperationException( operation="create_setting", reason="Database operation failed" ) def update_setting( self, db: Session, key: str, update_data: AdminSettingUpdate, admin_user_id: int ) -> AdminSettingResponse: """Update existing setting.""" setting = self.get_setting_by_key(db, key) if not setting: raise ResourceNotFoundException( resource_type="setting", identifier=key ) try: # Validate new value self._validate_setting_value(update_data.value, setting.value_type) # TODO: Encrypt value if needed value_to_store = update_data.value if setting.is_encrypted: # value_to_store = self._encrypt_value(update_data.value) pass setting.value = value_to_store if update_data.description is not None: setting.description = update_data.description setting.last_modified_by_user_id = admin_user_id setting.updated_at = datetime.now(timezone.utc) db.commit() db.refresh(setting) logger.info(f"Setting '{setting.key}' updated by admin {admin_user_id}") return AdminSettingResponse.model_validate(setting) except ValidationException: db.rollback() raise except Exception as e: db.rollback() logger.error(f"Failed to update setting {key}: {str(e)}") raise AdminOperationException( operation="update_setting", reason="Database operation failed" ) def upsert_setting( self, db: Session, setting_data: AdminSettingCreate, admin_user_id: int ) -> AdminSettingResponse: """Create or update setting (upsert).""" existing = self.get_setting_by_key(db, setting_data.key) if existing: update_data = AdminSettingUpdate( value=setting_data.value, description=setting_data.description ) return self.update_setting(db, setting_data.key, update_data, admin_user_id) else: return self.create_setting(db, setting_data, admin_user_id) def delete_setting( self, db: Session, key: str, admin_user_id: int ) -> str: """Delete setting.""" setting = self.get_setting_by_key(db, key) if not setting: raise ResourceNotFoundException( resource_type="setting", identifier=key ) try: db.delete(setting) db.commit() logger.warning(f"Setting '{key}' deleted by admin {admin_user_id}") return f"Setting '{key}' successfully deleted" except Exception as e: db.rollback() logger.error(f"Failed to delete setting {key}: {str(e)}") raise AdminOperationException( operation="delete_setting", reason="Database operation failed" ) # ============================================================================ # HELPER METHODS # ============================================================================ def _validate_setting_value(self, value: str, value_type: str): """Validate setting value matches declared type.""" try: if value_type == "integer": int(value) elif value_type == "float": float(value) elif value_type == "boolean": if value.lower() not in ('true', 'false', '1', '0', 'yes', 'no'): raise ValueError("Invalid boolean value") elif value_type == "json": json.loads(value) except Exception as e: raise ValidationException( f"Value '{value}' is not valid for type '{value_type}': {str(e)}" ) def _encrypt_value(self, value: str) -> str: """Encrypt sensitive setting value.""" # TODO: Implement encryption using Fernet or similar # from cryptography.fernet import Fernet # return encrypted_value return value def _decrypt_value(self, encrypted_value: str) -> str: """Decrypt sensitive setting value.""" # TODO: Implement decryption return encrypted_value # Create service instance admin_settings_service = AdminSettingsService()