Files
orion/app/api/deps.py
Samir Boulahtit cbfbbb4654 fix: customer authentication and shop error page styling
## Customer Authentication Fixes
- Fix get_current_customer_api to properly decode customer tokens (was using User model)
- Add _validate_customer_token() helper for shared customer token validation
- Add vendor validation: token.vendor_id must match request URL vendor
- Block admin/vendor tokens from shop endpoints (type != "customer")
- Update get_current_customer_optional to use proper customer token validation
- Customer auth functions now return Customer object (not User)

## Shop Orders API
- Update orders.py to receive Customer directly from auth dependency
- Remove broken get_customer_from_user() helper
- Use VendorNotFoundException instead of HTTPException

## Shop Error Pages
- Fix all error templates (400, 401, 403, 404, 422, 429, 500, 502, generic)
- Templates were using undefined CSS classes (.btn, .status-code, etc.)
- Now properly extend base.html and override specific blocks
- Use Tailwind utility classes for consistent styling

## Documentation
- Update docs/api/authentication.md with new Customer return types
- Document vendor validation security features
- Update docs/api/authentication-quick-reference.md examples

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-04 22:48:02 +01:00

952 lines
30 KiB
Python

# app/api/deps.py
"""
Authentication dependencies for FastAPI routes.
This module provides authentication dependencies for all three contexts in the
multi-tenant application, implementing dual token storage with proper isolation:
ADMIN ROUTES (/admin/*):
- Cookie: admin_token (path=/admin) OR Authorization header
- Role: admin only
- Blocks: vendors, customers
VENDOR ROUTES (/vendor/*):
- Cookie: vendor_token (path=/vendor) OR Authorization header
- Role: vendor only
- Blocks: admins, customers
CUSTOMER/SHOP ROUTES (/shop/account/*):
- Cookie: customer_token (path=/shop) OR Authorization header
- Role: customer only
- Blocks: admins, vendors
- Note: Public shop pages (/shop/products, etc.) don't require auth
This dual authentication approach supports:
- HTML pages: Use cookies (automatic browser behavior)
- API calls: Use Authorization headers (explicit JavaScript control)
The cookie path restrictions prevent cross-context cookie leakage:
- admin_token is NEVER sent to /vendor/* or /shop/*
- vendor_token is NEVER sent to /admin/* or /shop/*
- customer_token is NEVER sent to /admin/* or /vendor/*
"""
import logging
from datetime import UTC
from fastapi import Cookie, Depends, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.exceptions import (
AdminRequiredException,
InsufficientPermissionsException,
InsufficientVendorPermissionsException,
InvalidTokenException,
UnauthorizedVendorAccessException,
VendorAccessDeniedException,
VendorNotFoundException,
VendorOwnerOnlyException,
)
from app.services.vendor_service import vendor_service
from middleware.auth import AuthManager
from middleware.rate_limiter import RateLimiter
from models.database.user import User
from models.database.vendor import Vendor
# Initialize dependencies
security = HTTPBearer(auto_error=False) # auto_error=False prevents automatic 403
auth_manager = AuthManager()
rate_limiter = RateLimiter()
logger = logging.getLogger(__name__)
# ============================================================================
# HELPER FUNCTIONS
# ============================================================================
def _get_token_from_request(
credentials: HTTPAuthorizationCredentials | None,
cookie_value: str | None,
cookie_name: str,
request_path: str,
) -> tuple[str | None, str | None]:
"""
Extract token from Authorization header or cookie.
Priority:
1. Authorization header (for API calls from JavaScript)
2. Cookie (for browser page navigation)
Args:
credentials: Optional Bearer token from Authorization header
cookie_value: Optional token from cookie
cookie_name: Name of the cookie (for logging)
request_path: Request URL path (for logging)
Returns:
Tuple of (token, source) where source is "header" or "cookie"
"""
if credentials:
logger.debug(f"Token found in Authorization header for {request_path}")
return credentials.credentials, "header"
if cookie_value:
logger.debug(f"Token found in {cookie_name} cookie for {request_path}")
return cookie_value, "cookie"
return None, None
def _validate_user_token(token: str, db: Session) -> User:
"""
Validate JWT token and return user.
Args:
token: JWT token string
db: Database session
Returns:
User: Authenticated user object
Raises:
InvalidTokenException: If token is invalid
"""
mock_credentials = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)
return auth_manager.get_current_user(db, mock_credentials)
# ============================================================================
# ADMIN AUTHENTICATION
# ============================================================================
def get_current_admin_from_cookie_or_header(
request: Request,
credentials: HTTPAuthorizationCredentials | None = Depends(security),
admin_token: str | None = Cookie(None),
db: Session = Depends(get_db),
) -> User:
"""
Get current admin user from admin_token cookie or Authorization header.
Used for admin HTML pages (/admin/*) that need cookie-based auth.
Priority:
1. Authorization header (API calls)
2. admin_token cookie (page navigation)
Args:
request: FastAPI request
credentials: Optional Bearer token from header
admin_token: Optional token from admin_token cookie
db: Database session
Returns:
User: Authenticated admin user
Raises:
InvalidTokenException: If no token or invalid token
AdminRequiredException: If user is not admin
"""
token, source = _get_token_from_request(
credentials, admin_token, "admin_token", str(request.url.path)
)
if not token:
logger.warning(f"Admin auth failed: No token for {request.url.path}")
raise InvalidTokenException("Admin authentication required")
# Validate token and get user
user = _validate_user_token(token, db)
# Verify user is admin
if user.role != "admin":
logger.warning(
f"Non-admin user {user.username} attempted admin route: {request.url.path}"
)
raise AdminRequiredException("Admin privileges required")
return user
def get_current_admin_api(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db),
) -> User:
"""
Get current admin user from Authorization header ONLY.
Used for admin API endpoints that should not accept cookies.
This prevents CSRF attacks on API endpoints.
Args:
credentials: Bearer token from Authorization header
db: Database session
Returns:
User: Authenticated admin user
Raises:
InvalidTokenException: If no token or invalid token
AdminRequiredException: If user is not admin
"""
if not credentials:
raise InvalidTokenException("Authorization header required for API calls")
user = _validate_user_token(credentials.credentials, db)
if user.role != "admin":
logger.warning(f"Non-admin user {user.username} attempted admin API")
raise AdminRequiredException("Admin privileges required")
return user
# ============================================================================
# VENDOR AUTHENTICATION
# ============================================================================
def get_current_vendor_from_cookie_or_header(
request: Request,
credentials: HTTPAuthorizationCredentials | None = Depends(security),
vendor_token: str | None = Cookie(None),
db: Session = Depends(get_db),
) -> User:
"""
Get current vendor user from vendor_token cookie or Authorization header.
Used for vendor HTML pages (/vendor/*) that need cookie-based auth.
Priority:
1. Authorization header (API calls)
2. vendor_token cookie (page navigation)
Args:
request: FastAPI request
credentials: Optional Bearer token from header
vendor_token: Optional token from vendor_token cookie
db: Database session
Returns:
User: Authenticated vendor user
Raises:
InvalidTokenException: If no token or invalid token
InsufficientPermissionsException: If user is not vendor or is admin
"""
token, source = _get_token_from_request(
credentials, vendor_token, "vendor_token", str(request.url.path)
)
if not token:
logger.warning(f"Vendor auth failed: No token for {request.url.path}")
raise InvalidTokenException("Vendor authentication required")
# Validate token and get user
user = _validate_user_token(token, db)
# CRITICAL: Block admins from vendor routes
if user.role == "admin":
logger.warning(
f"Admin user {user.username} attempted vendor route: {request.url.path}"
)
raise InsufficientPermissionsException(
"Vendor access only - admins cannot use vendor portal"
)
# Verify user is vendor
if user.role != "vendor":
logger.warning(
f"Non-vendor user {user.username} attempted vendor route: {request.url.path}"
)
raise InsufficientPermissionsException("Vendor privileges required")
return user
def get_current_vendor_api(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db),
) -> User:
"""
Get current vendor user from Authorization header ONLY.
Used for vendor API endpoints that should not accept cookies.
Validates that user still has access to the vendor specified in the token.
Args:
credentials: Bearer token from Authorization header
db: Database session
Returns:
User: Authenticated vendor user (with token_vendor_id, token_vendor_code, token_vendor_role)
Raises:
InvalidTokenException: If no token or invalid token
InsufficientPermissionsException: If user is not vendor or lost access to vendor
"""
if not credentials:
raise InvalidTokenException("Authorization header required for API calls")
user = _validate_user_token(credentials.credentials, db)
# Block admins from vendor API
if user.role == "admin":
logger.warning(f"Admin user {user.username} attempted vendor API")
raise InsufficientPermissionsException("Vendor access only")
if user.role != "vendor":
logger.warning(f"Non-vendor user {user.username} attempted vendor API")
raise InsufficientPermissionsException("Vendor privileges required")
# Validate vendor access if token is vendor-scoped
if hasattr(user, "token_vendor_id"):
vendor_id = user.token_vendor_id
# Verify user still has access to this vendor
if not user.is_member_of(vendor_id):
logger.warning(
f"User {user.username} lost access to vendor_id={vendor_id}"
)
raise InsufficientPermissionsException(
"Access to vendor has been revoked. Please login again."
)
logger.debug(
f"Vendor API access: user={user.username}, vendor_id={vendor_id}, "
f"vendor_code={getattr(user, 'token_vendor_code', 'N/A')}"
)
return user
# ============================================================================
# CUSTOMER AUTHENTICATION (SHOP)
# ============================================================================
def _validate_customer_token(token: str, request: Request, db: Session):
"""
Validate customer JWT token and return Customer object.
Validates:
1. Token signature and expiration
2. Token type is "customer"
3. Customer exists and is active
4. Token vendor_id matches request vendor (URL-based)
Args:
token: JWT token string
request: FastAPI request (for vendor context)
db: Database session
Returns:
Customer: Authenticated customer object
Raises:
InvalidTokenException: If token is invalid or expired
UnauthorizedVendorAccessException: If vendor mismatch
"""
from datetime import datetime
from jose import JWTError, jwt
from models.database.customer import Customer
# Decode and validate customer JWT token
try:
payload = jwt.decode(
token, auth_manager.secret_key, algorithms=[auth_manager.algorithm]
)
# Verify this is a customer token
token_type = payload.get("type")
if token_type != "customer":
logger.warning(f"Invalid token type for customer route: {token_type}")
raise InvalidTokenException("Customer authentication required")
# Get customer ID from token
customer_id: str = payload.get("sub")
if customer_id is None:
logger.warning("Token missing 'sub' (customer_id)")
raise InvalidTokenException("Invalid token")
# Verify token hasn't expired
exp = payload.get("exp")
if exp and datetime.fromtimestamp(exp, tz=UTC) < datetime.now(UTC):
logger.warning(f"Expired customer token for customer_id={customer_id}")
raise InvalidTokenException("Token has expired")
# Get vendor_id from token for validation
token_vendor_id = payload.get("vendor_id")
except JWTError as e:
logger.warning(f"JWT decode error: {str(e)}")
raise InvalidTokenException("Could not validate credentials")
# Load customer from database
customer = db.query(Customer).filter(Customer.id == int(customer_id)).first()
if not customer:
logger.warning(f"Customer not found: {customer_id}")
raise InvalidTokenException("Customer not found")
if not customer.is_active:
logger.warning(f"Inactive customer attempted access: {customer.email}")
raise InvalidTokenException("Customer account is inactive")
# Validate vendor context matches token
# This prevents using a customer token from vendor A on vendor B's shop
request_vendor = getattr(request.state, "vendor", None)
if request_vendor and token_vendor_id:
if request_vendor.id != token_vendor_id:
logger.warning(
f"Customer {customer.email} token vendor mismatch: "
f"token={token_vendor_id}, request={request_vendor.id}"
)
raise UnauthorizedVendorAccessException(
vendor_code=request_vendor.vendor_code,
user_id=customer.id,
)
logger.debug(f"Customer authenticated: {customer.email} (ID: {customer.id})")
return customer
def get_current_customer_from_cookie_or_header(
request: Request,
credentials: HTTPAuthorizationCredentials | None = Depends(security),
customer_token: str | None = Cookie(None),
db: Session = Depends(get_db),
):
"""
Get current customer from customer_token cookie or Authorization header.
Used for shop account HTML pages (/shop/account/*) that need cookie-based auth.
Note: Public shop pages (/shop/products, etc.) don't use this dependency.
Validates that token vendor_id matches request vendor (URL-based detection).
Priority:
1. Authorization header (API calls)
2. customer_token cookie (page navigation)
Args:
request: FastAPI request
credentials: Optional Bearer token from header
customer_token: Optional token from customer_token cookie
db: Database session
Returns:
Customer: Authenticated customer object
Raises:
InvalidTokenException: If no token or invalid token
UnauthorizedVendorAccessException: If vendor mismatch
"""
token, source = _get_token_from_request(
credentials, customer_token, "customer_token", str(request.url.path)
)
if not token:
logger.warning(f"Customer auth failed: No token for {request.url.path}")
raise InvalidTokenException("Customer authentication required")
return _validate_customer_token(token, request, db)
def get_current_customer_api(
request: Request,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db),
):
"""
Get current customer from Authorization header ONLY.
Used for shop API endpoints that should not accept cookies.
Validates that token vendor_id matches request vendor (URL-based detection).
Args:
request: FastAPI request (for vendor context)
credentials: Bearer token from Authorization header
db: Database session
Returns:
Customer: Authenticated customer object
Raises:
InvalidTokenException: If no token or invalid token
UnauthorizedVendorAccessException: If vendor mismatch
"""
if not credentials:
raise InvalidTokenException("Authorization header required for API calls")
return _validate_customer_token(credentials.credentials, request, db)
# ============================================================================
# GENERIC AUTHENTICATION (for mixed-use endpoints)
# ============================================================================
def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db),
) -> User:
"""
Get current authenticated user from Authorization header only.
Generic authentication without role checking.
Used for endpoints accessible to any authenticated user.
Args:
credentials: Bearer token from Authorization header
db: Database session
Returns:
User: Authenticated user (any role)
Raises:
InvalidTokenException: If no token or invalid token
"""
if not credentials:
raise InvalidTokenException("Authorization header required")
return _validate_user_token(credentials.credentials, db)
# ============================================================================
# VENDOR OWNERSHIP VERIFICATION
# ============================================================================
def get_user_vendor(
vendor_code: str,
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
db: Session = Depends(get_db),
) -> Vendor:
"""
Get vendor and verify user ownership/membership.
Ensures the current user has access to the specified vendor.
- Vendor owners can access their own vendor
- Team members can access their vendor
- Admins are BLOCKED (use admin routes instead)
Args:
vendor_code: Vendor code to look up
current_user: Current authenticated vendor user
db: Database session
Returns:
Vendor: Vendor object if user has access
Raises:
VendorNotFoundException: If vendor doesn't exist
UnauthorizedVendorAccessException: If user doesn't have access
"""
from sqlalchemy.orm import joinedload
vendor = (
db.query(Vendor)
.options(joinedload(Vendor.company))
.filter(Vendor.vendor_code == vendor_code.upper())
.first()
)
if not vendor:
raise VendorNotFoundException(vendor_code)
# Check if user owns this vendor (via company ownership)
if vendor.company and vendor.company.owner_user_id == current_user.id:
return vendor
# Check if user is team member
# TODO: Add team member check when VendorUser relationship is set up
# User doesn't have access to this vendor
raise UnauthorizedVendorAccessException(vendor_code, current_user.id)
# ============================================================================
# PERMISSIONS CHECKING
# ============================================================================
def require_vendor_permission(permission: str):
"""
Dependency factory to require a specific vendor permission.
Uses token_vendor_id from JWT token (authenticated vendor API pattern).
The vendor object is loaded and stored in request.state.vendor for endpoint use.
Usage:
@router.get("/products")
def list_products(
request: Request,
user: User = Depends(require_vendor_permission(VendorPermissions.PRODUCTS_VIEW.value))
):
vendor = request.state.vendor # Vendor is set by this dependency
...
"""
def permission_checker(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
) -> User:
# Get vendor ID from JWT token
if not hasattr(current_user, "token_vendor_id"):
raise InvalidTokenException("Token missing vendor information. Please login again.")
vendor_id = current_user.token_vendor_id
# Load vendor from database (raises VendorNotFoundException if not found)
vendor = vendor_service.get_vendor_by_id(db, vendor_id)
# Store vendor in request state for endpoint use
request.state.vendor = vendor
# Check if user has permission
if not current_user.has_vendor_permission(vendor.id, permission):
raise InsufficientVendorPermissionsException(
required_permission=permission,
vendor_code=vendor.vendor_code,
)
return current_user
return permission_checker
def require_vendor_owner(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
) -> User:
"""
Dependency to require vendor owner role.
Uses token_vendor_id from JWT token (authenticated vendor API pattern).
The vendor object is loaded and stored in request.state.vendor for endpoint use.
Usage:
@router.delete("/team/{user_id}")
def remove_team_member(
request: Request,
user: User = Depends(require_vendor_owner)
):
vendor = request.state.vendor # Vendor is set by this dependency
...
"""
# Get vendor ID from JWT token
if not hasattr(current_user, "token_vendor_id"):
raise InvalidTokenException("Token missing vendor information. Please login again.")
vendor_id = current_user.token_vendor_id
# Load vendor from database (raises VendorNotFoundException if not found)
vendor = vendor_service.get_vendor_by_id(db, vendor_id)
# Store vendor in request state for endpoint use
request.state.vendor = vendor
if not current_user.is_owner_of(vendor.id):
raise VendorOwnerOnlyException(
operation="team management",
vendor_code=vendor.vendor_code,
)
return current_user
def require_any_vendor_permission(*permissions: str):
"""
Dependency factory to require ANY of the specified permissions.
Uses token_vendor_id from JWT token (authenticated vendor API pattern).
The vendor object is loaded and stored in request.state.vendor for endpoint use.
Usage:
@router.get("/dashboard")
def dashboard(
request: Request,
user: User = Depends(require_any_vendor_permission(
VendorPermissions.DASHBOARD_VIEW.value,
VendorPermissions.REPORTS_VIEW.value
))
):
vendor = request.state.vendor # Vendor is set by this dependency
...
"""
def permission_checker(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
) -> User:
# Get vendor ID from JWT token
if not hasattr(current_user, "token_vendor_id"):
raise InvalidTokenException("Token missing vendor information. Please login again.")
vendor_id = current_user.token_vendor_id
# Load vendor from database (raises VendorNotFoundException if not found)
vendor = vendor_service.get_vendor_by_id(db, vendor_id)
# Store vendor in request state for endpoint use
request.state.vendor = vendor
# Check if user has ANY of the required permissions
has_permission = any(
current_user.has_vendor_permission(vendor.id, perm) for perm in permissions
)
if not has_permission:
raise InsufficientVendorPermissionsException(
required_permission=f"Any of: {', '.join(permissions)}",
vendor_code=vendor.vendor_code,
)
return current_user
return permission_checker
def require_all_vendor_permissions(*permissions: str):
"""
Dependency factory to require ALL of the specified permissions.
Uses token_vendor_id from JWT token (authenticated vendor API pattern).
The vendor object is loaded and stored in request.state.vendor for endpoint use.
Usage:
@router.post("/products/bulk-delete")
def bulk_delete_products(
request: Request,
user: User = Depends(require_all_vendor_permissions(
VendorPermissions.PRODUCTS_VIEW.value,
VendorPermissions.PRODUCTS_DELETE.value
))
):
vendor = request.state.vendor # Vendor is set by this dependency
...
"""
def permission_checker(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
) -> User:
# Get vendor ID from JWT token
if not hasattr(current_user, "token_vendor_id"):
raise InvalidTokenException("Token missing vendor information. Please login again.")
vendor_id = current_user.token_vendor_id
# Load vendor from database (raises VendorNotFoundException if not found)
vendor = vendor_service.get_vendor_by_id(db, vendor_id)
# Store vendor in request state for endpoint use
request.state.vendor = vendor
# Check if user has ALL required permissions
missing_permissions = [
perm
for perm in permissions
if not current_user.has_vendor_permission(vendor.id, perm)
]
if missing_permissions:
raise InsufficientVendorPermissionsException(
required_permission=f"All of: {', '.join(permissions)}",
vendor_code=vendor.vendor_code,
)
return current_user
return permission_checker
def get_user_permissions(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
) -> list:
"""
Get all permissions for current user in current vendor.
Uses token_vendor_id from JWT token (authenticated vendor API pattern).
Also sets request.state.vendor for endpoint use.
Returns empty list if no vendor context in token.
"""
# Get vendor ID from JWT token
if not hasattr(current_user, "token_vendor_id"):
return []
vendor_id = current_user.token_vendor_id
# Load vendor from database
vendor = vendor_service.get_vendor_by_id(db, vendor_id)
# Store vendor in request state for endpoint use
request.state.vendor = vendor
# If owner, return all permissions
if current_user.is_owner_of(vendor.id):
from app.core.permissions import VendorPermissions
return [p.value for p in VendorPermissions]
# Get permissions from vendor membership
for vm in current_user.vendor_memberships:
if vm.vendor_id == vendor.id and vm.is_active:
return vm.get_all_permissions()
return []
# ============================================================================
# OPTIONAL AUTHENTICATION (For Login Page Redirects)
# ============================================================================
def get_current_admin_optional(
request: Request,
credentials: HTTPAuthorizationCredentials | None = Depends(security),
admin_token: str | None = Cookie(None),
db: Session = Depends(get_db),
) -> User | None:
"""
Get current admin user from admin_token cookie or Authorization header.
Returns None instead of raising exceptions if not authenticated.
Used for login pages to check if user is already authenticated.
Priority:
1. Authorization header (API calls)
2. admin_token cookie (page navigation)
Args:
request: FastAPI request
credentials: Optional Bearer token from header
admin_token: Optional token from admin_token cookie
db: Database session
Returns:
User: Authenticated admin user if valid token exists
None: If no token, invalid token, or user is not admin
"""
token, source = _get_token_from_request(
credentials, admin_token, "admin_token", str(request.url.path)
)
if not token:
return None
try:
# Validate token and get user
user = _validate_user_token(token, db)
# Verify user is admin
if user.role == "admin":
return user
except Exception:
# Invalid token or other error
pass
return None
def get_current_vendor_optional(
request: Request,
credentials: HTTPAuthorizationCredentials | None = Depends(security),
vendor_token: str | None = Cookie(None),
db: Session = Depends(get_db),
) -> User | None:
"""
Get current vendor user from vendor_token cookie or Authorization header.
Returns None instead of raising exceptions if not authenticated.
Used for login pages to check if user is already authenticated.
Priority:
1. Authorization header (API calls)
2. vendor_token cookie (page navigation)
Args:
request: FastAPI request
credentials: Optional Bearer token from header
vendor_token: Optional token from vendor_token cookie
db: Database session
Returns:
User: Authenticated vendor user if valid token exists
None: If no token, invalid token, or user is not vendor
"""
token, source = _get_token_from_request(
credentials, vendor_token, "vendor_token", str(request.url.path)
)
if not token:
return None
try:
# Validate token and get user
user = _validate_user_token(token, db)
# Verify user is vendor
if user.role == "vendor":
return user
except Exception:
# Invalid token or other error
pass
return None
def get_current_customer_optional(
request: Request,
credentials: HTTPAuthorizationCredentials | None = Depends(security),
customer_token: str | None = Cookie(None),
db: Session = Depends(get_db),
):
"""
Get current customer from customer_token cookie or Authorization header.
Returns None instead of raising exceptions if not authenticated.
Used for login pages to check if user is already authenticated.
Priority:
1. Authorization header (API calls)
2. customer_token cookie (page navigation)
Args:
request: FastAPI request
credentials: Optional Bearer token from header
customer_token: Optional token from customer_token cookie
db: Database session
Returns:
Customer: Authenticated customer if valid token exists
None: If no token, invalid token, or vendor mismatch
"""
token, source = _get_token_from_request(
credentials, customer_token, "customer_token", str(request.url.path)
)
if not token:
return None
try:
# Validate customer token (includes vendor validation)
return _validate_customer_token(token, request, db)
except Exception:
# Invalid token, vendor mismatch, or other error
return None