Files
orion/app/modules/customers/routes/api/storefront.py
Samir Boulahtit d48dc85d5f refactor: update module imports to use module locations
Update all module files to import from canonical module locations
instead of legacy re-export files:
- checkout, orders, customers routes: use module schemas
- catalog, marketplace schemas: use inventory module schemas
- marketplace, customers, inventory, analytics services: use module models

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-30 08:35:01 +01:00

731 lines
23 KiB
Python

# app/modules/customers/routes/api/storefront.py
"""
Customers Module - Storefront API Routes
Public and authenticated endpoints for customer operations in storefront:
- Authentication (register, login, logout, password reset)
- Profile management
- Address management
Uses vendor from middleware context (VendorContextMiddleware).
Implements dual token storage with path restriction:
- Sets HTTP-only cookie with path=/shop (restricted to shop routes only)
- Returns token in response for localStorage (API calls)
"""
import logging
from fastapi import APIRouter, Depends, Path, Request, Response
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.api.deps import get_current_customer_api
from app.core.database import get_db
from app.core.environment import should_use_secure_cookies
from app.exceptions import ValidationException, VendorNotFoundException
from app.modules.customers.schemas import CustomerContext
from app.modules.customers.services import (
customer_address_service,
customer_service,
)
from app.services.auth_service import AuthService
from app.services.email_service import EmailService
from models.database.password_reset_token import PasswordResetToken
from models.schema.auth import (
LogoutResponse,
PasswordResetRequestResponse,
PasswordResetResponse,
UserLogin,
)
from app.modules.customers.schemas import (
CustomerAddressCreate,
CustomerAddressListResponse,
CustomerAddressResponse,
CustomerAddressUpdate,
CustomerPasswordChange,
CustomerRegister,
CustomerResponse,
CustomerUpdate,
)
# Router that every storefront customer endpoint in this module registers on.
router = APIRouter()
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
# Auth service for password operations
# (change_password uses it to verify the current password and hash the new one).
auth_service = AuthService()
# ============================================================================
# Response Models
# ============================================================================
class CustomerLoginResponse(BaseModel):
    """
    Body returned by the customer login endpoint.

    Carries the issued token details together with the serialized customer.
    """

    # Token string handed back to the client (the login route also stores it
    # in the customer_token cookie).
    access_token: str
    # Token type label as reported by the login service.
    token_type: str
    # Token lifetime; the login route reuses this value as the cookie max_age.
    expires_in: int
    # Serialized record of the customer that just authenticated.
    user: CustomerResponse
# ============================================================================
# AUTHENTICATION ENDPOINTS
# ============================================================================
@router.post("/auth/register", response_model=CustomerResponse)
def register_customer(
    request: Request, customer_data: CustomerRegister, db: Session = Depends(get_db)
):
    """
    Create a customer account under the vendor attached to this request.

    The vendor is read from ``request.state.vendor``. Registration itself is
    delegated to ``customer_service.register_customer`` (scoped by vendor id)
    and the transaction is committed here. Accounts are vendor-scoped, so the
    same email may be registered with different vendors; that rule lives in
    the service and is not visible in this route.

    Request Body:
    - email: Customer email address
    - password: Customer password
    - first_name: Customer first name
    - last_name: Customer last name
    - phone: Customer phone number (optional)

    Returns:
        The newly created customer serialized as ``CustomerResponse``.

    Raises:
        VendorNotFoundException: No vendor is present on the request state.
    """
    vendor = getattr(request.state, "vendor", None)
    if not vendor:
        raise VendorNotFoundException("context", identifier_type="subdomain")

    attempt_context = {
        "vendor_id": vendor.id,
        "vendor_code": vendor.subdomain,
        "email": customer_data.email,
    }
    logger.debug(
        f"[CUSTOMER_STOREFRONT] register_customer for vendor {vendor.subdomain}",
        extra=attempt_context,
    )

    new_customer = customer_service.register_customer(
        db=db, vendor_id=vendor.id, customer_data=customer_data
    )
    # The service only stages the new row; the route owns the commit.
    db.commit()

    created_context = {
        "customer_id": new_customer.id,
        "vendor_id": vendor.id,
        "email": new_customer.email,
    }
    logger.info(
        f"New customer registered: {new_customer.email} for vendor {vendor.subdomain}",
        extra=created_context,
    )

    return CustomerResponse.model_validate(new_customer)
