Files
orion/app/api/v1/shop/auth.py
Samir Boulahtit 6735d99df2 feat: implement customer authentication with JWT tokens
Implement secure customer authentication system with dedicated JWT tokens,
separate from admin/vendor authentication.

Backend Changes:
- Add customer JWT token support in deps.py
  - New get_current_customer_from_cookie_or_header dependency
  - Validates customer-specific tokens with type checking
  - Returns Customer object instead of User for shop routes
- Extend AuthService with customer token support
  - Add verify_password() method
  - Add create_access_token_with_data() for custom token payloads
- Update CustomerService authentication
  - Generate customer-specific JWT tokens with type="customer"
  - Use vendor-scoped customer lookup
- Enhance exception handler
  - Sanitize validation errors to prevent password leaks in logs
  - Fix shop login redirect to support multi-access routing
- Improve vendor context detection from Referer header
  - Consistent "path" detection method for cookie path logic

Schema Changes:
- Rename UserLogin.username to email_or_username for flexibility
- Update field validators accordingly

API Changes:
- Update admin/vendor auth endpoints to use email_or_username
- Customer auth already uses email field correctly

Route Changes:
- Update shop account routes to use Customer dependency
- Add /account redirect (without trailing slash)
- Change parameter names from current_user to current_customer

Frontend Changes:
- Update login forms to use email_or_username in API calls
- Change button text from "Log in" to "Sign in" for consistency
- Improve loading spinner layout with flexbox

Security Improvements:
- Customer tokens scoped to vendor_id
- Token type validation prevents cross-context token usage
- Password inputs redacted from validation error logs

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-25 21:08:49 +01:00

337 lines
10 KiB
Python

# app/api/v1/shop/auth.py
"""
Shop Authentication API (Public)
Public endpoints for customer authentication in shop frontend.
Uses vendor from request.state (injected by VendorContextMiddleware).
Implements dual token storage with path restriction:
- Sets HTTP-only cookie with path=/shop (restricted to shop routes only)
- Returns token in response for localStorage (API calls)
This prevents:
- Customer cookies from being sent to admin or vendor routes
- Cross-context authentication confusion
"""
import logging

from fastapi import APIRouter, Body, Depends, HTTPException, Request, Response
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app.core.database import get_db
from app.core.environment import should_use_secure_cookies
from app.services.customer_service import customer_service
from models.schema.auth import UserLogin
from models.schema.customer import CustomerRegister, CustomerResponse
router = APIRouter()
logger = logging.getLogger(__name__)
# Response model for customer login
class CustomerLoginResponse(BaseModel):
    """
    Payload returned by the customer login endpoint.

    Bundles the issued token (value, type and lifetime) with the profile of
    the customer who just authenticated.
    """

    # Token value; the login endpoint also writes it to the customer_token cookie.
    access_token: str
    # Token scheme label, passed through from the login service's token data.
    token_type: str
    # Token lifetime in seconds (the login endpoint reuses it as the cookie max_age).
    expires_in: int
    # Customer profile; deliberately CustomerResponse rather than UserResponse.
    user: CustomerResponse
@router.post("/auth/register", response_model=CustomerResponse)
def register_customer(
    request: Request,
    customer_data: CustomerRegister,
    db: Session = Depends(get_db),
):
    """
    Create a customer account under the vendor resolved for this request.

    The vendor is read from ``request.state.vendor``; nothing in the request
    body selects it. Accounts are vendor-scoped, so the same email address may
    be registered independently with different vendors.

    Request Body (per the ``CustomerRegister`` schema, defined elsewhere):
    - email: Customer email address
    - password: Customer password
    - first_name: Customer first name
    - last_name: Customer last name
    - phone: Customer phone number (optional)

    Returns:
        The newly created customer serialized as ``CustomerResponse``.

    Raises:
        HTTPException: 404 when no vendor is attached to the request.
    """
    # The vendor context is expected on request.state; without it there is
    # no tenant to register the customer against.
    shop_vendor = getattr(request.state, 'vendor', None)
    if not shop_vendor:
        raise HTTPException(
            status_code=404,
            detail="Vendor not found. Please access via vendor domain/subdomain/path.",
        )

    attempt_message = f"[SHOP_API] register_customer for vendor {shop_vendor.subdomain}"
    attempt_fields = {
        "vendor_id": shop_vendor.id,
        "vendor_code": shop_vendor.subdomain,
        "email": customer_data.email,
    }
    logger.debug(attempt_message, extra=attempt_fields)

    # Delegate account creation to the service layer, scoped to this vendor.
    new_customer = customer_service.register_customer(
        db=db,
        vendor_id=shop_vendor.id,
        customer_data=customer_data,
    )

    success_message = (
        f"New customer registered: {new_customer.email} for vendor {shop_vendor.subdomain}"
    )
    success_fields = {
        "customer_id": new_customer.id,
        "vendor_id": shop_vendor.id,
        "email": new_customer.email,
    }
    logger.info(success_message, extra=success_fields)

    return CustomerResponse.model_validate(new_customer)
