Files
orion/app/modules/customers/services/admin_customer_service.py
Samir Boulahtit a77a8a3a98
All checks were successful
CI / ruff (push) Successful in 12s
CI / pytest (push) Successful in 50m57s
CI / validate (push) Successful in 24s
CI / dependency-scanning (push) Successful in 29s
CI / docs (push) Successful in 40s
CI / deploy (push) Successful in 51s
feat: multi-module improvements across merchant, store, i18n, and customer systems
- Fix platform-grouped merchant sidebar menu with core items at root level
- Add merchant store management (detail page, create store, team page)
- Fix store settings 500 error by removing dead stripe/API tab
- Move onboarding translations to module-owned locale files
- Fix onboarding banner i18n with server-side rendering + context inheritance
- Refactor login language selectors to use languageSelector() function (LANG-002)
- Move HTTPException handling to global exception handler in merchant routes (API-003)
- Add language selector to all login pages and portal headers
- Fix customer module: drop order stats from customer model, add to orders module
- Fix admin menu config visibility for super admin platform context
- Fix storefront auth and layout issues
- Add missing i18n translations for onboarding steps (en/fr/de/lb)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 23:48:25 +01:00

228 lines
6.7 KiB
Python

# app/modules/customers/services/admin_customer_service.py
"""
Admin customer management service.
Handles customer operations for admin users across all stores.
"""
import logging
from typing import Any
from sqlalchemy.orm import Session
from app.modules.customers.exceptions import CustomerNotFoundException
from app.modules.customers.models import Customer
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class AdminCustomerService:
    """Service for admin-level customer management across stores."""

    @staticmethod
    def _serialize_customer(
        customer: Customer,
        store_name: str | None,
        store_code: str | None,
    ) -> dict[str, Any]:
        """
        Build the dict representation of a customer returned to admin callers.

        Used by both ``list_customers`` and ``get_customer`` so the two
        methods always return the same set of keys.

        Args:
            customer: Customer record to serialize
            store_name: Name of the customer's store, or None if unresolved
            store_code: Code of the customer's store, or None if unresolved

        Returns:
            Dict with the customer's fields plus ``store_name``/``store_code``
        """
        return {
            "id": customer.id,
            "store_id": customer.store_id,
            "email": customer.email,
            "first_name": customer.first_name,
            "last_name": customer.last_name,
            "phone": customer.phone,
            "customer_number": customer.customer_number,
            "marketing_consent": customer.marketing_consent,
            "preferred_language": customer.preferred_language,
            "is_active": customer.is_active,
            "created_at": customer.created_at,
            "updated_at": customer.updated_at,
            "store_name": store_name,
            "store_code": store_code,
        }

    def list_customers(
        self,
        db: Session,
        store_id: int | None = None,
        search: str | None = None,
        is_active: bool | None = None,
        skip: int = 0,
        limit: int = 20,
    ) -> tuple[list[dict[str, Any]], int]:
        """
        Get paginated list of customers across all stores.

        Results are ordered by creation time, newest first.

        Args:
            db: Database session
            store_id: Optional store ID filter (a falsy value applies no filter)
            search: Search by email, first name, last name, or customer number
            is_active: Filter by active status (None applies no filter)
            skip: Number of records to skip
            limit: Maximum records to return

        Returns:
            Tuple of (customers list, total count). The total is the number
            of customers matching the filters, before pagination is applied.
        """
        # Imported at call time rather than at module import time.
        from app.modules.tenancy.services.store_service import store_service

        query = db.query(Customer)

        if store_id:
            query = query.filter(Customer.store_id == store_id)

        if search:
            # Case-insensitive substring match on any of the four columns.
            # NOTE: `%` and `_` inside `search` are not escaped, so they act
            # as LIKE wildcards.
            search_term = f"%{search}%"
            query = query.filter(
                (Customer.email.ilike(search_term))
                | (Customer.first_name.ilike(search_term))
                | (Customer.last_name.ilike(search_term))
                | (Customer.customer_number.ilike(search_term))
            )

        if is_active is not None:
            query = query.filter(Customer.is_active == is_active)

        # Count before offset/limit so the total reflects every match.
        total = query.count()

        customers = (
            query.order_by(Customer.created_at.desc())
            .offset(skip)
            .limit(limit)
            .all()
        )

        # Look up each distinct store once, so customers that share a store
        # do not trigger repeated lookups. Stores that cannot be resolved are
        # left out of the map and serialize as (None, None) below.
        store_map = {}
        for sid in {c.store_id for c in customers}:
            store = store_service.get_store_by_id_optional(db, sid)
            if store:
                store_map[sid] = (store.name, store.store_code)

        result = [
            self._serialize_customer(
                customer, *store_map.get(customer.store_id, (None, None))
            )
            for customer in customers
        ]
        return result, total

    def get_customer_stats(
        self,
        db: Session,
        store_id: int | None = None,
    ) -> dict[str, Any]:
        """
        Get customer statistics.

        Args:
            db: Database session
            store_id: Optional store ID filter (a falsy value applies no filter)

        Returns:
            Dict with ``total``, ``active`` and ``inactive`` customer counts
        """
        query = db.query(Customer)
        if store_id:
            query = query.filter(Customer.store_id == store_id)

        total = query.count()
        # `== True` / `== False` are intentional: they build SQL expressions,
        # hence the lint suppressions.
        active = query.filter(Customer.is_active == True).count()  # noqa: E712
        inactive = query.filter(Customer.is_active == False).count()  # noqa: E712

        return {
            "total": total,
            "active": active,
            "inactive": inactive,
        }

    def get_customer(
        self,
        db: Session,
        customer_id: int,
    ) -> dict[str, Any]:
        """
        Get customer details by ID.

        Args:
            db: Database session
            customer_id: Customer ID

        Returns:
            Customer dict with store info; ``store_name`` and ``store_code``
            are None when the customer's store cannot be resolved

        Raises:
            CustomerNotFoundException: If customer not found
        """
        # Imported at call time rather than at module import time.
        from app.modules.tenancy.services.store_service import store_service

        customer = (
            db.query(Customer)
            .filter(Customer.id == customer_id)
            .first()
        )
        if not customer:
            raise CustomerNotFoundException(str(customer_id))

        store = store_service.get_store_by_id_optional(db, customer.store_id)
        return self._serialize_customer(
            customer,
            store.name if store else None,
            store.store_code if store else None,
        )

    def toggle_customer_status(
        self,
        db: Session,
        customer_id: int,
        admin_email: str,
    ) -> dict[str, Any]:
        """
        Toggle customer active status.

        The change is flushed to the session and the customer is refreshed;
        this method does not commit.

        Args:
            db: Database session
            customer_id: Customer ID
            admin_email: Admin user email for logging

        Returns:
            Dict with customer ID, new status, and message

        Raises:
            CustomerNotFoundException: If customer not found
        """
        customer = db.query(Customer).filter(Customer.id == customer_id).first()
        if not customer:
            raise CustomerNotFoundException(str(customer_id))

        customer.is_active = not customer.is_active
        db.flush()
        db.refresh(customer)

        status = "activated" if customer.is_active else "deactivated"
        # Lazy %-style arguments: the message is only formatted when the
        # record is actually emitted.
        logger.info(
            "Customer %s %s by admin %s", customer.email, status, admin_email
        )

        return {
            "id": customer.id,
            "is_active": customer.is_active,
            "message": f"Customer {status} successfully",
        }
# Module-level instance of the service, created once when this module is imported.
admin_customer_service = AdminCustomerService()