Files
orion/app/services/customer_service.py
Samir Boulahtit 6735d99df2 feat: implement customer authentication with JWT tokens
Implement secure customer authentication system with dedicated JWT tokens,
separate from admin/vendor authentication.

Backend Changes:
- Add customer JWT token support in deps.py
  - New get_current_customer_from_cookie_or_header dependency
  - Validates customer-specific tokens with type checking
  - Returns Customer object instead of User for shop routes
- Extend AuthService with customer token support
  - Add verify_password() method
  - Add create_access_token_with_data() for custom token payloads
- Update CustomerService authentication
  - Generate customer-specific JWT tokens with type="customer"
  - Use vendor-scoped customer lookup
- Enhance exception handler
  - Sanitize validation errors to prevent password leaks in logs
  - Fix shop login redirect to support multi-access routing
- Improve vendor context detection from Referer header
  - Consistent "path" detection method for cookie path logic

Schema Changes:
- Rename UserLogin.username to email_or_username for flexibility
- Update field validators accordingly

API Changes:
- Update admin/vendor auth endpoints to use email_or_username
- Customer auth already uses email field correctly

Route Changes:
- Update shop account routes to use Customer dependency
- Add /account redirect (without trailing slash)
- Change parameter names from current_user to current_customer

Frontend Changes:
- Update login forms to use email_or_username in API calls
- Change button text from "Log in" to "Sign in" for consistency
- Improve loading spinner layout with flexbox

Security Improvements:
- Customer tokens scoped to vendor_id
- Token type validation prevents cross-context token usage
- Password inputs redacted from validation error logs

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-25 21:08:49 +01:00

424 lines
12 KiB
Python

# app/services/customer_service.py
"""
Customer management service.
Handles customer registration, authentication, and profile management
with complete vendor isolation.
"""
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any

from jose import jwt
from sqlalchemy.orm import Session
from sqlalchemy import and_

