# app/services/admin_service.py
"""
Admin service for managing users, vendors, and import jobs.

This module provides classes and functions for:
- User management and status control
- Vendor creation with owner user generation
- Vendor verification and activation
- Marketplace import job monitoring
- Platform statistics
"""

import logging
import secrets
import string
from datetime import datetime, timezone
from typing import List, Optional, Tuple

from sqlalchemy.orm import Session
from sqlalchemy import func, or_

from app.exceptions import (
    UserNotFoundException,
    UserStatusChangeException,
    CannotModifySelfException,
    VendorNotFoundException,
    VendorAlreadyExistsException,
    VendorVerificationException,
    AdminOperationException,
    ValidationException,
)
from models.schemas.marketplace_import_job import MarketplaceImportJobResponse
from models.schemas.vendor import VendorCreate
from models.database.marketplace_import_job import MarketplaceImportJob
from models.database.vendor import Vendor, Role
from models.database.user import User

logger = logging.getLogger(__name__)


class AdminService:
    """Service class for admin operations following the application's service pattern."""

    # ============================================================================
    # USER MANAGEMENT
    # ============================================================================

    def get_all_users(self, db: Session, skip: int = 0, limit: int = 100) -> List[User]:
        """Get paginated list of all users.

        Args:
            db: Database session used for the query.
            skip: Number of rows to skip (OFFSET).
            limit: Maximum number of rows to return (LIMIT).

        Returns:
            The requested page of users, ordered by ``User.id``.

        Raises:
            AdminOperationException: If the query fails for any reason.
        """
        try:
            # OFFSET/LIMIT without ORDER BY gives no guarantee about which rows
            # land on which page; ordering by primary key makes paging stable.
            return db.query(User).order_by(User.id).offset(skip).limit(limit).all()
        except Exception as e:
            logger.exception("Failed to retrieve users: %s", e)
            raise AdminOperationException(
                operation="get_all_users",
                reason="Database query failed",
            ) from e

    def toggle_user_status(
        self, db: Session, user_id: int, current_admin_id: int
    ) -> Tuple[User, str]:
        """Toggle user active status.

        Args:
            db: Database session used for lookup and update.
            user_id: ID of the user whose ``is_active`` flag is flipped.
            current_admin_id: ID of the admin performing the operation.

        Returns:
            ``(user, message)`` where ``user`` is the refreshed user row and
            ``message`` describes the new status.

        Raises:
            UserNotFoundException: If no user has the given ID.
            CannotModifySelfException: If the admin targets their own account.
            UserStatusChangeException: If the target is an admin, or if the
                database update fails.
        """
        user = self._get_user_by_id_or_raise(db, user_id)

        # Prevent self-modification
        if user.id == current_admin_id:
            raise CannotModifySelfException(user_id, "deactivate account")

        # The self-modification case was rejected above, so an admin reaching
        # this point is necessarily a different admin.
        if user.role == "admin":
            raise UserStatusChangeException(
                user_id=user_id,
                current_status="admin",
                attempted_action="toggle status",
                reason="Cannot modify another admin user",
            )

        # Captured before the try block so the error handler can always report it.
        original_status = user.is_active

        try:
            user.is_active = not original_status
            user.updated_at = datetime.now(timezone.utc)
            db.commit()
            db.refresh(user)

            status_action = "activated" if user.is_active else "deactivated"
            message = f"User {user.username} has been {status_action}"
            logger.info("%s by admin %s", message, current_admin_id)
            return user, message
        except Exception as e:
            db.rollback()
            logger.exception("Failed to toggle user %s status: %s", user_id, e)
            raise UserStatusChangeException(
                user_id=user_id,
                current_status="active" if original_status else "inactive",
                attempted_action="toggle status",
                reason="Database update failed",
            ) from e

    # ============================================================================
    # VENDOR MANAGEMENT
    # ============================================================================

    def create_vendor_with_owner(
        self, db: Session, vendor_data: VendorCreate
    ) -> Tuple[Vendor, User, str]:
        """
        Create vendor with owner user account.

        The vendor code is stored upper-cased and the subdomain lower-cased;
        both are checked case-insensitively for uniqueness first. If a user
        with the owner email already exists it is reused as the owner,
        otherwise a new active user with role ``"user"`` is created. Four
        default roles are created for the vendor before the single commit.

        Args:
            db: Database session used for all reads and writes.
            vendor_data: Vendor creation payload. ``vendor_code``, ``subdomain``
                and ``name`` are read directly; the remaining fields are read
                with ``getattr`` and default to ``None`` (``{}`` for
                ``theme_config``).

        Returns:
            (vendor, owner_user, temporary_password)

        Raises:
            VendorAlreadyExistsException: If the vendor code is already in use.
            ValidationException: If the subdomain is already in use.
            AdminOperationException: On any other failure; the session is
                rolled back first.
        """
        try:
            # Check if vendor code already exists
            vendor_code = vendor_data.vendor_code.upper()
            existing_vendor = (
                db.query(Vendor)
                .filter(func.upper(Vendor.vendor_code) == vendor_code)
                .first()
            )
            if existing_vendor:
                raise VendorAlreadyExistsException(vendor_data.vendor_code)

            # Check if subdomain already exists
            subdomain = vendor_data.subdomain.lower()
            existing_subdomain = (
                db.query(Vendor)
                .filter(func.lower(Vendor.subdomain) == subdomain)
                .first()
            )
            if existing_subdomain:
                raise ValidationException(
                    f"Subdomain '{vendor_data.subdomain}' is already taken"
                )

            # Generate temporary password for owner
            temp_password = self._generate_temp_password()

            # Imported here rather than at module level, as in the original
            # (presumably to avoid a circular import - TODO confirm).
            from middleware.auth import AuthManager

            auth_manager = AuthManager()

            owner_username = f"{vendor_data.vendor_code.lower()}_owner"
            # Fall back to a generated address when the payload has no
            # owner_email attribute *or* the attribute is unset/empty.
            owner_email = (
                getattr(vendor_data, "owner_email", None)
                or f"{owner_username}@{vendor_data.subdomain}.com"
            )

            # Check if user with this email already exists
            existing_user = db.query(User).filter(User.email == owner_email).first()

            if existing_user:
                # Use existing user as owner.
                # NOTE(review): the temporary password returned below is NOT
                # set on an existing user, so it will not work for them -
                # confirm how callers present it.
                owner_user = existing_user
            else:
                # Create new owner user
                owner_user = User(
                    email=owner_email,
                    username=owner_username,
                    hashed_password=auth_manager.hash_password(temp_password),
                    role="user",  # Will be vendor owner through relationship
                    is_active=True,
                )
                db.add(owner_user)
                db.flush()  # Get owner_user.id

            # Create vendor
            vendor = Vendor(
                vendor_code=vendor_code,
                subdomain=subdomain,
                name=vendor_data.name,
                description=getattr(vendor_data, "description", None),
                owner_user_id=owner_user.id,
                contact_email=owner_email,
                contact_phone=getattr(vendor_data, "contact_phone", None),
                website=getattr(vendor_data, "website", None),
                business_address=getattr(vendor_data, "business_address", None),
                tax_number=getattr(vendor_data, "tax_number", None),
                letzshop_csv_url_fr=getattr(vendor_data, "letzshop_csv_url_fr", None),
                letzshop_csv_url_en=getattr(vendor_data, "letzshop_csv_url_en", None),
                letzshop_csv_url_de=getattr(vendor_data, "letzshop_csv_url_de", None),
                theme_config=getattr(vendor_data, "theme_config", {}),
                is_active=True,
                is_verified=True,
            )
            db.add(vendor)
            db.flush()  # Get vendor.id

            # Create default roles for vendor
            self._create_default_roles(db, vendor.id)

            db.commit()
            db.refresh(vendor)
            db.refresh(owner_user)

            logger.info(
                "Vendor %s created with owner %s",
                vendor.vendor_code,
                owner_user.username,
            )

            # TODO: Send welcome email to owner with credentials
            # self._send_vendor_welcome_email(owner_user, vendor, temp_password)

            return vendor, owner_user, temp_password

        except (VendorAlreadyExistsException, ValidationException):
            # Domain errors pass through unchanged after undoing pending work.
            db.rollback()
            raise
        except Exception as e:
            db.rollback()
            logger.exception("Failed to create vendor: %s", e)
            raise AdminOperationException(
                operation="create_vendor_with_owner",
                reason=f"Failed to create vendor: {str(e)}",
            ) from e

    def get_all_vendors(
        self,
        db: Session,
        skip: int = 0,
        limit: int = 100,
        search: Optional[str] = None,
        is_active: Optional[bool] = None,
        is_verified: Optional[bool] = None,
    ) -> Tuple[List[Vendor], int]:
        """Get paginated list of all vendors with filtering.

        Args:
            db: Database session used for the query.
            skip: Number of rows to skip (OFFSET).
            limit: Maximum number of rows to return (LIMIT).
            search: Optional substring matched case-insensitively against
                vendor name, vendor code and subdomain.
            is_active: When not ``None``, restrict to vendors with this
                ``is_active`` value.
            is_verified: When not ``None``, restrict to vendors with this
                ``is_verified`` value.

        Returns:
            ``(vendors, total)`` where ``total`` is the number of vendors
            matching the filters before pagination.

        Raises:
            AdminOperationException: If the query fails for any reason.
        """
        try:
            query = db.query(Vendor)

            # Apply search filter
            if search:
                search_term = f"%{search}%"
                query = query.filter(
                    or_(
                        Vendor.name.ilike(search_term),
                        Vendor.vendor_code.ilike(search_term),
                        Vendor.subdomain.ilike(search_term),
                    )
                )

            # Apply status filters
            if is_active is not None:
                query = query.filter(Vendor.is_active == is_active)
            if is_verified is not None:
                query = query.filter(Vendor.is_verified == is_verified)

            total = query.count()
            # Stable ordering so consecutive pages neither overlap nor skip rows.
            vendors = query.order_by(Vendor.id).offset(skip).limit(limit).all()

            return vendors, total
        except Exception as e:
            logger.exception("Failed to retrieve vendors: %s", e)
            raise AdminOperationException(
                operation="get_all_vendors",
                reason="Database query failed",
            ) from e

    def get_vendor_by_id(self, db: Session, vendor_id: int) -> Vendor:
        """Get vendor by ID.

        Raises:
            VendorNotFoundException: If no vendor has the given ID.
        """
        return self._get_vendor_by_id_or_raise(db, vendor_id)

    def verify_vendor(self, db: Session, vendor_id: int) -> Tuple[Vendor, str]:
        """Toggle vendor verification status.

        ``verified_at`` is only updated when the vendor becomes verified.

        Args:
            db: Database session used for lookup and update.
            vendor_id: ID of the vendor to update.

        Returns:
            ``(vendor, message)`` with the refreshed vendor and a description
            of the new status.

        Raises:
            VendorNotFoundException: If no vendor has the given ID.
            VendorVerificationException: If the database update fails.
        """
        vendor = self._get_vendor_by_id_or_raise(db, vendor_id)

        # Captured before the try block so the error handler can always report it.
        original_status = vendor.is_verified

        try:
            # One timestamp so updated_at and verified_at agree exactly.
            now = datetime.now(timezone.utc)
            vendor.is_verified = not original_status
            vendor.updated_at = now

            if vendor.is_verified:
                vendor.verified_at = now

            db.commit()
            db.refresh(vendor)

            status_action = "verified" if vendor.is_verified else "unverified"
            message = f"Vendor {vendor.vendor_code} has been {status_action}"
            logger.info(message)
            return vendor, message
        except Exception as e:
            db.rollback()
            logger.exception("Failed to verify vendor %s: %s", vendor_id, e)
            raise VendorVerificationException(
                vendor_id=vendor_id,
                reason="Database update failed",
                current_verification_status=original_status,
            ) from e

    def toggle_vendor_status(self, db: Session, vendor_id: int) -> Tuple[Vendor, str]:
        """Toggle vendor active status.

        Args:
            db: Database session used for lookup and update.
            vendor_id: ID of the vendor to update.

        Returns:
            ``(vendor, message)`` with the refreshed vendor and a description
            of the new status.

        Raises:
            VendorNotFoundException: If no vendor has the given ID.
            AdminOperationException: If the database update fails.
        """
        vendor = self._get_vendor_by_id_or_raise(db, vendor_id)

        try:
            vendor.is_active = not vendor.is_active
            vendor.updated_at = datetime.now(timezone.utc)
            db.commit()
            db.refresh(vendor)

            status_action = "activated" if vendor.is_active else "deactivated"
            message = f"Vendor {vendor.vendor_code} has been {status_action}"
            logger.info(message)
            return vendor, message
        except Exception as e:
            db.rollback()
            logger.exception("Failed to toggle vendor %s status: %s", vendor_id, e)
            raise AdminOperationException(
                operation="toggle_vendor_status",
                reason="Database update failed",
                target_type="vendor",
                target_id=str(vendor_id),
            ) from e

    def delete_vendor(self, db: Session, vendor_id: int) -> str:
        """Delete a vendor row.

        Only the vendor itself is deleted explicitly here; removal of related
        data is still a TODO (see below) and otherwise depends on whatever
        cascade rules the models define.

        Args:
            db: Database session used for lookup and delete.
            vendor_id: ID of the vendor to delete.

        Returns:
            A confirmation message containing the vendor code.

        Raises:
            VendorNotFoundException: If no vendor has the given ID.
            AdminOperationException: If the deletion fails.
        """
        vendor = self._get_vendor_by_id_or_raise(db, vendor_id)

        try:
            # Read before deletion; the instance is unusable afterwards.
            vendor_code = vendor.vendor_code

            # TODO: Delete associated data in correct order
            # - Delete orders
            # - Delete customers
            # - Delete products
            # - Delete team members
            # - Delete roles
            # - Delete import jobs

            db.delete(vendor)
            db.commit()

            logger.warning(f"Vendor {vendor_code} and all associated data deleted")
            return f"Vendor {vendor_code} successfully deleted"
        except Exception as e:
            db.rollback()
            logger.exception("Failed to delete vendor %s: %s", vendor_id, e)
            raise AdminOperationException(
                operation="delete_vendor",
                reason="Database deletion failed",
            ) from e

    # ============================================================================
    # MARKETPLACE IMPORT JOBS
    # ============================================================================

    def get_marketplace_import_jobs(
        self,
        db: Session,
        marketplace: Optional[str] = None,
        vendor_name: Optional[str] = None,
        status: Optional[str] = None,
        skip: int = 0,
        limit: int = 100,
    ) -> List[MarketplaceImportJobResponse]:
        """Get filtered and paginated marketplace import jobs.

        Args:
            db: Database session used for the query.
            marketplace: Optional case-insensitive substring filter on the
                job's marketplace.
            vendor_name: Optional case-insensitive substring filter on the
                job's vendor name.
            status: Optional exact-match filter on the job status.
            skip: Number of rows to skip (OFFSET).
            limit: Maximum number of rows to return (LIMIT).

        Returns:
            Matching jobs, newest first, converted to response schemas.

        Raises:
            AdminOperationException: If the query or conversion fails.
        """
        try:
            query = db.query(MarketplaceImportJob)

            if marketplace:
                query = query.filter(
                    MarketplaceImportJob.marketplace.ilike(f"%{marketplace}%")
                )
            if vendor_name:
                query = query.filter(
                    MarketplaceImportJob.vendor_name.ilike(f"%{vendor_name}%")
                )
            if status:
                query = query.filter(MarketplaceImportJob.status == status)

            jobs = (
                query.order_by(MarketplaceImportJob.created_at.desc())
                .offset(skip)
                .limit(limit)
                .all()
            )

            return [self._convert_job_to_response(job) for job in jobs]
        except Exception as e:
            logger.exception("Failed to retrieve marketplace import jobs: %s", e)
            raise AdminOperationException(
                operation="get_marketplace_import_jobs",
                reason="Database query failed",
            ) from e

    # ============================================================================
    # STATISTICS
    # ============================================================================

    def get_user_statistics(self, db: Session) -> dict:
        """Get user statistics for admin dashboard.

        Returns:
            Dict with ``total_users``, ``active_users``, ``inactive_users``,
            ``admin_users`` and ``activation_rate`` (a percentage, ``0`` when
            there are no users).

        Raises:
            AdminOperationException: If any of the queries fails.
        """
        try:
            total_users = db.query(User).count()
            # `== True` is intentional: it builds a SQL expression, not a Python bool.
            active_users = (
                db.query(User).filter(User.is_active == True).count()  # noqa: E712
            )
            inactive_users = total_users - active_users
            admin_users = db.query(User).filter(User.role == "admin").count()

            return {
                "total_users": total_users,
                "active_users": active_users,
                "inactive_users": inactive_users,
                "admin_users": admin_users,
                "activation_rate": (
                    (active_users / total_users * 100) if total_users > 0 else 0
                ),
            }
        except Exception as e:
            logger.exception("Failed to get user statistics: %s", e)
            raise AdminOperationException(
                operation="get_user_statistics",
                reason="Database query failed",
            ) from e

    def get_vendor_statistics(self, db: Session) -> dict:
        """Get vendor statistics for admin dashboard.

        Returns:
            Dict with ``total_vendors``, ``active_vendors``,
            ``inactive_vendors``, ``verified_vendors`` and
            ``verification_rate`` (a percentage, ``0`` when there are no
            vendors).

        Raises:
            AdminOperationException: If any of the queries fails.
        """
        try:
            total_vendors = db.query(Vendor).count()
            # `== True` is intentional: it builds a SQL expression, not a Python bool.
            active_vendors = (
                db.query(Vendor).filter(Vendor.is_active == True).count()  # noqa: E712
            )
            verified_vendors = (
                db.query(Vendor).filter(Vendor.is_verified == True).count()  # noqa: E712
            )
            inactive_vendors = total_vendors - active_vendors

            return {
                "total_vendors": total_vendors,
                "active_vendors": active_vendors,
                "inactive_vendors": inactive_vendors,
                "verified_vendors": verified_vendors,
                "verification_rate": (
                    (verified_vendors / total_vendors * 100) if total_vendors > 0 else 0
                ),
            }
        except Exception as e:
            logger.exception("Failed to get vendor statistics: %s", e)
            raise AdminOperationException(
                operation="get_vendor_statistics",
                reason="Database query failed",
            ) from e

    def get_recent_vendors(self, db: Session, limit: int = 5) -> List[dict]:
        """Get recently created vendors.

        Best-effort: failures are logged and an empty list is returned instead
        of raising.

        Args:
            db: Database session used for the query.
            limit: Maximum number of vendors to return.

        Returns:
            Newest-first list of dicts with ``id``, ``vendor_code``, ``name``,
            ``subdomain``, ``is_active``, ``is_verified`` and ``created_at``.
        """
        try:
            vendors = (
                db.query(Vendor)
                .order_by(Vendor.created_at.desc())
                .limit(limit)
                .all()
            )

            return [
                {
                    "id": v.id,
                    "vendor_code": v.vendor_code,
                    "name": v.name,
                    "subdomain": v.subdomain,
                    "is_active": v.is_active,
                    "is_verified": v.is_verified,
                    "created_at": v.created_at,
                }
                for v in vendors
            ]
        except Exception as e:
            logger.exception("Failed to get recent vendors: %s", e)
            return []

    def get_recent_import_jobs(self, db: Session, limit: int = 10) -> List[dict]:
        """Get recent marketplace import jobs.

        Best-effort: failures are logged and an empty list is returned instead
        of raising.

        Args:
            db: Database session used for the query.
            limit: Maximum number of jobs to return.

        Returns:
            Newest-first list of dicts with ``id``, ``marketplace``,
            ``vendor_name``, ``status``, ``total_processed`` (``0`` when
            unset) and ``created_at``.
        """
        try:
            jobs = (
                db.query(MarketplaceImportJob)
                .order_by(MarketplaceImportJob.created_at.desc())
                .limit(limit)
                .all()
            )

            return [
                {
                    "id": j.id,
                    "marketplace": j.marketplace,
                    "vendor_name": j.vendor_name,
                    "status": j.status,
                    "total_processed": j.total_processed or 0,
                    "created_at": j.created_at,
                }
                for j in jobs
            ]
        except Exception as e:
            logger.exception("Failed to get recent import jobs: %s", e)
            return []

    def get_product_statistics(self, db: Session) -> dict:
        """Get product statistics (placeholder: currently returns zeros)."""
        # TODO: Implement when Product model is available
        return {
            "total_products": 0,
            "active_products": 0,
            "out_of_stock": 0,
        }

    def get_order_statistics(self, db: Session) -> dict:
        """Get order statistics (placeholder: currently returns zeros)."""
        # TODO: Implement when Order model is available
        return {
            "total_orders": 0,
            "pending_orders": 0,
            "completed_orders": 0,
        }

    def get_import_statistics(self, db: Session) -> dict:
        """Get import job statistics.

        Best-effort: failures are logged and an all-zero result is returned
        instead of raising.

        Returns:
            Dict with ``total_imports``, ``completed_imports``,
            ``failed_imports`` and ``success_rate`` (a percentage, ``0`` when
            there are no jobs).
        """
        try:
            total = db.query(MarketplaceImportJob).count()
            completed = (
                db.query(MarketplaceImportJob)
                .filter(MarketplaceImportJob.status == "completed")
                .count()
            )
            failed = (
                db.query(MarketplaceImportJob)
                .filter(MarketplaceImportJob.status == "failed")
                .count()
            )

            return {
                "total_imports": total,
                "completed_imports": completed,
                "failed_imports": failed,
                "success_rate": (completed / total * 100) if total > 0 else 0,
            }
        except Exception as e:
            logger.exception("Failed to get import statistics: %s", e)
            return {
                "total_imports": 0,
                "completed_imports": 0,
                "failed_imports": 0,
                "success_rate": 0,
            }

    # ============================================================================
    # PRIVATE HELPER METHODS
    # ============================================================================

    def _get_user_by_id_or_raise(self, db: Session, user_id: int) -> User:
        """Get user by ID or raise UserNotFoundException."""
        user = db.query(User).filter(User.id == user_id).first()
        if not user:
            raise UserNotFoundException(str(user_id))
        return user

    def _get_vendor_by_id_or_raise(self, db: Session, vendor_id: int) -> Vendor:
        """Get vendor by ID or raise VendorNotFoundException."""
        vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first()
        if not vendor:
            raise VendorNotFoundException(str(vendor_id), identifier_type="id")
        return vendor

    def _generate_temp_password(self, length: int = 12) -> str:
        """Generate secure temporary password.

        Characters are drawn with ``secrets.choice`` from ASCII letters,
        digits and ``!@#$%^&*``.

        Args:
            length: Number of characters to generate.

        Returns:
            The generated password.
        """
        alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
        return "".join(secrets.choice(alphabet) for _ in range(length))

    def _create_default_roles(self, db: Session, vendor_id: int) -> None:
        """Create default roles for a new vendor.

        Adds Owner, Manager, Editor and Viewer roles to the session. Nothing
        is flushed or committed here; that is left to the caller.

        Args:
            db: Database session the roles are added to.
            vendor_id: ID of the vendor the roles belong to.
        """
        default_roles = [
            {
                "name": "Owner",
                "permissions": ["*"],  # Full access
            },
            {
                "name": "Manager",
                "permissions": [
                    "products.*",
                    "orders.*",
                    "customers.view",
                    "inventory.*",
                    "team.view",
                ],
            },
            {
                "name": "Editor",
                "permissions": [
                    "products.view",
                    "products.edit",
                    "orders.view",
                    "inventory.view",
                ],
            },
            {
                "name": "Viewer",
                "permissions": [
                    "products.view",
                    "orders.view",
                    "customers.view",
                    "inventory.view",
                ],
            },
        ]

        for role_data in default_roles:
            role = Role(
                vendor_id=vendor_id,
                name=role_data["name"],
                permissions=role_data["permissions"],
            )
            db.add(role)

    def _convert_job_to_response(
        self, job: MarketplaceImportJob
    ) -> MarketplaceImportJobResponse:
        """Convert database model to response schema.

        Vendor fields are ``None`` when the job has no related vendor;
        unset counters are reported as ``0``.
        """
        vendor = job.vendor
        return MarketplaceImportJobResponse(
            job_id=job.id,
            status=job.status,
            marketplace=job.marketplace,
            vendor_id=vendor.id if vendor else None,
            vendor_code=vendor.vendor_code if vendor else None,
            vendor_name=job.vendor_name,
            imported=job.imported_count or 0,
            updated=job.updated_count or 0,
            total_processed=job.total_processed or 0,
            error_count=job.error_count or 0,
            error_message=job.error_message,
            created_at=job.created_at,
            started_at=job.started_at,
            completed_at=job.completed_at,
        )


# Create service instance
admin_service = AdminService()