# app/modules/customers/services/admin_customer_service.py """ Admin customer management service. Handles customer operations for admin users across all stores. """ import logging from typing import Any from sqlalchemy import func from sqlalchemy.orm import Session from app.modules.customers.exceptions import CustomerNotFoundException from app.modules.customers.models import Customer logger = logging.getLogger(__name__) class AdminCustomerService: """Service for admin-level customer management across stores.""" def list_customers( self, db: Session, store_id: int | None = None, search: str | None = None, is_active: bool | None = None, skip: int = 0, limit: int = 20, ) -> tuple[list[dict[str, Any]], int]: """ Get paginated list of customers across all stores. Args: db: Database session store_id: Optional store ID filter search: Search by email, name, or customer number is_active: Filter by active status skip: Number of records to skip limit: Maximum records to return Returns: Tuple of (customers list, total count) """ from app.modules.tenancy.services.store_service import store_service # Build query query = db.query(Customer) # Apply filters if store_id: query = query.filter(Customer.store_id == store_id) if search: search_term = f"%{search}%" query = query.filter( (Customer.email.ilike(search_term)) | (Customer.first_name.ilike(search_term)) | (Customer.last_name.ilike(search_term)) | (Customer.customer_number.ilike(search_term)) ) if is_active is not None: query = query.filter(Customer.is_active == is_active) # Get total count total = query.count() # Get paginated results customers = ( query.order_by(Customer.created_at.desc()) .offset(skip) .limit(limit) .all() ) # Batch-resolve store names store_ids = {c.store_id for c in customers} store_map = {} for sid in store_ids: store = store_service.get_store_by_id_optional(db, sid) if store: store_map[sid] = (store.name, store.store_code) # Format response result = [] for customer in customers: store_name, store_code = store_map.get(customer.store_id, (None, None)) customer_dict = { "id": customer.id, "store_id": customer.store_id, "email": customer.email, "first_name": customer.first_name, "last_name": customer.last_name, "phone": customer.phone, "customer_number": customer.customer_number, "marketing_consent": customer.marketing_consent, "preferred_language": customer.preferred_language, "last_order_date": customer.last_order_date, "total_orders": customer.total_orders, "total_spent": float(customer.total_spent) if customer.total_spent else 0, "is_active": customer.is_active, "created_at": customer.created_at, "updated_at": customer.updated_at, "store_name": store_name, "store_code": store_code, } result.append(customer_dict) return result, total def get_customer_stats( self, db: Session, store_id: int | None = None, ) -> dict[str, Any]: """ Get customer statistics. Args: db: Database session store_id: Optional store ID filter Returns: Dict with customer statistics """ query = db.query(Customer) if store_id: query = query.filter(Customer.store_id == store_id) total = query.count() active = query.filter(Customer.is_active == True).count() # noqa: E712 inactive = query.filter(Customer.is_active == False).count() # noqa: E712 with_orders = query.filter(Customer.total_orders > 0).count() # Total spent across all customers total_spent_result = query.with_entities(func.sum(Customer.total_spent)).scalar() total_spent = float(total_spent_result) if total_spent_result else 0 # Average order value total_orders_result = query.with_entities(func.sum(Customer.total_orders)).scalar() total_orders = int(total_orders_result) if total_orders_result else 0 avg_order_value = total_spent / total_orders if total_orders > 0 else 0 return { "total": total, "active": active, "inactive": inactive, "with_orders": with_orders, "total_spent": total_spent, "total_orders": total_orders, "avg_order_value": round(avg_order_value, 2), } def get_customer( self, db: Session, customer_id: int, ) -> dict[str, Any]: """ Get customer details by ID. Args: db: Database session customer_id: Customer ID Returns: Customer dict with store info Raises: CustomerNotFoundException: If customer not found """ from app.modules.tenancy.services.store_service import store_service customer = ( db.query(Customer) .filter(Customer.id == customer_id) .first() ) if not customer: raise CustomerNotFoundException(str(customer_id)) store = store_service.get_store_by_id_optional(db, customer.store_id) return { "id": customer.id, "store_id": customer.store_id, "email": customer.email, "first_name": customer.first_name, "last_name": customer.last_name, "phone": customer.phone, "customer_number": customer.customer_number, "marketing_consent": customer.marketing_consent, "preferred_language": customer.preferred_language, "last_order_date": customer.last_order_date, "total_orders": customer.total_orders, "total_spent": float(customer.total_spent) if customer.total_spent else 0, "is_active": customer.is_active, "created_at": customer.created_at, "updated_at": customer.updated_at, "store_name": store.name if store else None, "store_code": store.store_code if store else None, } def toggle_customer_status( self, db: Session, customer_id: int, admin_email: str, ) -> dict[str, Any]: """ Toggle customer active status. Args: db: Database session customer_id: Customer ID admin_email: Admin user email for logging Returns: Dict with customer ID, new status, and message Raises: CustomerNotFoundException: If customer not found """ customer = db.query(Customer).filter(Customer.id == customer_id).first() if not customer: raise CustomerNotFoundException(str(customer_id)) customer.is_active = not customer.is_active db.flush() db.refresh(customer) status = "activated" if customer.is_active else "deactivated" logger.info(f"Customer {customer.email} {status} by admin {admin_email}") return { "id": customer.id, "is_active": customer.is_active, "message": f"Customer {status} successfully", } # Singleton instance admin_customer_service = AdminCustomerService()