from models.database.customer import Customer, CustomerAddress
from models.database.vendor import Vendor
from models.schema.customer import CustomerRegister, CustomerUpdate
from models.schema.auth import UserLogin
from app.exceptions.customer import (
    CustomerNotFoundException,
    CustomerAlreadyExistsException,
    CustomerNotActiveException,
    InvalidCustomerCredentialsException,
    CustomerValidationException,
    DuplicateCustomerEmailException,
)
from app.exceptions.vendor import VendorNotFoundException, VendorNotActiveException
from app.services.auth_service import AuthService
logger = logging.getLogger(__name__)
class CustomerService:
    """Service for managing vendor-scoped customers.

    Registration, login, lookup, update and deactivation all filter on
    ``vendor_id``. ``update_customer_stats`` is the one method that looks a
    customer up by ID alone.
    """

    def __init__(self) -> None:
        self.auth_service = AuthService()

    def register_customer(
        self,
        db: Session,
        vendor_id: int,
        customer_data: CustomerRegister,
    ) -> Customer:
        """
        Register a new customer for a specific vendor.

        Args:
            db: Database session
            vendor_id: Vendor ID
            customer_data: Customer registration data

        Returns:
            Customer: Created customer object

        Raises:
            VendorNotFoundException: If vendor doesn't exist
            VendorNotActiveException: If vendor is not active
            DuplicateCustomerEmailException: If email already exists for this vendor
            CustomerValidationException: If persisting the customer fails
        """
        # Verify vendor exists and is active
        vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first()
        if not vendor:
            raise VendorNotFoundException(str(vendor_id), identifier_type="id")
        if not vendor.is_active:
            raise VendorNotActiveException(vendor.vendor_code)

        # Emails are stored lower-cased; the lookup helper lower-cases too
        if self.get_customer_by_email(db, vendor_id, customer_data.email):
            raise DuplicateCustomerEmailException(
                customer_data.email, vendor.vendor_code
            )

        # Generate unique customer number for this vendor
        customer_number = self._generate_customer_number(
            db, vendor_id, vendor.vendor_code
        )
        hashed_password = self.auth_service.hash_password(customer_data.password)

        customer = Customer(
            vendor_id=vendor_id,
            email=customer_data.email.lower(),
            hashed_password=hashed_password,
            first_name=customer_data.first_name,
            last_name=customer_data.last_name,
            phone=customer_data.phone,
            customer_number=customer_number,
            # Schema may not define the field; default to no consent
            marketing_consent=getattr(customer_data, "marketing_consent", False),
            is_active=True,
        )

        try:
            db.add(customer)
            db.commit()
            db.refresh(customer)
        except Exception as e:
            # Any persistence failure is surfaced as a validation error;
            # the original exception is kept as the cause for debugging.
            db.rollback()
            logger.error("Error registering customer: %s", e)
            raise CustomerValidationException(
                message="Failed to register customer",
                details={"error": str(e)},
            ) from e

        logger.info(
            "Customer registered successfully: %s (ID: %s, Number: %s) for vendor %s",
            customer.email,
            customer.id,
            customer.customer_number,
            vendor.vendor_code,
        )
        return customer

    def login_customer(
        self,
        db: Session,
        vendor_id: int,
        credentials: UserLogin,
    ) -> Dict[str, Any]:
        """
        Authenticate customer and generate JWT token.

        Args:
            db: Database session
            vendor_id: Vendor ID
            credentials: Login credentials; ``email_or_username`` is matched
                against the customer's email only

        Returns:
            Dict with ``"customer"`` (the Customer object) and
            ``"token_data"`` (``access_token``, ``token_type``, ``expires_in``)

        Raises:
            VendorNotFoundException: If vendor doesn't exist
            InvalidCustomerCredentialsException: If credentials are invalid
            CustomerNotActiveException: If customer account is inactive
        """
        # Verify vendor exists
        # NOTE(review): unlike register_customer, vendor.is_active is not
        # checked here - confirm whether that is intentional.
        vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first()
        if not vendor:
            raise VendorNotFoundException(str(vendor_id), identifier_type="id")

        # Find customer by email (vendor-scoped)
        customer = db.query(Customer).filter(
            and_(
                Customer.vendor_id == vendor_id,
                Customer.email == credentials.email_or_username.lower(),
            )
        ).first()
        if not customer:
            raise InvalidCustomerCredentialsException()

        # Use auth_manager directly since Customer is not a User model
        auth_manager = self.auth_service.auth_manager
        if not auth_manager.verify_password(
            credentials.password,
            customer.hashed_password,
        ):
            raise InvalidCustomerCredentialsException()

        # Inactive accounts are rejected only after the password matched
        if not customer.is_active:
            raise CustomerNotActiveException(customer.email)

        # Take one timestamp so "iat" and "exp" are derived from the same instant
        issued_at = datetime.now(timezone.utc)
        expires_at = issued_at + timedelta(minutes=auth_manager.token_expire_minutes)
        payload = {
            "sub": str(customer.id),
            "email": customer.email,
            "vendor_id": vendor_id,
            "type": "customer",
            "exp": expires_at,
            "iat": issued_at,
        }
        token = jwt.encode(
            payload, auth_manager.secret_key, algorithm=auth_manager.algorithm
        )
        token_data = {
            "access_token": token,
            "token_type": "bearer",
            "expires_in": auth_manager.token_expire_minutes * 60,
        }

        logger.info(
            "Customer login successful: %s for vendor %s",
            customer.email,
            vendor.vendor_code,
        )
        return {
            "customer": customer,
            "token_data": token_data,
        }

    def get_customer(
        self,
        db: Session,
        vendor_id: int,
        customer_id: int,
    ) -> Customer:
        """
        Get customer by ID with vendor isolation.

        Args:
            db: Database session
            vendor_id: Vendor ID
            customer_id: Customer ID

        Returns:
            Customer: Customer object

        Raises:
            CustomerNotFoundException: If no customer with this ID exists
                for this vendor
        """
        customer = db.query(Customer).filter(
            and_(
                Customer.id == customer_id,
                Customer.vendor_id == vendor_id,
            )
        ).first()
        if not customer:
            raise CustomerNotFoundException(str(customer_id))
        return customer

    def get_customer_by_email(
        self,
        db: Session,
        vendor_id: int,
        email: str,
    ) -> Optional[Customer]:
        """
        Get customer by email (vendor-scoped).

        The email is lower-cased before comparison.

        Args:
            db: Database session
            vendor_id: Vendor ID
            email: Customer email

        Returns:
            Optional[Customer]: Customer object or None
        """
        return db.query(Customer).filter(
            and_(
                Customer.vendor_id == vendor_id,
                Customer.email == email.lower(),
            )
        ).first()

    def update_customer(
        self,
        db: Session,
        vendor_id: int,
        customer_id: int,
        customer_data: CustomerUpdate,
    ) -> Customer:
        """
        Update customer profile.

        Only fields explicitly set on ``customer_data`` are applied. A new
        email is checked for uniqueness within the vendor before any
        attribute is modified, so a rejected update leaves the loaded
        customer untouched.

        Args:
            db: Database session
            vendor_id: Vendor ID
            customer_id: Customer ID
            customer_data: Updated customer data

        Returns:
            Customer: Updated customer object

        Raises:
            CustomerNotFoundException: If customer not found
            DuplicateCustomerEmailException: If the new email belongs to
                another customer of this vendor
            CustomerValidationException: If persisting the update fails
        """
        customer = self.get_customer(db, vendor_id, customer_id)
        update_data = customer_data.model_dump(exclude_unset=True)

        # Validate first: raising after some setattr calls would leave
        # pending changes on the session-attached object.
        new_email = update_data.get("email")
        if new_email:
            existing = db.query(Customer).filter(
                and_(
                    Customer.vendor_id == vendor_id,
                    Customer.email == new_email.lower(),
                    Customer.id != customer_id,
                )
            ).first()
            if existing:
                # NOTE(review): register_customer passes vendor.vendor_code
                # as the second argument; here it is the literal "vendor".
                raise DuplicateCustomerEmailException(new_email, "vendor")

        for field, value in update_data.items():
            if field == "email" and value:
                # Emails are stored lower-cased
                setattr(customer, field, value.lower())
            elif hasattr(customer, field):
                # Fields the model does not have are ignored
                setattr(customer, field, value)

        try:
            db.commit()
            db.refresh(customer)
        except Exception as e:
            db.rollback()
            logger.error("Error updating customer: %s", e)
            raise CustomerValidationException(
                message="Failed to update customer",
                details={"error": str(e)},
            ) from e

        logger.info("Customer updated: %s (ID: %s)", customer.email, customer.id)
        return customer

    def deactivate_customer(
        self,
        db: Session,
        vendor_id: int,
        customer_id: int,
    ) -> Customer:
        """
        Deactivate customer account.

        Args:
            db: Database session
            vendor_id: Vendor ID
            customer_id: Customer ID

        Returns:
            Customer: Deactivated customer object

        Raises:
            CustomerNotFoundException: If customer not found
        """
        customer = self.get_customer(db, vendor_id, customer_id)
        customer.is_active = False
        try:
            db.commit()
            db.refresh(customer)
        except Exception:
            # Roll back so the session stays usable; the original error
            # propagates unchanged.
            db.rollback()
            raise

        logger.info("Customer deactivated: %s (ID: %s)", customer.email, customer.id)
        return customer

    def update_customer_stats(
        self,
        db: Session,
        customer_id: int,
        order_total: float,
    ) -> None:
        """
        Update customer statistics after order.

        Increments the order count, adds ``order_total`` to the amount spent
        and stamps the last order date. Does nothing if the customer does
        not exist. The lookup is by ID only (no vendor filter).

        Args:
            db: Database session
            customer_id: Customer ID
            order_total: Order total amount
        """
        customer = db.query(Customer).filter(Customer.id == customer_id).first()
        if not customer:
            return

        customer.total_orders += 1
        customer.total_spent += order_total
        # Naive UTC timestamp, same value datetime.utcnow() produced, without
        # calling the deprecated API.
        customer.last_order_date = datetime.now(timezone.utc).replace(tzinfo=None)
        try:
            db.commit()
        except Exception:
            db.rollback()
            raise
        logger.debug("Updated stats for customer %s", customer.email)

    def _generate_customer_number(
        self,
        db: Session,
        vendor_id: int,
        vendor_code: str,
    ) -> str:
        """
        Generate unique customer number for vendor.

        Format: {VENDOR_CODE}-CUST-{SEQUENCE}
        Example: VENDORA-CUST-00001

        The sequence starts at the vendor's current customer count plus one
        and is incremented until no customer of this vendor has the number.

        Args:
            db: Database session
            vendor_id: Vendor ID
            vendor_code: Vendor code

        Returns:
            str: Unique customer number
        """
        prefix = f"{vendor_code.upper()}-CUST-"
        sequence = (
            db.query(Customer).filter(Customer.vendor_id == vendor_id).count() + 1
        )

        # Ensure uniqueness (in case of deletions): the count can land on a
        # number that is already assigned.
        while True:
            customer_number = f"{prefix}{str(sequence).zfill(5)}"
            taken = db.query(Customer).filter(
                and_(
                    Customer.vendor_id == vendor_id,
                    Customer.customer_number == customer_number,
                )
            ).first()
            if not taken:
                return customer_number
            sequence += 1
# Singleton instance: one module-level CustomerService, created at import time
customer_service = CustomerService()