def _customer_cookie_path(request: Request, vendor) -> str:
    """
    Return the cookie path for the ``customer_token`` cookie of this request.

    Login and logout must use the exact same path, otherwise the browser will
    not treat the logout ``Set-Cookie`` as targeting the cookie set at login.
    Keeping the rule in one place guarantees that.

    - Default (domain/subdomain access, or no vendor/context): ``/shop``
    - Path-based access (``detection_method == "path"``):
      ``{full_prefix}{vendor.subdomain}/shop``, e.g. ``/vendor/wizamart/shop``
      with the default prefix ``/vendor/``.
    """
    vendor_context = getattr(request.state, 'vendor_context', None)
    if not vendor or not vendor_context:
        return "/shop"
    if vendor_context.get('detection_method', 'unknown') != "path":
        return "/shop"
    # full_prefix is expected to carry its trailing slash (default '/vendor/').
    full_prefix = vendor_context.get('full_prefix', '/vendor/')
    return f"{full_prefix}{vendor.subdomain}/shop"


@router.post("/auth/login", response_model=CustomerLoginResponse)
def customer_login(
    request: Request,
    user_credentials: UserLogin,
    response: Response,
    db: Session = Depends(get_db),
):
    """
    Customer login for the current vendor.

    The vendor is read from ``request.state.vendor``; authentication is
    delegated to ``customer_service.login_customer`` scoped to that vendor.

    The issued token is delivered in two places:
    1. An HTTP-only ``customer_token`` cookie restricted to the vendor's shop
       path (for browser page navigation).
    2. The response body (for localStorage and API calls).

    Restricting the cookie path keeps it from being sent to admin or vendor
    routes.

    Request Body:
    - email_or_username: Customer email or username
    - password: Customer password

    Returns:
        ``CustomerLoginResponse`` with the token data and the customer profile.

    Raises:
        HTTPException: 404 when no vendor is attached to the request.
        Errors raised by ``customer_service.login_customer`` propagate
        unchanged.
    """
    # Get vendor from middleware
    vendor = getattr(request.state, 'vendor', None)
    if not vendor:
        raise HTTPException(
            status_code=404,
            detail="Vendor not found. Please access via vendor domain/subdomain/path.",
        )

    logger.debug(
        f"[SHOP_API] customer_login for vendor {vendor.subdomain}",
        extra={
            "vendor_id": vendor.id,
            "vendor_code": vendor.subdomain,
            "email_or_username": user_credentials.email_or_username,
        },
    )

    # Authenticate customer
    login_result = customer_service.login_customer(
        db=db,
        vendor_id=vendor.id,
        credentials=user_credentials,
    )
    customer = login_result["customer"]
    token_data = login_result["token_data"]

    logger.info(
        f"Customer login successful: {customer.email} for vendor {vendor.subdomain}",
        extra={
            "customer_id": customer.id,
            "vendor_id": vendor.id,
            "email": customer.email,
        },
    )

    cookie_path = _customer_cookie_path(request, vendor)
    # Evaluate once so the cookie and the log line cannot disagree.
    use_secure = should_use_secure_cookies()

    # Set HTTP-only cookie for browser navigation
    response.set_cookie(
        key="customer_token",
        value=token_data["access_token"],
        httponly=True,  # JavaScript cannot access (XSS protection)
        secure=use_secure,  # HTTPS only in production/staging
        samesite="lax",  # CSRF protection
        max_age=token_data["expires_in"],  # Match JWT expiry
        path=cookie_path,  # Matches vendor's shop routes
    )

    logger.debug(
        f"Set customer_token cookie with {token_data['expires_in']}s expiry "
        f"(path={cookie_path}, httponly=True, secure={use_secure})",
        extra={
            "expires_in": token_data['expires_in'],
            "secure": use_secure,
            "cookie_path": cookie_path,
        },
    )

    # Return full login response
    return CustomerLoginResponse(
        access_token=token_data["access_token"],
        token_type=token_data["token_type"],
        expires_in=token_data["expires_in"],
        user=CustomerResponse.model_validate(customer),
    )