@router.post("/auth/login", response_model=CustomerLoginResponse)
def customer_login(
    request: Request,
    user_credentials: UserLogin,
    response: Response,
    db: Session = Depends(get_db),
):
    """
    Authenticate a customer against the vendor attached to this request.

    Credentials are checked by ``customer_service.login_customer`` using the
    vendor id from ``request.state.vendor``. On success the token is delivered
    twice:
    1. As an HTTP-only ``customer_token`` cookie whose path is ``/shop``, or
       ``<full_prefix><subdomain>/shop`` when the vendor was detected by path
       (for browser page navigation)
    2. In the response body (for localStorage and API calls)

    Request Body:
    - email_or_username: Customer email or username
    - password: Customer password

    Returns:
        ``CustomerLoginResponse`` with the token fields and the customer.

    Raises:
        VendorNotFoundException: No vendor is present on the request state.
    """
    vendor = getattr(request.state, "vendor", None)
    if not vendor:
        raise VendorNotFoundException("context", identifier_type="subdomain")

    logger.debug(
        f"[CUSTOMER_STOREFRONT] customer_login for vendor {vendor.subdomain}",
        extra={
            "vendor_id": vendor.id,
            "vendor_code": vendor.subdomain,
            "email_or_username": user_credentials.email_or_username,
        },
    )

    login_result = customer_service.login_customer(
        db=db, vendor_id=vendor.id, credentials=user_credentials
    )
    customer = login_result["customer"]

    logger.info(
        f"Customer login successful: {customer.email} for vendor {vendor.subdomain}",
        extra={
            "customer_id": customer.id,
            "vendor_id": vendor.id,
            "email": customer.email,
        },
    )

    # Calculate cookie path based on vendor access method.
    vendor_context = getattr(request.state, "vendor_context", None)
    access_method = "unknown"
    if vendor_context:
        access_method = vendor_context.get("detection_method", "unknown")

    cookie_path = "/shop"
    if access_method == "path":
        # access_method can only be "path" when vendor_context is truthy,
        # so the context can be read directly here.
        full_prefix = vendor_context.get("full_prefix", "/vendor/")
        cookie_path = f"{full_prefix}{vendor.subdomain}/shop"

    token_data = login_result["token_data"]
    access_token = token_data["access_token"]
    response.set_cookie(
        key="customer_token",
        value=access_token,
        httponly=True,
        secure=should_use_secure_cookies(),
        samesite="lax",
        max_age=token_data["expires_in"],
        path=cookie_path,
    )
    expires_in = token_data["expires_in"]

    logger.debug(
        f"Set customer_token cookie with {expires_in}s expiry "
        f"(path={cookie_path}, httponly=True, secure={should_use_secure_cookies()})",
    )

    return CustomerLoginResponse(
        access_token=access_token,
        token_type=token_data["token_type"],
        expires_in=expires_in,
        user=CustomerResponse.model_validate(customer),
    )
@router.post("/auth/logout", response_model=LogoutResponse)
def customer_logout(request: Request, response: Response):
    """
    Log the customer out by clearing the ``customer_token`` cookie.

    The cookie is deleted on the same path rule used at login: ``/shop`` by
    default, or ``<full_prefix><subdomain>/shop`` when the vendor was detected
    by path and a vendor is present. A missing vendor is tolerated here; it is
    only reflected in the log output.
    Client should also remove token from localStorage.

    Returns:
        ``LogoutResponse`` with a fixed confirmation message.
    """
    vendor = getattr(request.state, "vendor", None)
    if vendor:
        vendor_label = vendor.subdomain
        vendor_id = vendor.id
        vendor_code = vendor.subdomain
    else:
        vendor_label = "unknown"
        vendor_id = None
        vendor_code = None

    logger.info(
        f"Customer logout for vendor {vendor_label}",
        extra={"vendor_id": vendor_id, "vendor_code": vendor_code},
    )

    vendor_context = getattr(request.state, "vendor_context", None)
    access_method = "unknown"
    if vendor_context:
        access_method = vendor_context.get("detection_method", "unknown")

    cookie_path = "/shop"
    if access_method == "path" and vendor:
        # "path" can only have come from a truthy vendor_context.
        full_prefix = vendor_context.get("full_prefix", "/vendor/")
        cookie_path = f"{full_prefix}{vendor.subdomain}/shop"

    response.delete_cookie(key="customer_token", path=cookie_path)
    logger.debug(f"Deleted customer_token cookie (path={cookie_path})")

    return LogoutResponse(message="Logged out successfully")
