Files
orion/app/modules/customers/services/customer_service.py
Samir Boulahtit 8c0967e215
Some checks failed
CI / ruff (push) Successful in 11s
CI / pytest (push) Failing after 44m18s
CI / validate (push) Successful in 23s
CI / dependency-scanning (push) Successful in 28s
CI / docs (push) Has been skipped
CI / deploy (push) Has been skipped
fix(arch): resolve all 14 architecture validation warnings
- Add missing module dependency declarations (IMPORT-002): analytics
  requires catalog/inventory/marketplace/orders, orders requires
  marketplace, inventory requires orders
- Replace broad except Exception with specific types (EXC-003):
  StoreNotFoundException in auth_service, PlatformNotFoundException in
  admin_subscription_service, SQLAlchemyError in customer_service
- Use number_stepper macro in loyalty program-edit template (FE-008)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 06:24:57 +01:00

671 lines
20 KiB
Python

# app/modules/customers/services/customer_service.py
"""
Customer management service.
Handles customer registration, authentication, and profile management
with complete store isolation.
"""
import logging
from datetime import UTC, datetime, timedelta
from typing import Any
from sqlalchemy import and_
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from app.modules.core.services.auth_service import AuthService
from app.modules.customers.exceptions import (
CustomerNotActiveException,
CustomerNotFoundException,
CustomerValidationException,
DuplicateCustomerEmailException,
InvalidCustomerCredentialsException,
InvalidPasswordResetTokenException,
PasswordTooShortException,
)
from app.modules.customers.models import Customer, PasswordResetToken
from app.modules.customers.schemas import CustomerRegister, CustomerUpdate
from app.modules.tenancy.exceptions import (
StoreNotActiveException,
StoreNotFoundException,
)
from app.modules.tenancy.services.store_service import store_service as _store_service
logger = logging.getLogger(__name__)
class CustomerService:
"""Service for managing store-scoped customers."""
def __init__(self):
self.auth_service = AuthService()
def register_customer(
self, db: Session, store_id: int, customer_data: CustomerRegister
) -> Customer:
"""
Register a new customer for a specific store.
Args:
db: Database session
store_id: Store ID
customer_data: Customer registration data
Returns:
Customer: Created customer object
Raises:
StoreNotFoundException: If store doesn't exist
StoreNotActiveException: If store is not active
DuplicateCustomerEmailException: If email already exists for this store
CustomerValidationException: If customer data is invalid
"""
# Verify store exists and is active
store = _store_service.get_store_by_id_optional(db, store_id)
if not store:
raise StoreNotFoundException(str(store_id), identifier_type="id")
if not store.is_active:
raise StoreNotActiveException(store.store_code)
# Check if email already exists for this store
existing_customer = (
db.query(Customer)
.filter(
and_(
Customer.store_id == store_id,
Customer.email == customer_data.email.lower(),
)
)
.first()
)
if existing_customer:
raise DuplicateCustomerEmailException(
customer_data.email, store.store_code
)
# Generate unique customer number for this store
customer_number = self._generate_customer_number(
db, store_id, store.store_code
)
# Hash password
hashed_password = self.auth_service.hash_password(customer_data.password)
# Create customer
customer = Customer(
store_id=store_id,
email=customer_data.email.lower(),
hashed_password=hashed_password,
first_name=customer_data.first_name,
last_name=customer_data.last_name,
phone=customer_data.phone,
customer_number=customer_number,
marketing_consent=(
customer_data.marketing_consent
if hasattr(customer_data, "marketing_consent")
else False
),
is_active=True,
)
try:
db.add(customer)
db.flush()
db.refresh(customer)
logger.info(
f"Customer registered successfully: {customer.email} "
f"(ID: {customer.id}, Number: {customer.customer_number}) "
f"for store {store.store_code}"
)
return customer
except SQLAlchemyError as e:
logger.error(f"Error registering customer: {str(e)}")
raise CustomerValidationException(
message="Failed to register customer", details={"error": str(e)}
)
def login_customer(
self, db: Session, store_id: int, credentials
) -> dict[str, Any]:
"""
Authenticate customer and generate JWT token.
Args:
db: Database session
store_id: Store ID
credentials: Login credentials (UserLogin schema)
Returns:
Dict containing customer and token data
Raises:
StoreNotFoundException: If store doesn't exist
InvalidCustomerCredentialsException: If credentials are invalid
CustomerNotActiveException: If customer account is inactive
"""
# Verify store exists
store = _store_service.get_store_by_id_optional(db, store_id)
if not store:
raise StoreNotFoundException(str(store_id), identifier_type="id")
# Find customer by email (store-scoped)
customer = (
db.query(Customer)
.filter(
and_(
Customer.store_id == store_id,
Customer.email == credentials.email_or_username.lower(),
)
)
.first()
)
if not customer:
raise InvalidCustomerCredentialsException()
# Verify password using auth_manager directly
if not self.auth_service.auth_manager.verify_password(
credentials.password, customer.hashed_password
):
raise InvalidCustomerCredentialsException()
# Check if customer is active
if not customer.is_active:
raise CustomerNotActiveException(customer.email)
# Generate JWT token with customer context
from jose import jwt
auth_manager = self.auth_service.auth_manager
expires_delta = timedelta(minutes=auth_manager.token_expire_minutes)
expire = datetime.now(UTC) + expires_delta
payload = {
"sub": str(customer.id),
"email": customer.email,
"store_id": store_id,
"type": "customer",
"exp": expire,
"iat": datetime.now(UTC),
}
token = jwt.encode(
payload, auth_manager.secret_key, algorithm=auth_manager.algorithm
)
token_data = {
"access_token": token,
"token_type": "bearer",
"expires_in": auth_manager.token_expire_minutes * 60,
}
logger.info(
f"Customer login successful: {customer.email} "
f"for store {store.store_code}"
)
return {"customer": customer, "token_data": token_data}
def get_customer(self, db: Session, store_id: int, customer_id: int) -> Customer:
"""
Get customer by ID with store isolation.
Args:
db: Database session
store_id: Store ID
customer_id: Customer ID
Returns:
Customer: Customer object
Raises:
CustomerNotFoundException: If customer not found
"""
customer = (
db.query(Customer)
.filter(and_(Customer.id == customer_id, Customer.store_id == store_id))
.first()
)
if not customer:
raise CustomerNotFoundException(str(customer_id))
return customer
def get_customer_by_email(
self, db: Session, store_id: int, email: str
) -> Customer | None:
"""
Get customer by email (store-scoped).
Args:
db: Database session
store_id: Store ID
email: Customer email
Returns:
Optional[Customer]: Customer object or None
"""
return (
db.query(Customer)
.filter(
and_(Customer.store_id == store_id, Customer.email == email.lower())
)
.first()
)
def get_store_customers(
self,
db: Session,
store_id: int,
skip: int = 0,
limit: int = 100,
search: str | None = None,
is_active: bool | None = None,
) -> tuple[list[Customer], int]:
"""
Get all customers for a store with filtering and pagination.
Args:
db: Database session
store_id: Store ID
skip: Pagination offset
limit: Pagination limit
search: Search in name/email
is_active: Filter by active status
Returns:
Tuple of (customers, total_count)
"""
from sqlalchemy import or_
query = db.query(Customer).filter(Customer.store_id == store_id)
if search:
search_pattern = f"%{search}%"
query = query.filter(
or_(
Customer.email.ilike(search_pattern),
Customer.first_name.ilike(search_pattern),
Customer.last_name.ilike(search_pattern),
Customer.customer_number.ilike(search_pattern),
)
)
if is_active is not None:
query = query.filter(Customer.is_active == is_active)
# Order by most recent first
query = query.order_by(Customer.created_at.desc())
total = query.count()
customers = query.offset(skip).limit(limit).all()
return customers, total
# Note: Customer order methods have been moved to the orders module.
# Use orders.services.customer_order_service for:
# - get_customer_orders()
# Use orders.services.order_metrics.get_customer_order_metrics() for:
# - customer order statistics
def toggle_customer_status(
self, db: Session, store_id: int, customer_id: int
) -> Customer:
"""
Toggle customer active status.
Args:
db: Database session
store_id: Store ID
customer_id: Customer ID
Returns:
Customer: Updated customer
"""
customer = self.get_customer(db, store_id, customer_id)
customer.is_active = not customer.is_active
db.flush()
db.refresh(customer)
action = "activated" if customer.is_active else "deactivated"
logger.info(f"Customer {action}: {customer.email} (ID: {customer.id})")
return customer
def update_customer(
self,
db: Session,
store_id: int,
customer_id: int,
customer_data: CustomerUpdate,
) -> Customer:
"""
Update customer profile.
Args:
db: Database session
store_id: Store ID
customer_id: Customer ID
customer_data: Updated customer data
Returns:
Customer: Updated customer object
Raises:
CustomerNotFoundException: If customer not found
CustomerValidationException: If update data is invalid
"""
customer = self.get_customer(db, store_id, customer_id)
# Update fields
update_data = customer_data.model_dump(exclude_unset=True)
for field, value in update_data.items():
if field == "email" and value:
# Check if new email already exists for this store
existing = (
db.query(Customer)
.filter(
and_(
Customer.store_id == store_id,
Customer.email == value.lower(),
Customer.id != customer_id,
)
)
.first()
)
if existing:
raise DuplicateCustomerEmailException(value, "store")
setattr(customer, field, value.lower())
elif hasattr(customer, field):
setattr(customer, field, value)
try:
db.flush()
db.refresh(customer)
logger.info(f"Customer updated: {customer.email} (ID: {customer.id})")
return customer
except SQLAlchemyError as e:
logger.error(f"Error updating customer: {str(e)}")
raise CustomerValidationException(
message="Failed to update customer", details={"error": str(e)}
)
def deactivate_customer(
self, db: Session, store_id: int, customer_id: int
) -> Customer:
"""
Deactivate customer account.
Args:
db: Database session
store_id: Store ID
customer_id: Customer ID
Returns:
Customer: Deactivated customer object
Raises:
CustomerNotFoundException: If customer not found
"""
customer = self.get_customer(db, store_id, customer_id)
customer.is_active = False
db.flush()
db.refresh(customer)
logger.info(f"Customer deactivated: {customer.email} (ID: {customer.id})")
return customer
def update_customer_stats(
self, db: Session, customer_id: int, order_total: float
) -> None:
"""
Update customer statistics after order.
Args:
db: Database session
customer_id: Customer ID
order_total: Order total amount
"""
customer = db.query(Customer).filter(Customer.id == customer_id).first()
if customer:
customer.total_orders += 1
customer.total_spent += order_total
customer.last_order_date = datetime.utcnow()
logger.debug(f"Updated stats for customer {customer.email}")
def _generate_customer_number(
self, db: Session, store_id: int, store_code: str
) -> str:
"""
Generate unique customer number for store.
Format: {STORE_CODE}-CUST-{SEQUENCE}
Example: STOREA-CUST-00001
Args:
db: Database session
store_id: Store ID
store_code: Store code
Returns:
str: Unique customer number
"""
# Get count of customers for this store
count = db.query(Customer).filter(Customer.store_id == store_id).count()
# Generate number with padding
sequence = str(count + 1).zfill(5)
customer_number = f"{store_code.upper()}-CUST-{sequence}"
# Ensure uniqueness (in case of deletions)
while (
db.query(Customer)
.filter(
and_(
Customer.store_id == store_id,
Customer.customer_number == customer_number,
)
)
.first()
):
count += 1
sequence = str(count + 1).zfill(5)
customer_number = f"{store_code.upper()}-CUST-{sequence}"
return customer_number
def get_customer_for_password_reset(
self, db: Session, store_id: int, email: str
) -> Customer | None:
"""
Get active customer by email for password reset.
Args:
db: Database session
store_id: Store ID
email: Customer email
Returns:
Customer if found and active, None otherwise
"""
return (
db.query(Customer)
.filter(
Customer.store_id == store_id,
Customer.email == email.lower(),
Customer.is_active == True, # noqa: E712
)
.first()
)
def validate_and_reset_password(
self,
db: Session,
store_id: int,
reset_token: str,
new_password: str,
) -> Customer:
"""
Validate reset token and update customer password.
Args:
db: Database session
store_id: Store ID
reset_token: Password reset token from email
new_password: New password
Returns:
Customer: Updated customer
Raises:
PasswordTooShortException: If password too short
InvalidPasswordResetTokenException: If token invalid/expired
CustomerNotActiveException: If customer not active
"""
# Validate password length
if len(new_password) < 8:
raise PasswordTooShortException(min_length=8)
# Find valid token
token_record = PasswordResetToken.find_valid_token(db, reset_token)
if not token_record:
raise InvalidPasswordResetTokenException()
# Get the customer and verify they belong to this store
customer = (
db.query(Customer)
.filter(Customer.id == token_record.customer_id)
.first()
)
if not customer or customer.store_id != store_id:
raise InvalidPasswordResetTokenException()
if not customer.is_active:
raise CustomerNotActiveException(customer.email)
# Hash the new password and update customer
hashed_password = self.auth_service.hash_password(new_password)
customer.hashed_password = hashed_password
# Mark token as used
token_record.mark_used(db)
logger.info(f"Password reset completed for customer {customer.id}") # noqa: SEC021
return customer
# ========================================================================
# Cross-module public API methods
# ========================================================================
def create_customer_for_enrollment(
self,
db: Session,
store_id: int,
email: str,
first_name: str = "",
last_name: str = "",
phone: str | None = None,
) -> Customer:
"""
Create a customer for loyalty/external enrollment.
Creates a customer with an unusable password hash.
Args:
db: Database session
store_id: Store ID
email: Customer email
first_name: First name
last_name: Last name
phone: Phone number
Returns:
Created Customer object
"""
import secrets
unusable_hash = f"!enrollment!{secrets.token_hex(32)}"
store_code = "STORE"
try:
from app.modules.tenancy.services.store_service import store_service
store = store_service.get_store_by_id_optional(db, store_id)
if store:
store_code = store.store_code
except SQLAlchemyError:
pass
cust_number = self._generate_customer_number(db, store_id, store_code)
customer = Customer(
email=email,
first_name=first_name,
last_name=last_name,
phone=phone,
hashed_password=unusable_hash,
customer_number=cust_number,
store_id=store_id,
is_active=True,
)
db.add(customer)
db.flush()
return customer
def get_customer_by_id(self, db: Session, customer_id: int) -> Customer | None:
"""
Get customer by ID without store scope.
Args:
db: Database session
customer_id: Customer ID
Returns:
Customer object or None
"""
return db.query(Customer).filter(Customer.id == customer_id).first()
def get_store_customer_count(self, db: Session, store_id: int) -> int:
"""
Count customers for a store.
Args:
db: Database session
store_id: Store ID
Returns:
Customer count
"""
from sqlalchemy import func
return (
db.query(func.count(Customer.id))
.filter(Customer.store_id == store_id)
.scalar()
or 0
)
# Singleton instance
customer_service = CustomerService()