Files
orion/app/modules/customers/services/admin_customer_service.py
Samir Boulahtit 86e85a98b8
Some checks failed
CI / ruff (push) Successful in 9s
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / validate (push) Has been cancelled
CI / dependency-scanning (push) Has been cancelled
CI / pytest (push) Has been cancelled
refactor(arch): eliminate all cross-module model imports in service layer
Enforce MOD-025/MOD-026 rules: zero top-level cross-module model imports
remain in any service file. All 66 files migrated using deferred import
patterns (method-body, _get_model() helpers, instance-cached self._Model)
and new cross-module service methods in tenancy. Documentation updated
with Pattern 6 (deferred imports), migration plan marked complete, and
violations status reflects 84→0 service-layer violations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 06:13:15 +01:00

249 lines
7.8 KiB
Python

# app/modules/customers/services/admin_customer_service.py
"""
Admin customer management service.
Handles customer operations for admin users across all stores.
"""
import logging
from typing import Any
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.modules.customers.exceptions import CustomerNotFoundException
from app.modules.customers.models import Customer
logger = logging.getLogger(__name__)
class AdminCustomerService:
"""Service for admin-level customer management across stores."""
def list_customers(
self,
db: Session,
store_id: int | None = None,
search: str | None = None,
is_active: bool | None = None,
skip: int = 0,
limit: int = 20,
) -> tuple[list[dict[str, Any]], int]:
"""
Get paginated list of customers across all stores.
Args:
db: Database session
store_id: Optional store ID filter
search: Search by email, name, or customer number
is_active: Filter by active status
skip: Number of records to skip
limit: Maximum records to return
Returns:
Tuple of (customers list, total count)
"""
from app.modules.tenancy.services.store_service import store_service
# Build query
query = db.query(Customer)
# Apply filters
if store_id:
query = query.filter(Customer.store_id == store_id)
if search:
search_term = f"%{search}%"
query = query.filter(
(Customer.email.ilike(search_term))
| (Customer.first_name.ilike(search_term))
| (Customer.last_name.ilike(search_term))
| (Customer.customer_number.ilike(search_term))
)
if is_active is not None:
query = query.filter(Customer.is_active == is_active)
# Get total count
total = query.count()
# Get paginated results
customers = (
query.order_by(Customer.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
# Batch-resolve store names
store_ids = {c.store_id for c in customers}
store_map = {}
for sid in store_ids:
store = store_service.get_store_by_id_optional(db, sid)
if store:
store_map[sid] = (store.name, store.store_code)
# Format response
result = []
for customer in customers:
store_name, store_code = store_map.get(customer.store_id, (None, None))
customer_dict = {
"id": customer.id,
"store_id": customer.store_id,
"email": customer.email,
"first_name": customer.first_name,
"last_name": customer.last_name,
"phone": customer.phone,
"customer_number": customer.customer_number,
"marketing_consent": customer.marketing_consent,
"preferred_language": customer.preferred_language,
"last_order_date": customer.last_order_date,
"total_orders": customer.total_orders,
"total_spent": float(customer.total_spent) if customer.total_spent else 0,
"is_active": customer.is_active,
"created_at": customer.created_at,
"updated_at": customer.updated_at,
"store_name": store_name,
"store_code": store_code,
}
result.append(customer_dict)
return result, total
def get_customer_stats(
self,
db: Session,
store_id: int | None = None,
) -> dict[str, Any]:
"""
Get customer statistics.
Args:
db: Database session
store_id: Optional store ID filter
Returns:
Dict with customer statistics
"""
query = db.query(Customer)
if store_id:
query = query.filter(Customer.store_id == store_id)
total = query.count()
active = query.filter(Customer.is_active == True).count() # noqa: E712
inactive = query.filter(Customer.is_active == False).count() # noqa: E712
with_orders = query.filter(Customer.total_orders > 0).count()
# Total spent across all customers
total_spent_result = query.with_entities(func.sum(Customer.total_spent)).scalar()
total_spent = float(total_spent_result) if total_spent_result else 0
# Average order value
total_orders_result = query.with_entities(func.sum(Customer.total_orders)).scalar()
total_orders = int(total_orders_result) if total_orders_result else 0
avg_order_value = total_spent / total_orders if total_orders > 0 else 0
return {
"total": total,
"active": active,
"inactive": inactive,
"with_orders": with_orders,
"total_spent": total_spent,
"total_orders": total_orders,
"avg_order_value": round(avg_order_value, 2),
}
def get_customer(
self,
db: Session,
customer_id: int,
) -> dict[str, Any]:
"""
Get customer details by ID.
Args:
db: Database session
customer_id: Customer ID
Returns:
Customer dict with store info
Raises:
CustomerNotFoundException: If customer not found
"""
from app.modules.tenancy.services.store_service import store_service
customer = (
db.query(Customer)
.filter(Customer.id == customer_id)
.first()
)
if not customer:
raise CustomerNotFoundException(str(customer_id))
store = store_service.get_store_by_id_optional(db, customer.store_id)
return {
"id": customer.id,
"store_id": customer.store_id,
"email": customer.email,
"first_name": customer.first_name,
"last_name": customer.last_name,
"phone": customer.phone,
"customer_number": customer.customer_number,
"marketing_consent": customer.marketing_consent,
"preferred_language": customer.preferred_language,
"last_order_date": customer.last_order_date,
"total_orders": customer.total_orders,
"total_spent": float(customer.total_spent) if customer.total_spent else 0,
"is_active": customer.is_active,
"created_at": customer.created_at,
"updated_at": customer.updated_at,
"store_name": store.name if store else None,
"store_code": store.store_code if store else None,
}
def toggle_customer_status(
self,
db: Session,
customer_id: int,
admin_email: str,
) -> dict[str, Any]:
"""
Toggle customer active status.
Args:
db: Database session
customer_id: Customer ID
admin_email: Admin user email for logging
Returns:
Dict with customer ID, new status, and message
Raises:
CustomerNotFoundException: If customer not found
"""
customer = db.query(Customer).filter(Customer.id == customer_id).first()
if not customer:
raise CustomerNotFoundException(str(customer_id))
customer.is_active = not customer.is_active
db.flush()
db.refresh(customer)
status = "activated" if customer.is_active else "deactivated"
logger.info(f"Customer {customer.email} {status} by admin {admin_email}")
return {
"id": customer.id,
"is_active": customer.is_active,
"message": f"Customer {status} successfully",
}
# Singleton instance
admin_customer_service = AdminCustomerService()