@router.post("/auth/forgot-password", response_model=PasswordResetRequestResponse)
def forgot_password(request: Request, email: str, db: Session = Depends(get_db)):
    """
    Request password reset for customer.

    Vendor is automatically determined from request context.
    When ``customer_service.get_customer_for_password_reset`` finds a customer,
    a reset token is created and a ``password_reset`` template email is sent.
    The response is the same generic message whether or not an account exists,
    and also when sending fails, so callers cannot probe for valid emails.

    Query Parameters:
    - email: Customer email address (declared as a plain ``str`` parameter,
      which FastAPI reads from the query string, not from a JSON body)

    Returns:
        ``PasswordResetRequestResponse`` with a generic confirmation message.

    Raises:
        VendorNotFoundException: No vendor is present on the request state.
    """
    vendor = getattr(request.state, "vendor", None)
    if not vendor:
        raise VendorNotFoundException("context", identifier_type="subdomain")

    logger.debug(
        f"[CUSTOMER_STOREFRONT] forgot_password for vendor {vendor.subdomain}",
        extra={
            "vendor_id": vendor.id,
            "vendor_code": vendor.subdomain,
            "email": email,
        },
    )

    customer = customer_service.get_customer_for_password_reset(db, vendor.id, email)

    if customer:
        try:
            plaintext_token = PasswordResetToken.create_for_customer(db, customer.id)

            # The link scheme follows the same switch that decides whether
            # cookies are marked secure.
            scheme = "https" if should_use_secure_cookies() else "http"
            # SECURITY(review): the Host header is client-controlled, so a
            # forged value ends up in the emailed reset link. Prefer a trusted,
            # configured base URL for the vendor if one is available.
            host = request.headers.get("host", "localhost")
            reset_link = f"{scheme}://{host}/shop/account/reset-password?token={plaintext_token}"

            email_service = EmailService(db)
            email_service.send_template(
                template_code="password_reset",
                to_email=customer.email,
                to_name=customer.full_name,
                language=customer.preferred_language or "en",
                variables={
                    "customer_name": customer.first_name or customer.full_name,
                    "reset_link": reset_link,
                    "expiry_hours": str(PasswordResetToken.TOKEN_EXPIRY_HOURS),
                },
                vendor_id=vendor.id,
                related_type="customer",
                related_id=customer.id,
            )

            # Commit only after the email was handed off, so a failure leaves
            # no usable token behind.
            db.commit()
            logger.info(
                f"Password reset email sent to {email} (vendor: {vendor.subdomain})"
            )
        except Exception as e:
            # Deliberate best-effort: the failure is not surfaced to the caller.
            # logger.exception keeps the traceback that logger.error dropped.
            db.rollback()
            logger.exception(f"Failed to send password reset email: {e}")
    else:
        logger.info(
            f"Password reset requested for non-existent email {email} (vendor: {vendor.subdomain})"
        )

    return PasswordResetRequestResponse(
        message="If an account exists with this email, a password reset link has been sent."
    )
@router.post("/auth/reset-password", response_model=PasswordResetResponse)
def reset_password(
    request: Request, reset_token: str, new_password: str, db: Session = Depends(get_db)
):
    """
    Complete a password reset using the token from the reset email.

    Token validation and the password change are both delegated to
    ``customer_service.validate_and_reset_password``; this route supplies the
    vendor id from the request context and commits the result.

    Parameters (declared as plain ``str`` arguments, which FastAPI reads from
    the query string):
    - reset_token: Password reset token from email
    - new_password: New password (minimum 8 characters; the length rule is
      not checked in this route, presumably in the service, so verify there)

    Returns:
        ``PasswordResetResponse`` with a confirmation message.

    Raises:
        VendorNotFoundException: No vendor is present on the request state.
    """
    vendor = getattr(request.state, "vendor", None)
    if not vendor:
        raise VendorNotFoundException("context", identifier_type="subdomain")

    vendor_scope = {"vendor_id": vendor.id, "vendor_code": vendor.subdomain}
    logger.debug(
        f"[CUSTOMER_STOREFRONT] reset_password for vendor {vendor.subdomain}",
        extra=vendor_scope,
    )

    reset_customer = customer_service.validate_and_reset_password(
        db=db,
        vendor_id=vendor.id,
        reset_token=reset_token,
        new_password=new_password,
    )
    db.commit()

    logger.info(
        f"Password reset completed for customer {reset_customer.id} (vendor: {vendor.subdomain})"
    )

    confirmation = PasswordResetResponse(
        message="Password reset successfully. You can now log in with your new password."
    )
    return confirmation
