Files
orion/app/api/v1/shop/auth.py
Samir Boulahtit 2e1a2fc9fc feat: implement password reset for shop customers
Add complete password reset functionality:

Database:
- Add password_reset_tokens migration with token hash, expiry, used_at
- Create PasswordResetToken model with secure token hashing (SHA256)
- One active token per customer (old tokens invalidated on new request)
- 1-hour token expiry for security

API:
- Implement forgot_password endpoint with email lookup
- Implement reset_password endpoint with token validation
- No email enumeration (same response for all requests)
- Password minimum 8 characters validation

Frontend:
- Add reset-password.html template with Alpine.js
- Support for invalid/expired token states
- Success state with login redirect
- Dark mode support

Email:
- Add password_reset email templates (en, fr, de, lb)
- Uses existing EmailService with template rendering

Testing:
- Add comprehensive pytest tests (19 tests)
- Test token creation, validation, expiry, reuse prevention
- Test endpoint success and error cases

Removes critical launch blocker for password reset functionality.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-03 17:16:27 +01:00

415 lines
14 KiB
Python

# app/api/v1/shop/auth.py
"""
Shop Authentication API (Public)
Public endpoints for customer authentication in shop frontend.
Uses vendor from request.state (injected by VendorContextMiddleware).
Implements dual token storage with path restriction:
- Sets HTTP-only cookie with path=/shop (restricted to shop routes only)
- Returns token in response for localStorage (API calls)
This prevents:
- Customer cookies from being sent to admin or vendor routes
- Cross-context authentication confusion
"""
import logging
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.core.environment import should_use_secure_cookies
from app.exceptions import VendorNotFoundException
from app.services.customer_service import customer_service
from app.services.email_service import EmailService
from models.database.customer import Customer
from models.database.password_reset_token import PasswordResetToken
from models.schema.auth import (
LogoutResponse,
PasswordResetRequestResponse,
PasswordResetResponse,
UserLogin,
)
from models.schema.customer import CustomerRegister, CustomerResponse
router = APIRouter()
logger = logging.getLogger(__name__)
# Response model for customer login
class CustomerLoginResponse(BaseModel):
"""Customer login response with token and customer data."""
access_token: str
token_type: str
expires_in: int
user: CustomerResponse # Use CustomerResponse instead of UserResponse
@router.post("/auth/register", response_model=CustomerResponse)
def register_customer(
request: Request, customer_data: CustomerRegister, db: Session = Depends(get_db)
):
"""
Register a new customer for current vendor.
Vendor is automatically determined from request context.
Customer accounts are vendor-scoped - each vendor has independent customers.
Same email can be used for different vendors.
Request Body:
- email: Customer email address
- password: Customer password
- first_name: Customer first name
- last_name: Customer last name
- phone: Customer phone number (optional)
"""
# Get vendor from middleware
vendor = getattr(request.state, "vendor", None)
if not vendor:
raise VendorNotFoundException("context", identifier_type="subdomain")
logger.debug(
f"[SHOP_API] register_customer for vendor {vendor.subdomain}",
extra={
"vendor_id": vendor.id,
"vendor_code": vendor.subdomain,
"email": customer_data.email,
},
)
# Create customer account
customer = customer_service.register_customer(
db=db, vendor_id=vendor.id, customer_data=customer_data
)
db.commit()
logger.info(
f"New customer registered: {customer.email} for vendor {vendor.subdomain}",
extra={
"customer_id": customer.id,
"vendor_id": vendor.id,
"email": customer.email,
},
)
return CustomerResponse.model_validate(customer)
@router.post("/auth/login", response_model=CustomerLoginResponse)
def customer_login(
request: Request,
user_credentials: UserLogin,
response: Response,
db: Session = Depends(get_db),
):
"""
Customer login for current vendor.
Vendor is automatically determined from request context.
Authenticates customer and returns JWT token.
Customer must belong to the specified vendor.
Sets token in two places:
1. HTTP-only cookie with path=/shop (for browser page navigation)
2. Response body (for localStorage and API calls)
The cookie is restricted to /shop/* routes only to prevent
it from being sent to admin or vendor routes.
Request Body:
- email_or_username: Customer email or username
- password: Customer password
"""
# Get vendor from middleware
vendor = getattr(request.state, "vendor", None)
if not vendor:
raise VendorNotFoundException("context", identifier_type="subdomain")
logger.debug(
f"[SHOP_API] customer_login for vendor {vendor.subdomain}",
extra={
"vendor_id": vendor.id,
"vendor_code": vendor.subdomain,
"email_or_username": user_credentials.email_or_username,
},
)
# Authenticate customer
login_result = customer_service.login_customer(
db=db, vendor_id=vendor.id, credentials=user_credentials
)
logger.info(
f"Customer login successful: {login_result['customer'].email} for vendor {vendor.subdomain}",
extra={
"customer_id": login_result["customer"].id,
"vendor_id": vendor.id,
"email": login_result["customer"].email,
},
)
# Calculate cookie path based on vendor access method
vendor_context = getattr(request.state, "vendor_context", None)
access_method = (
vendor_context.get("detection_method", "unknown")
if vendor_context
else "unknown"
)
cookie_path = "/shop" # Default for domain/subdomain access
if access_method == "path":
# For path-based access like /vendors/wizamart/shop
full_prefix = (
vendor_context.get("full_prefix", "/vendor/")
if vendor_context
else "/vendor/"
)
cookie_path = f"{full_prefix}{vendor.subdomain}/shop"
# Set HTTP-only cookie for browser navigation
# Cookie path matches the vendor's shop routes
response.set_cookie(
key="customer_token",
value=login_result["token_data"]["access_token"],
httponly=True, # JavaScript cannot access (XSS protection)
secure=should_use_secure_cookies(), # HTTPS only in production/staging
samesite="lax", # CSRF protection
max_age=login_result["token_data"]["expires_in"], # Match JWT expiry
path=cookie_path, # Matches vendor's shop routes
)
logger.debug(
f"Set customer_token cookie with {login_result['token_data']['expires_in']}s expiry "
f"(path={cookie_path}, httponly=True, secure={should_use_secure_cookies()})",
extra={
"expires_in": login_result["token_data"]["expires_in"],
"secure": should_use_secure_cookies(),
"cookie_path": cookie_path,
},
)
# Return full login response
return CustomerLoginResponse(
access_token=login_result["token_data"]["access_token"],
token_type=login_result["token_data"]["token_type"],
expires_in=login_result["token_data"]["expires_in"],
user=CustomerResponse.model_validate(login_result["customer"]),
)
@router.post("/auth/logout", response_model=LogoutResponse)
def customer_logout(request: Request, response: Response):
"""
Customer logout for current vendor.
Vendor is automatically determined from request context.
Clears the customer_token cookie.
Client should also remove token from localStorage.
"""
# Get vendor from middleware (for logging)
vendor = getattr(request.state, "vendor", None)
logger.info(
f"Customer logout for vendor {vendor.subdomain if vendor else 'unknown'}",
extra={
"vendor_id": vendor.id if vendor else None,
"vendor_code": vendor.subdomain if vendor else None,
},
)
# Calculate cookie path based on vendor access method (must match login)
vendor_context = getattr(request.state, "vendor_context", None)
access_method = (
vendor_context.get("detection_method", "unknown")
if vendor_context
else "unknown"
)
cookie_path = "/shop" # Default for domain/subdomain access
if access_method == "path" and vendor:
# For path-based access like /vendors/wizamart/shop
full_prefix = (
vendor_context.get("full_prefix", "/vendor/")
if vendor_context
else "/vendor/"
)
cookie_path = f"{full_prefix}{vendor.subdomain}/shop"
# Clear the cookie (must match path used when setting)
response.delete_cookie(
key="customer_token",
path=cookie_path,
)
logger.debug(f"Deleted customer_token cookie (path={cookie_path})")
return LogoutResponse(message="Logged out successfully")
@router.post("/auth/forgot-password", response_model=PasswordResetRequestResponse)
def forgot_password(request: Request, email: str, db: Session = Depends(get_db)):
"""
Request password reset for customer.
Vendor is automatically determined from request context.
Sends password reset email to customer if account exists.
Request Body:
- email: Customer email address
"""
# Get vendor from middleware
vendor = getattr(request.state, "vendor", None)
if not vendor:
raise VendorNotFoundException("context", identifier_type="subdomain")
logger.debug(
f"[SHOP_API] forgot_password for vendor {vendor.subdomain}",
extra={
"vendor_id": vendor.id,
"vendor_code": vendor.subdomain,
"email": email,
},
)
# Look up customer by email (vendor-scoped)
customer = (
db.query(Customer)
.filter(
Customer.vendor_id == vendor.id,
Customer.email == email.lower(),
Customer.is_active == True, # noqa: E712
)
.first()
)
# If customer exists, generate token and send email
if customer:
try:
# Generate reset token (returns plaintext token)
plaintext_token = PasswordResetToken.create_for_customer(db, customer.id)
# Build reset link
# Use request host to construct the URL
scheme = "https" if should_use_secure_cookies() else "http"
host = request.headers.get("host", "localhost")
reset_link = f"{scheme}://{host}/shop/account/reset-password?token={plaintext_token}"
# Send password reset email
email_service = EmailService(db)
email_service.send_template(
template_code="password_reset",
to_email=customer.email,
to_name=customer.full_name,
language=customer.preferred_language or "en",
variables={
"customer_name": customer.first_name or customer.full_name,
"reset_link": reset_link,
"expiry_hours": str(PasswordResetToken.TOKEN_EXPIRY_HOURS),
},
vendor_id=vendor.id,
related_type="customer",
related_id=customer.id,
)
db.commit()
logger.info(
f"Password reset email sent to {email} (vendor: {vendor.subdomain})"
)
except Exception as e:
db.rollback()
logger.error(f"Failed to send password reset email: {e}")
# Don't reveal the error to the user for security
else:
# Log but don't reveal that email doesn't exist
logger.info(
f"Password reset requested for non-existent email {email} (vendor: {vendor.subdomain})"
)
# Always return the same message (don't reveal if email exists)
return PasswordResetRequestResponse(
message="If an account exists with this email, a password reset link has been sent."
)
@router.post("/auth/reset-password", response_model=PasswordResetResponse)
def reset_password(
request: Request, reset_token: str, new_password: str, db: Session = Depends(get_db)
):
"""
Reset customer password using reset token.
Vendor is automatically determined from request context.
Request Body:
- reset_token: Password reset token from email
- new_password: New password (minimum 8 characters)
"""
# Get vendor from middleware
vendor = getattr(request.state, "vendor", None)
if not vendor:
raise VendorNotFoundException("context", identifier_type="subdomain")
logger.debug(
f"[SHOP_API] reset_password for vendor {vendor.subdomain}",
extra={
"vendor_id": vendor.id,
"vendor_code": vendor.subdomain,
},
)
# Validate password length
if len(new_password) < 8:
raise HTTPException(
status_code=400,
detail="Password must be at least 8 characters long",
)
# Find valid token
token_record = PasswordResetToken.find_valid_token(db, reset_token)
if not token_record:
raise HTTPException(
status_code=400,
detail="Invalid or expired password reset link. Please request a new one.",
)
# Get the customer and verify they belong to this vendor
customer = db.query(Customer).filter(Customer.id == token_record.customer_id).first()
if not customer or customer.vendor_id != vendor.id:
raise HTTPException(
status_code=400,
detail="Invalid or expired password reset link. Please request a new one.",
)
if not customer.is_active:
raise HTTPException(
status_code=400,
detail="This account is not active. Please contact support.",
)
# Hash the new password and update customer
hashed_password = customer_service.auth_service.hash_password(new_password)
customer.hashed_password = hashed_password
# Mark token as used
token_record.mark_used(db)
db.commit()
logger.info(
f"Password reset completed for customer {customer.id} (vendor: {vendor.subdomain})"
)
return PasswordResetResponse(
message="Password reset successfully. You can now log in with your new password."
)