@router.post("/auth/logout")
def customer_logout(
    request: Request,
    response: Response,
):
    """
    Customer logout for the current vendor.

    Clears the ``customer_token`` cookie using the same path rule as login.
    The client should also remove the token from localStorage.

    A missing vendor is tolerated here (it is only used for logging and for
    the path-based cookie path); the default ``/shop`` path is used instead.

    Returns:
        A dict with a confirmation ``message``.
    """
    # Get vendor from middleware (for logging)
    vendor = getattr(request.state, 'vendor', None)

    logger.info(
        f"Customer logout for vendor {vendor.subdomain if vendor else 'unknown'}",
        extra={
            "vendor_id": vendor.id if vendor else None,
            "vendor_code": vendor.subdomain if vendor else None,
        },
    )

    cookie_path = _customer_cookie_path(request, vendor)

    # Clear the cookie (path must match the one used when setting it)
    response.delete_cookie(
        key="customer_token",
        path=cookie_path,
    )

    logger.debug(f"Deleted customer_token cookie (path={cookie_path})")

    return {"message": "Logged out successfully"}
@router.post("/auth/forgot-password")
def forgot_password(
    request: Request,
    email: str = Body(..., embed=True),
    db: Session = Depends(get_db),
):
    """
    Request a password reset for a customer of the current vendor.

    The vendor is read from ``request.state.vendor``.

    Request Body (JSON):
    - email: Customer email address

    ``email`` is declared with ``Body(..., embed=True)`` so it is read from a
    JSON object (``{"email": "..."}``) as documented. A bare ``email: str``
    would be bound by FastAPI as a query parameter, which rejects JSON-body
    clients and places the address in the URL.

    The reset flow itself is not implemented yet (see TODO below); the
    endpoint currently only logs the request. The response is intentionally
    identical whether or not the account exists.

    Returns:
        A dict with a generic ``message``.

    Raises:
        HTTPException: 404 when no vendor is attached to the request.
    """
    # Get vendor from middleware
    vendor = getattr(request.state, 'vendor', None)
    if not vendor:
        raise HTTPException(
            status_code=404,
            detail="Vendor not found. Please access via vendor domain/subdomain/path.",
        )

    logger.debug(
        f"[SHOP_API] forgot_password for vendor {vendor.subdomain}",
        extra={
            "vendor_id": vendor.id,
            "vendor_code": vendor.subdomain,
            "email": email,
        },
    )

    # TODO: Implement password reset functionality
    # - Generate reset token
    # - Store token in database with expiry
    # - Send reset email to customer
    # - Return success message (don't reveal if email exists)

    logger.info(
        f"Password reset requested for {email} (vendor: {vendor.subdomain})"
    )

    return {
        "message": "If an account exists with this email, a password reset link has been sent."
    }
@router.post("/auth/reset-password")
def reset_password(
    request: Request,
    reset_token: str = Body(...),
    new_password: str = Body(...),
    db: Session = Depends(get_db),
):
    """
    Reset a customer password using a reset token.

    The vendor is read from ``request.state.vendor``.

    Request Body (JSON):
    - reset_token: Password reset token from email
    - new_password: New password

    Both fields are declared with ``Body(...)`` so they are read from the JSON
    request body as documented. Bare ``str`` parameters would be bound by
    FastAPI as query parameters, which would put the new password and the
    reset token in the URL (and therefore in access logs and browser history).

    NOTE: the reset itself is not implemented yet (see TODO below). No
    password is changed by this endpoint at present.

    Returns:
        A dict with a ``message``.

    Raises:
        HTTPException: 404 when no vendor is attached to the request.
    """
    # Get vendor from middleware
    vendor = getattr(request.state, 'vendor', None)
    if not vendor:
        raise HTTPException(
            status_code=404,
            detail="Vendor not found. Please access via vendor domain/subdomain/path.",
        )

    # Neither the token nor the password is ever logged.
    logger.debug(
        f"[SHOP_API] reset_password for vendor {vendor.subdomain}",
        extra={
            "vendor_id": vendor.id,
            "vendor_code": vendor.subdomain,
        },
    )

    # TODO: Implement password reset
    # - Validate reset token
    # - Check token expiry
    # - Update customer password
    # - Invalidate reset token
    # - Return success

    # The log must not claim completion while the reset is still a stub.
    logger.warning(
        f"Password reset requested but not implemented; no password was changed "
        f"(vendor: {vendor.subdomain})"
    )

    # NOTE(review): this placeholder response still reports success to the
    # client; consider responding 501 until the TODO above is implemented.
    return {
        "message": "Password reset successfully. You can now log in with your new password."
    }