# ============================================================================
# PROFILE ENDPOINTS
# ============================================================================
@router.get("/profile", response_model=CustomerResponse)  # authenticated
def get_profile(
    customer: CustomerContext = Depends(get_current_customer_api),
    db: Session = Depends(get_db),
):
    """
    Return the profile of the authenticated customer.

    The customer comes from the ``get_current_customer_api`` dependency and is
    serialized as ``CustomerResponse``. ``db`` is accepted but not used here.
    """
    who = {"customer_id": customer.id, "email": customer.email}
    logger.debug(
        f"[CUSTOMER_STOREFRONT] get_profile for customer {customer.id}",
        extra=who,
    )

    profile = CustomerResponse.model_validate(customer)
    return profile
@router.put("/profile", response_model=CustomerResponse)
def update_profile(
    update_data: CustomerUpdate,
    customer: CustomerContext = Depends(get_current_customer_api),
    db: Session = Depends(get_db),
):
    """
    Update current customer profile.

    Only fields that were explicitly sent AND are not null are applied;
    unset or null fields leave the stored value untouched. Both log entries
    report exactly that applied set.
    Email changes require the new email to be unique within the vendor.

    Request Body:
    - email: New email address (optional)
    - first_name: First name (optional)
    - last_name: Last name (optional)
    - phone: Phone number (optional)
    - marketing_consent: Marketing consent (optional)
    - preferred_language: Preferred language (optional)

    Returns:
        The updated customer serialized as ``CustomerResponse``.

    Raises:
        ValidationException: The new email already belongs to another
            customer of the same vendor.
    """
    # Compute the effective change set once so that logging and the actual
    # update can never disagree about which fields were touched.
    requested_changes = update_data.model_dump(exclude_unset=True)
    applied_changes = {
        field: value
        for field, value in requested_changes.items()
        if value is not None
    }

    logger.debug(
        f"[CUSTOMER_STOREFRONT] update_profile for customer {customer.id}",
        extra={
            "customer_id": customer.id,
            "email": customer.email,
            "update_fields": list(applied_changes),
        },
    )

    if update_data.email and update_data.email != customer.email:
        existing = customer_service.get_customer_by_email(
            db, customer.vendor_id, update_data.email
        )
        if existing and existing.id != customer.id:
            raise ValidationException("Email already in use")

    # NOTE(review): `customer` is annotated as CustomerContext, which is
    # imported from the schemas package. If the dependency yields a schema
    # object rather than a session-attached ORM row, setattr + commit will not
    # persist anything and db.refresh() will fail. Confirm what the
    # dependency returns.
    for field, value in applied_changes.items():
        setattr(customer, field, value)

    db.commit()
    db.refresh(customer)

    logger.info(
        f"Customer {customer.id} updated profile",
        extra={"customer_id": customer.id, "updated_fields": list(applied_changes)},
    )

    return CustomerResponse.model_validate(customer)
@router.put("/profile/password", response_model=dict)
def change_password(
    password_data: CustomerPasswordChange,
    customer: CustomerContext = Depends(get_current_customer_api),
    db: Session = Depends(get_db),
):
    """
    Replace the authenticated customer's password.

    Checks run in this order, each raising ``ValidationException``:
    1. the current password must verify against the stored hash
    2. the new password and its confirmation must match
    3. the new password must differ from the current one

    Request Body:
    - current_password: Current password
    - new_password: New password (min 8 chars, must contain letter and digit;
      that rule is not checked in this route, presumably by the request schema)
    - confirm_password: Confirmation of new password

    Returns:
        A dict with a single ``message`` key.
    """
    audit_context = {"customer_id": customer.id, "email": customer.email}
    logger.debug(
        f"[CUSTOMER_STOREFRONT] change_password for customer {customer.id}",
        extra=audit_context,
    )

    current_password = password_data.current_password
    current_matches = auth_service.auth_manager.verify_password(
        current_password, customer.hashed_password
    )
    if not current_matches:
        raise ValidationException("Current password is incorrect")

    proposed_password = password_data.new_password
    if proposed_password != password_data.confirm_password:
        raise ValidationException("New passwords do not match")

    if proposed_password == current_password:
        raise ValidationException("New password must be different from current password")

    # NOTE(review): `customer` is annotated as CustomerContext (from the
    # schemas package). Confirm it is a session-attached object, otherwise
    # this assignment is not persisted by the commit below.
    customer.hashed_password = auth_service.hash_password(proposed_password)
    db.commit()

    logger.info(
        f"Customer {customer.id} changed password",
        extra={"customer_id": customer.id, "email": customer.email},
    )

    return {"message": "Password changed successfully"}
