# app/modules/customers/services/admin_customer_service.py """ Admin customer management service. Handles customer operations for admin users across all stores. """ import logging from typing import Any from sqlalchemy.orm import Session from app.modules.customers.exceptions import CustomerNotFoundException from app.modules.customers.models import Customer logger = logging.getLogger(__name__) class AdminCustomerService: """Service for admin-level customer management across stores.""" def list_customers( self, db: Session, store_id: int | None = None, search: str | None = None, is_active: bool | None = None, skip: int = 0, limit: int = 20, ) -> tuple[list[dict[str, Any]], int]: """ Get paginated list of customers across all stores. Args: db: Database session store_id: Optional store ID filter search: Search by email, name, or customer number is_active: Filter by active status skip: Number of records to skip limit: Maximum records to return Returns: Tuple of (customers list, total count) """ from app.modules.tenancy.services.store_service import store_service # Build query query = db.query(Customer) # Apply filters if store_id: query = query.filter(Customer.store_id == store_id) if search: search_term = f"%{search}%" query = query.filter( (Customer.email.ilike(search_term)) | (Customer.first_name.ilike(search_term)) | (Customer.last_name.ilike(search_term)) | (Customer.customer_number.ilike(search_term)) ) if is_active is not None: query = query.filter(Customer.is_active == is_active) # Get total count total = query.count() # Get paginated results customers = ( query.order_by(Customer.created_at.desc()) .offset(skip) .limit(limit) .all() ) # Batch-resolve store names store_ids = {c.store_id for c in customers} store_map = {} for sid in store_ids: store = store_service.get_store_by_id_optional(db, sid) if store: store_map[sid] = (store.name, store.store_code) # Format response result = [] for customer in customers: store_name, store_code = store_map.get(customer.store_id, (None, None)) customer_dict = { "id": customer.id, "store_id": customer.store_id, "email": customer.email, "first_name": customer.first_name, "last_name": customer.last_name, "phone": customer.phone, "customer_number": customer.customer_number, "marketing_consent": customer.marketing_consent, "preferred_language": customer.preferred_language, "is_active": customer.is_active, "created_at": customer.created_at, "updated_at": customer.updated_at, "store_name": store_name, "store_code": store_code, } result.append(customer_dict) return result, total def get_customer_stats( self, db: Session, store_id: int | None = None, ) -> dict[str, Any]: """ Get customer statistics. Args: db: Database session store_id: Optional store ID filter Returns: Dict with customer statistics """ query = db.query(Customer) if store_id: query = query.filter(Customer.store_id == store_id) total = query.count() active = query.filter(Customer.is_active == True).count() # noqa: E712 inactive = query.filter(Customer.is_active == False).count() # noqa: E712 return { "total": total, "active": active, "inactive": inactive, } def get_customer( self, db: Session, customer_id: int, ) -> dict[str, Any]: """ Get customer details by ID. Args: db: Database session customer_id: Customer ID Returns: Customer dict with store info Raises: CustomerNotFoundException: If customer not found """ from app.modules.tenancy.services.store_service import store_service customer = ( db.query(Customer) .filter(Customer.id == customer_id) .first() ) if not customer: raise CustomerNotFoundException(str(customer_id)) store = store_service.get_store_by_id_optional(db, customer.store_id) return { "id": customer.id, "store_id": customer.store_id, "email": customer.email, "first_name": customer.first_name, "last_name": customer.last_name, "phone": customer.phone, "customer_number": customer.customer_number, "marketing_consent": customer.marketing_consent, "preferred_language": customer.preferred_language, "is_active": customer.is_active, "created_at": customer.created_at, "updated_at": customer.updated_at, "store_name": store.name if store else None, "store_code": store.store_code if store else None, } def toggle_customer_status( self, db: Session, customer_id: int, admin_email: str, ) -> dict[str, Any]: """ Toggle customer active status. Args: db: Database session customer_id: Customer ID admin_email: Admin user email for logging Returns: Dict with customer ID, new status, and message Raises: CustomerNotFoundException: If customer not found """ customer = db.query(Customer).filter(Customer.id == customer_id).first() if not customer: raise CustomerNotFoundException(str(customer_id)) customer.is_active = not customer.is_active db.flush() db.refresh(customer) status = "activated" if customer.is_active else "deactivated" logger.info(f"Customer {customer.email} {status} by admin {admin_email}") return { "id": customer.id, "is_active": customer.is_active, "message": f"Customer {status} successfully", } # Singleton instance admin_customer_service = AdminCustomerService()