# app/services/admin_service.py """ Admin service for managing users, shops, and import jobs. This module provides classes and functions for: - User management and status control - Shop verification and activation - Marketplace import job monitoring """ import logging from datetime import datetime from typing import List, Optional, Tuple from sqlalchemy.orm import Session from app.exceptions import ( UserNotFoundException, UserStatusChangeException, CannotModifySelfException, ShopNotFoundException, ShopVerificationException, AdminOperationException, ) from models.schemas.marketplace import MarketplaceImportJobResponse from models.database.marketplace import MarketplaceImportJob from models.database.shop import Shop from models.database.user import User logger = logging.getLogger(__name__) class AdminService: """Service class for admin operations following the application's service pattern.""" def get_all_users(self, db: Session, skip: int = 0, limit: int = 100) -> List[User]: """Get paginated list of all users.""" try: return db.query(User).offset(skip).limit(limit).all() except Exception as e: logger.error(f"Failed to retrieve users: {str(e)}") raise AdminOperationException( operation="get_all_users", reason="Database query failed" ) def toggle_user_status( self, db: Session, user_id: int, current_admin_id: int ) -> Tuple[User, str]: """ Toggle user active status. Args: db: Database session user_id: ID of user to toggle current_admin_id: ID of the admin performing the action Returns: Tuple of (updated_user, status_message) Raises: UserNotFoundException: If user not found CannotModifySelfException: If trying to modify own account UserStatusChangeException: If status change is not allowed """ user = self._get_user_by_id_or_raise(db, user_id) # Prevent self-modification if user.id == current_admin_id: raise CannotModifySelfException(user_id, "deactivate account") # Check if user is another admin - FIXED LOGIC if user.role == "admin" and user.id != current_admin_id: raise UserStatusChangeException( user_id=user_id, current_status="admin", attempted_action="toggle status", reason="Cannot modify another admin user" ) try: original_status = user.is_active user.is_active = not user.is_active user.updated_at = datetime.utcnow() db.commit() db.refresh(user) status_action = "activated" if user.is_active else "deactivated" message = f"User {user.username} has been {status_action}" logger.info(f"{message} by admin {current_admin_id}") return user, message except Exception as e: db.rollback() logger.error(f"Failed to toggle user {user_id} status: {str(e)}") raise UserStatusChangeException( user_id=user_id, current_status="active" if original_status else "inactive", attempted_action="toggle status", reason="Database update failed" ) def get_all_shops( self, db: Session, skip: int = 0, limit: int = 100 ) -> Tuple[List[Shop], int]: """ Get paginated list of all shops with total count. Args: db: Database session skip: Number of records to skip limit: Maximum number of records to return Returns: Tuple of (shops_list, total_count) """ try: total = db.query(Shop).count() shops = db.query(Shop).offset(skip).limit(limit).all() return shops, total except Exception as e: logger.error(f"Failed to retrieve shops: {str(e)}") raise AdminOperationException( operation="get_all_shops", reason="Database query failed" ) def verify_shop(self, db: Session, shop_id: int) -> Tuple[Shop, str]: """ Toggle shop verification status. Args: db: Database session shop_id: ID of shop to verify/unverify Returns: Tuple of (updated_shop, status_message) Raises: ShopNotFoundException: If shop not found ShopVerificationException: If verification fails """ shop = self._get_shop_by_id_or_raise(db, shop_id) try: original_status = shop.is_verified shop.is_verified = not shop.is_verified shop.updated_at = datetime.utcnow() # Add verification timestamp if implementing audit trail if shop.is_verified: shop.verified_at = datetime.utcnow() db.commit() db.refresh(shop) status_action = "verified" if shop.is_verified else "unverified" message = f"Shop {shop.shop_code} has been {status_action}" logger.info(message) return shop, message except Exception as e: db.rollback() logger.error(f"Failed to verify shop {shop_id}: {str(e)}") raise ShopVerificationException( shop_id=shop_id, reason="Database update failed", current_verification_status=original_status ) def toggle_shop_status(self, db: Session, shop_id: int) -> Tuple[Shop, str]: """ Toggle shop active status. Args: db: Database session shop_id: ID of shop to activate/deactivate Returns: Tuple of (updated_shop, status_message) Raises: ShopNotFoundException: If shop not found AdminOperationException: If status change fails """ shop = self._get_shop_by_id_or_raise(db, shop_id) try: original_status = shop.is_active shop.is_active = not shop.is_active shop.updated_at = datetime.utcnow() db.commit() db.refresh(shop) status_action = "activated" if shop.is_active else "deactivated" message = f"Shop {shop.shop_code} has been {status_action}" logger.info(message) return shop, message except Exception as e: db.rollback() logger.error(f"Failed to toggle shop {shop_id} status: {str(e)}") raise AdminOperationException( operation="toggle_shop_status", reason="Database update failed", target_type="shop", target_id=str(shop_id) ) def get_marketplace_import_jobs( self, db: Session, marketplace: Optional[str] = None, shop_name: Optional[str] = None, status: Optional[str] = None, skip: int = 0, limit: int = 100, ) -> List[MarketplaceImportJobResponse]: """ Get filtered and paginated marketplace import jobs. Args: db: Database session marketplace: Filter by marketplace name (case-insensitive partial match) shop_name: Filter by shop name (case-insensitive partial match) status: Filter by exact status skip: Number of records to skip limit: Maximum number of records to return Returns: List of MarketplaceImportJobResponse objects """ try: query = db.query(MarketplaceImportJob) # Apply filters if marketplace: query = query.filter( MarketplaceImportJob.marketplace.ilike(f"%{marketplace}%") ) if shop_name: query = query.filter(MarketplaceImportJob.shop_name.ilike(f"%{shop_name}%")) if status: query = query.filter(MarketplaceImportJob.status == status) # Order by creation date and apply pagination jobs = ( query.order_by(MarketplaceImportJob.created_at.desc()) .offset(skip) .limit(limit) .all() ) return [self._convert_job_to_response(job) for job in jobs] except Exception as e: logger.error(f"Failed to retrieve marketplace import jobs: {str(e)}") raise AdminOperationException( operation="get_marketplace_import_jobs", reason="Database query failed" ) def get_user_statistics(self, db: Session) -> dict: """Get user statistics for admin dashboard.""" try: total_users = db.query(User).count() active_users = db.query(User).filter(User.is_active == True).count() inactive_users = total_users - active_users return { "total_users": total_users, "active_users": active_users, "inactive_users": inactive_users, "activation_rate": (active_users / total_users * 100) if total_users > 0 else 0 } except Exception as e: logger.error(f"Failed to get user statistics: {str(e)}") raise AdminOperationException( operation="get_user_statistics", reason="Database query failed" ) def get_shop_statistics(self, db: Session) -> dict: """Get shop statistics for admin dashboard.""" try: total_shops = db.query(Shop).count() active_shops = db.query(Shop).filter(Shop.is_active == True).count() verified_shops = db.query(Shop).filter(Shop.is_verified == True).count() return { "total_shops": total_shops, "active_shops": active_shops, "verified_shops": verified_shops, "verification_rate": (verified_shops / total_shops * 100) if total_shops > 0 else 0 } except Exception as e: logger.error(f"Failed to get shop statistics: {str(e)}") raise AdminOperationException( operation="get_shop_statistics", reason="Database query failed" ) # Private helper methods def _get_user_by_id_or_raise(self, db: Session, user_id: int) -> User: """Get user by ID or raise UserNotFoundException.""" user = db.query(User).filter(User.id == user_id).first() if not user: raise UserNotFoundException(str(user_id)) return user def _get_shop_by_id_or_raise(self, db: Session, shop_id: int) -> Shop: """Get shop by ID or raise ShopNotFoundException.""" shop = db.query(Shop).filter(Shop.id == shop_id).first() if not shop: raise ShopNotFoundException(str(shop_id), identifier_type="id") return shop def _convert_job_to_response(self, job: MarketplaceImportJob) -> MarketplaceImportJobResponse: """Convert database model to response schema.""" return MarketplaceImportJobResponse( job_id=job.id, status=job.status, marketplace=job.marketplace, shop_id=job.shop.id if job.shop else None, shop_code=job.shop.shop_code if job.shop else None, shop_name=job.shop_name, imported=job.imported_count or 0, updated=job.updated_count or 0, total_processed=job.total_processed or 0, error_count=job.error_count or 0, error_message=job.error_message, created_at=job.created_at, started_at=job.started_at, completed_at=job.completed_at, ) # Legacy methods for backward compatibility (mark as deprecated) def get_user_by_id(self, db: Session, user_id: int) -> Optional[User]: """Get user by ID. DEPRECATED: Use _get_user_by_id_or_raise instead.""" logger.warning("get_user_by_id is deprecated, use proper exception handling") return db.query(User).filter(User.id == user_id).first() def get_shop_by_id(self, db: Session, shop_id: int) -> Optional[Shop]: """Get shop by ID. DEPRECATED: Use _get_shop_by_id_or_raise instead.""" logger.warning("get_shop_by_id is deprecated, use proper exception handling") return db.query(Shop).filter(Shop.id == shop_id).first() def user_exists(self, db: Session, user_id: int) -> bool: """Check if user exists by ID. DEPRECATED: Use proper exception handling.""" logger.warning("user_exists is deprecated, use proper exception handling") return db.query(User).filter(User.id == user_id).first() is not None def shop_exists(self, db: Session, shop_id: int) -> bool: """Check if shop exists by ID. DEPRECATED: Use proper exception handling.""" logger.warning("shop_exists is deprecated, use proper exception handling") return db.query(Shop).filter(Shop.id == shop_id).first() is not None # Create service instance following the same pattern as product_service admin_service = AdminService()