# ============================================================================
# ADDRESS ENDPOINTS
# ============================================================================
@router.get("/addresses", response_model=CustomerAddressListResponse)  # authenticated
def list_addresses(
    request: Request,
    customer: CustomerContext = Depends(get_current_customer_api),
    db: Session = Depends(get_db),
):
    """
    Return every address of the authenticated customer for the current vendor.

    The lookup is delegated to ``customer_address_service.list_addresses``.
    Ordering (default first, then by creation date) is the service's
    documented behaviour and is not applied in this route.

    Returns:
        ``CustomerAddressListResponse`` with the serialized addresses and
        their count.

    Raises:
        VendorNotFoundException: No vendor is present on the request state.
    """
    vendor = getattr(request.state, "vendor", None)
    if not vendor:
        raise VendorNotFoundException("context", identifier_type="subdomain")

    logger.debug(
        f"[CUSTOMER_STOREFRONT] list_addresses for customer {customer.id}",
        extra={
            "vendor_id": vendor.id,
            "vendor_code": vendor.subdomain,
            "customer_id": customer.id,
        },
    )

    addresses = customer_address_service.list_addresses(
        db=db, vendor_id=vendor.id, customer_id=customer.id
    )

    serialized = []
    for entry in addresses:
        serialized.append(CustomerAddressResponse.model_validate(entry))

    return CustomerAddressListResponse(addresses=serialized, total=len(addresses))
@router.get("/addresses/{address_id}", response_model=CustomerAddressResponse)
def get_address(
    request: Request,
    address_id: int = Path(..., description="Address ID", gt=0),
    customer: CustomerContext = Depends(get_current_customer_api),
    db: Session = Depends(get_db),
):
    """
    Fetch one address by id for the authenticated customer.

    The service call is scoped by vendor id and customer id, so a customer can
    only reach their own addresses.

    Returns:
        The address serialized as ``CustomerAddressResponse``.

    Raises:
        VendorNotFoundException: No vendor is present on the request state.
    """
    vendor = getattr(request.state, "vendor", None)
    if not vendor:
        raise VendorNotFoundException("context", identifier_type="subdomain")

    lookup_context = {
        "vendor_id": vendor.id,
        "customer_id": customer.id,
        "address_id": address_id,
    }
    logger.debug(
        f"[CUSTOMER_STOREFRONT] get_address {address_id} for customer {customer.id}",
        extra=lookup_context,
    )

    found = customer_address_service.get_address(
        db=db, vendor_id=vendor.id, customer_id=customer.id, address_id=address_id
    )
    return CustomerAddressResponse.model_validate(found)
@router.post("/addresses", response_model=CustomerAddressResponse, status_code=201)
def create_address(
    request: Request,
    address_data: CustomerAddressCreate,
    customer: CustomerContext = Depends(get_current_customer_api),
    db: Session = Depends(get_db),
):
    """
    Add an address for the authenticated customer and commit it.

    Creation is delegated to ``customer_address_service.create_address``.
    The documented rules (maximum 10 addresses per customer; ``is_default=True``
    clears the default flag on other addresses of the same type) live in the
    service, not in this route.

    Returns:
        The new address serialized as ``CustomerAddressResponse`` (HTTP 201).

    Raises:
        VendorNotFoundException: No vendor is present on the request state.
    """
    vendor = getattr(request.state, "vendor", None)
    if not vendor:
        raise VendorNotFoundException("context", identifier_type="subdomain")

    logger.debug(
        f"[CUSTOMER_STOREFRONT] create_address for customer {customer.id}",
        extra={
            "vendor_id": vendor.id,
            "customer_id": customer.id,
            "address_type": address_data.address_type,
        },
    )

    created = customer_address_service.create_address(
        db=db,
        vendor_id=vendor.id,
        customer_id=customer.id,
        address_data=address_data,
    )
    db.commit()

    created_context = {
        "address_id": created.id,
        "customer_id": customer.id,
        "address_type": address_data.address_type,
    }
    logger.info(
        f"Created address {created.id} for customer {customer.id} "
        f"(type={address_data.address_type})",
        extra=created_context,
    )

    return CustomerAddressResponse.model_validate(created)
@router.put("/addresses/{address_id}", response_model=CustomerAddressResponse)
def update_address(
    request: Request,
    address_data: CustomerAddressUpdate,
    address_id: int = Path(..., description="Address ID", gt=0),
    customer: CustomerContext = Depends(get_current_customer_api),
    db: Session = Depends(get_db),
):
    """
    Modify one of the authenticated customer's addresses and commit the change.

    The update is delegated to ``customer_address_service.update_address``,
    scoped by vendor id and customer id so only the customer's own addresses
    can be changed. Per the documented service behaviour, ``is_default=True``
    clears the default flag on other addresses of the same type.

    Returns:
        The updated address serialized as ``CustomerAddressResponse``.

    Raises:
        VendorNotFoundException: No vendor is present on the request state.
    """
    vendor = getattr(request.state, "vendor", None)
    if not vendor:
        raise VendorNotFoundException("context", identifier_type="subdomain")

    target_context = {
        "vendor_id": vendor.id,
        "customer_id": customer.id,
        "address_id": address_id,
    }
    logger.debug(
        f"[CUSTOMER_STOREFRONT] update_address {address_id} for customer {customer.id}",
        extra=target_context,
    )

    updated = customer_address_service.update_address(
        db=db,
        vendor_id=vendor.id,
        customer_id=customer.id,
        address_id=address_id,
        address_data=address_data,
    )
    db.commit()

    logger.info(
        f"Updated address {address_id} for customer {customer.id}",
        extra={"address_id": address_id, "customer_id": customer.id},
    )

    return CustomerAddressResponse.model_validate(updated)
@router.delete("/addresses/{address_id}", status_code=204)
def delete_address(
    request: Request,
    address_id: int = Path(..., description="Address ID", gt=0),
    customer: CustomerContext = Depends(get_current_customer_api),
    db: Session = Depends(get_db),
):
    """
    Remove one of the authenticated customer's addresses and commit.

    Deletion is delegated to ``customer_address_service.delete_address``,
    scoped by vendor id and customer id so only the customer's own addresses
    can be removed. Nothing is returned (HTTP 204).

    Raises:
        VendorNotFoundException: No vendor is present on the request state.
    """
    vendor = getattr(request.state, "vendor", None)
    if not vendor:
        raise VendorNotFoundException("context", identifier_type="subdomain")

    target_context = {
        "vendor_id": vendor.id,
        "customer_id": customer.id,
        "address_id": address_id,
    }
    logger.debug(
        f"[CUSTOMER_STOREFRONT] delete_address {address_id} for customer {customer.id}",
        extra=target_context,
    )

    customer_address_service.delete_address(
        db=db, vendor_id=vendor.id, customer_id=customer.id, address_id=address_id
    )
    db.commit()

    logger.info(
        f"Deleted address {address_id} for customer {customer.id}",
        extra={"address_id": address_id, "customer_id": customer.id},
    )
@router.put("/addresses/{address_id}/default", response_model=CustomerAddressResponse)
def set_address_default(
    request: Request,
    address_id: int = Path(..., description="Address ID", gt=0),
    customer: CustomerContext = Depends(get_current_customer_api),
    db: Session = Depends(get_db),
):
    """
    Mark one of the authenticated customer's addresses as the default.

    The change is delegated to ``customer_address_service.set_default`` and
    committed here. Per the documented service behaviour, the default flag is
    cleared on the customer's other addresses of the same type.

    Returns:
        The affected address serialized as ``CustomerAddressResponse``.

    Raises:
        VendorNotFoundException: No vendor is present on the request state.
    """
    vendor = getattr(request.state, "vendor", None)
    if not vendor:
        raise VendorNotFoundException("context", identifier_type="subdomain")

    target_context = {
        "vendor_id": vendor.id,
        "customer_id": customer.id,
        "address_id": address_id,
    }
    logger.debug(
        f"[CUSTOMER_STOREFRONT] set_address_default {address_id} for customer {customer.id}",
        extra=target_context,
    )

    default_address = customer_address_service.set_default(
        db=db, vendor_id=vendor.id, customer_id=customer.id, address_id=address_id
    )
    db.commit()

    outcome_context = {
        "address_id": address_id,
        "customer_id": customer.id,
        "address_type": default_address.address_type,
    }
    logger.info(
        f"Set address {address_id} as default for customer {customer.id}",
        extra=outcome_context,
    )

    return CustomerAddressResponse.model_validate(default_address)