Files
orion/app/api/deps.py
Samir Boulahtit cad862f469 refactor(api): introduce UserContext schema for API dependency injection
Replace direct User database model imports in API endpoints with UserContext
schema, following the architecture principle that API routes should not import
database models directly.

Changes:
- Create UserContext schema in models/schema/auth.py with from_user() factory
- Update app/api/deps.py to return UserContext from all auth dependencies
- Add _get_user_model() helper for functions needing User model access
- Update 58 API endpoint files to use UserContext instead of User
- Add noqa comments for 4 legitimate edge cases (enums, internal helpers)

Architecture validation: 0 errors (down from 61), 11 warnings remain

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-30 20:47:33 +01:00

1407 lines
47 KiB
Python

# app/api/deps.py
"""
Authentication dependencies for FastAPI routes.
This module provides authentication dependencies for all three contexts in the
multi-tenant application, implementing dual token storage with proper isolation:
ADMIN ROUTES (/admin/*):
- Cookie: admin_token (path=/admin) OR Authorization header
- Role: admin only
- Blocks: vendors, customers
VENDOR ROUTES (/vendor/*):
- Cookie: vendor_token (path=/vendor) OR Authorization header
- Role: vendor only
- Blocks: admins, customers
CUSTOMER/SHOP ROUTES (/shop/account/*):
- Cookie: customer_token (path=/shop) OR Authorization header
- Role: customer only
- Blocks: admins, vendors
- Note: Public shop pages (/shop/products, etc.) don't require auth
This dual authentication approach supports:
- HTML pages: Use cookies (automatic browser behavior)
- API calls: Use Authorization headers (explicit JavaScript control)
The cookie path restrictions prevent cross-context cookie leakage:
- admin_token is NEVER sent to /vendor/* or /shop/*
- vendor_token is NEVER sent to /admin/* or /shop/*
- customer_token is NEVER sent to /admin/* or /vendor/*
"""
import logging
from datetime import UTC
from fastapi import Cookie, Depends, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.exceptions import (
AdminRequiredException,
InsufficientPermissionsException,
InsufficientVendorPermissionsException,
InvalidTokenException,
UnauthorizedVendorAccessException,
VendorNotFoundException,
VendorOwnerOnlyException,
)
from app.services.vendor_service import vendor_service
from middleware.auth import AuthManager
from middleware.rate_limiter import RateLimiter
from models.database.user import User as UserModel
from models.database.vendor import Vendor
from models.schema.auth import UserContext
# Initialize dependencies
# Module-level singletons shared by the dependency functions in this file.
# `security` is injected via Depends(security); with auto_error=False the
# dependencies receive a falsy value when no Bearer header is present and
# raise their own exceptions (or fall back to a cookie) instead.
security = HTTPBearer(auto_error=False) # auto_error=False prevents automatic 403
# Used to resolve users from tokens and as the source of the JWT secret key
# and algorithm for customer-token decoding.
auth_manager = AuthManager()
# NOTE(review): not referenced in the visible part of this module; presumably
# imported from here by other modules - confirm before removing.
rate_limiter = RateLimiter()
logger = logging.getLogger(__name__)
# ============================================================================
# HELPER FUNCTIONS
# ============================================================================
def _get_token_from_request(
    credentials: HTTPAuthorizationCredentials | None,
    cookie_value: str | None,
    cookie_name: str,
    request_path: str,
) -> tuple[str | None, str | None]:
    """
    Pick the auth token for a request, preferring the header over the cookie.

    Args:
        credentials: Bearer credentials parsed from the Authorization header,
            or None when no header was supplied.
        cookie_value: Raw token read from the context cookie, or None.
        cookie_name: Name of that cookie; only used in the debug log line.
        request_path: Request URL path; only used in the debug log line.

    Returns:
        ``(token, source)`` with ``source`` being ``"header"`` or ``"cookie"``,
        or ``(None, None)`` when neither input carries a token.
    """
    if credentials:
        # An explicit Authorization header always wins over the cookie.
        logger.debug(f"Token found in Authorization header for {request_path}")
        token, origin = credentials.credentials, "header"
    elif cookie_value:
        logger.debug(f"Token found in {cookie_name} cookie for {request_path}")
        token, origin = cookie_value, "cookie"
    else:
        token, origin = None, None
    return token, origin
def _validate_user_token(token: str, db: Session) -> UserModel:
    """
    Resolve a raw JWT string to the user it belongs to.

    ``auth_manager.get_current_user`` takes Bearer credentials rather than a
    bare string, so the token is wrapped in an ``HTTPAuthorizationCredentials``
    object before being handed over. This lets cookie tokens and header tokens
    go through the same validation path.

    Args:
        token: JWT token string (from a header or a cookie).
        db: Database session.

    Returns:
        UserModel: Whatever ``auth_manager.get_current_user`` returns for the
        token.

    Raises:
        Any exception raised by ``auth_manager.get_current_user`` propagates
        unchanged (callers in this module document it as
        InvalidTokenException).
    """
    return auth_manager.get_current_user(
        db,
        HTTPAuthorizationCredentials(scheme="Bearer", credentials=token),
    )
def _get_user_model(user_context: UserContext, db: Session) -> UserModel:
    """
    Load the User database row that corresponds to a UserContext.

    Permission helpers in this module call model methods such as
    ``has_vendor_permission()`` and ``is_owner_of()``, which live on the
    database model rather than on the schema.

    Args:
        user_context: UserContext schema instance (its ``id`` is the lookup key).
        db: Database session.

    Returns:
        UserModel: The matching row, with any truthy ``token_vendor_*`` values
        from the context copied onto it as plain attributes.

    Raises:
        InvalidTokenException: If no user row has that id.
    """
    user = db.query(UserModel).filter(UserModel.id == user_context.id).first()
    if not user:
        raise InvalidTokenException("User not found")

    # Mirror the token-derived vendor attributes onto the model so that code
    # written against the model keeps working. Falsy values are not copied.
    for attr_name in ("token_vendor_id", "token_vendor_code", "token_vendor_role"):
        attr_value = getattr(user_context, attr_name)
        if attr_value:
            setattr(user, attr_name, attr_value)
    return user
# ============================================================================
# ADMIN AUTHENTICATION
# ============================================================================
def get_current_admin_from_cookie_or_header(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    admin_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Authenticate an admin from the Authorization header or the admin_token cookie.

    Intended for admin HTML pages (/admin/*), where the browser sends the
    cookie automatically. The header is checked first, then the cookie.

    Args:
        request: FastAPI request (its path is used for logging).
        credentials: Optional Bearer token from the Authorization header.
        admin_token: Optional token from the ``admin_token`` cookie.
        db: Database session.

    Returns:
        UserContext: Context for the authenticated admin, built without
        vendor context.

    Raises:
        InvalidTokenException: If neither header nor cookie carries a token
            (token validation errors propagate from ``_validate_user_token``).
        AdminRequiredException: If the user's role is not ``"admin"``.
    """
    token, _source = _get_token_from_request(
        credentials, admin_token, "admin_token", str(request.url.path)
    )
    if not token:
        logger.warning(f"Admin auth failed: No token for {request.url.path}")
        raise InvalidTokenException("Admin authentication required")

    user = _validate_user_token(token, db)
    if user.role == "admin":
        return UserContext.from_user(user, include_vendor_context=False)

    # Authenticated, but not an admin: reject and leave an audit trail.
    logger.warning(
        f"Non-admin user {user.username} attempted admin route: {request.url.path}"
    )
    raise AdminRequiredException("Admin privileges required")
def get_current_admin_api(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Authenticate an admin from the Authorization header only.

    Cookies are never consulted here. The stated design reason is to keep
    admin API endpoints safe from CSRF, since browsers attach cookies
    automatically but not Authorization headers.

    Args:
        credentials: Bearer token from the Authorization header.
        db: Database session.

    Returns:
        UserContext: Context for the authenticated admin, built without
        vendor context.

    Raises:
        InvalidTokenException: If the header is missing (token validation
            errors propagate from ``_validate_user_token``).
        AdminRequiredException: If the user's role is not ``"admin"``.
    """
    if not credentials:
        raise InvalidTokenException("Authorization header required for API calls")

    user = _validate_user_token(credentials.credentials, db)
    if user.role == "admin":
        return UserContext.from_user(user, include_vendor_context=False)

    logger.warning(f"Non-admin user {user.username} attempted admin API")
    raise AdminRequiredException("Admin privileges required")
# ============================================================================
# SUPER ADMIN AUTHENTICATION
# ============================================================================
def get_current_super_admin(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    admin_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Authenticate an admin (header or cookie) and require super admin status.

    Meant for global settings, user management, platform creation/deletion and
    admin-platform assignment management.

    Args:
        request: FastAPI request.
        credentials: Optional Bearer token from the Authorization header.
        admin_token: Optional token from the ``admin_token`` cookie.
        db: Database session.

    Returns:
        UserContext: Context for the authenticated super admin.

    Raises:
        InvalidTokenException: If no token or an invalid token is supplied.
        AdminRequiredException: If the user is not an admin, or is an admin
            whose context has ``is_super_admin`` falsy.
    """
    # Plain-admin authentication is delegated; the resolved FastAPI arguments
    # are passed through by hand because this is a direct call, not a Depends.
    user_context = get_current_admin_from_cookie_or_header(
        request, credentials, admin_token, db
    )
    if user_context.is_super_admin:
        return user_context

    logger.warning(
        f"Platform admin {user_context.username} attempted super admin route: {request.url.path}"
    )
    raise AdminRequiredException("Super admin privileges required")
def get_current_super_admin_api(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Authenticate an admin from the Authorization header and require super admin.

    Header-only counterpart of ``get_current_super_admin`` for API endpoints
    that must not accept cookies.

    Args:
        credentials: Bearer token from the Authorization header.
        db: Database session.

    Returns:
        UserContext: Context for the authenticated super admin.

    Raises:
        InvalidTokenException: If no token or an invalid token is supplied.
        AdminRequiredException: If the user is not an admin, or is an admin
            whose context has ``is_super_admin`` falsy.
    """
    user_context = get_current_admin_api(credentials, db)
    if user_context.is_super_admin:
        return user_context

    logger.warning(f"Platform admin {user_context.username} attempted super admin API")
    raise AdminRequiredException("Super admin privileges required")
def require_platform_access(platform_id: int):
    """
    Build a dependency that requires admin access to one specific platform.

    An admin whose ``accessible_platform_ids`` is ``None`` (super admin) may
    access every platform. Any other admin must have ``platform_id`` listed
    in ``accessible_platform_ids``.

    ``platform_id`` is captured when this factory is called, i.e. when the
    route is declared. It is not read from the incoming request.

    Usage:
        @router.get("/platforms/main/vendors")
        def list_vendors(
            admin: UserContext = Depends(require_platform_access(MAIN_PLATFORM_ID))
        ):
            ...

    Args:
        platform_id: Id of the platform the route is restricted to.

    Returns:
        A FastAPI dependency returning the admin's UserContext, or raising
        InsufficientPermissionsException when access is denied.
    """

    def _check_platform_access(
        request: Request,
        credentials: HTTPAuthorizationCredentials | None = Depends(security),
        admin_token: str | None = Cookie(None),
        db: Session = Depends(get_db),
    ) -> UserContext:
        user_context = get_current_admin_from_cookie_or_header(
            request, credentials, admin_token, db
        )

        allowed_ids = user_context.accessible_platform_ids
        # None means "no restriction" (super admin); otherwise membership decides.
        if allowed_ids is None or platform_id in (allowed_ids or []):
            return user_context

        logger.warning(
            f"Admin {user_context.username} denied access to platform_id={platform_id}"
        )
        raise InsufficientPermissionsException(
            f"Access denied to platform {platform_id}"
        )

    return _check_platform_access
def get_admin_with_platform_context(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    admin_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Authenticate an admin and, for platform admins, attach the platform.

    Super admins are returned immediately and ``request.state`` is left
    untouched. For every other admin the user object must carry a
    ``token_platform_id`` attribute. Access to that platform is re-checked,
    then the Platform row is loaded and stored in
    ``request.state.admin_platform`` for the endpoint to use.

    This works on the raw User model (via ``_validate_user_token``) because
    the token attributes and ``can_access_platform()`` live on the model.

    Args:
        request: FastAPI request (receives ``state.admin_platform``).
        credentials: Optional Bearer token from the Authorization header.
        admin_token: Optional token from the ``admin_token`` cookie.
        db: Database session.

    Returns:
        UserContext: Context for the authenticated admin, built without
        vendor context.

    Raises:
        InvalidTokenException: If no token is supplied, or a platform admin's
            user object has no ``token_platform_id``.
        AdminRequiredException: If the user's role is not ``"admin"``.
        InsufficientPermissionsException: If the admin can no longer access
            the platform named in the token.
    """
    from models.database.platform import Platform

    token, _source = _get_token_from_request(
        credentials, admin_token, "admin_token", str(request.url.path)
    )
    if not token:
        raise InvalidTokenException("Admin authentication required")

    user = _validate_user_token(token, db)
    if user.role != "admin":
        raise AdminRequiredException("Admin privileges required")

    if not user.is_super_admin:
        # Platform admins must have selected a platform when the token was issued.
        if not hasattr(user, "token_platform_id"):
            raise InvalidTokenException(
                "Token missing platform information. Please select a platform."
            )
        platform_id = user.token_platform_id

        # The assignment may have been revoked since the token was issued.
        if not user.can_access_platform(platform_id):
            logger.warning(
                f"Admin {user.username} lost access to platform_id={platform_id}"
            )
            raise InsufficientPermissionsException(
                "Access to this platform has been revoked. Please login again."
            )

        # NOTE(review): .first() yields None if the row is gone; None is then
        # stored as-is in request.state.admin_platform.
        request.state.admin_platform = (
            db.query(Platform).filter(Platform.id == platform_id).first()
        )

    return UserContext.from_user(user, include_vendor_context=False)
# ============================================================================
# MODULE-BASED ACCESS CONTROL
# ============================================================================
def require_module_access(module_code: str):
    """
    Dependency factory for module-based route access control.

    Checks whether ``module_code`` is enabled for the platform of the current
    request. Use it for routes that are gated by module enablement but are not
    tied to a specific menu item.

    Resolution order inside the returned dependency:
    1. Admin auth is attempted when an ``admin_token`` cookie is present, or
       when a Bearer header is present and the path starts with ``/admin``.
       Super admins are returned immediately (no module check).
    2. If that did not produce a user, vendor auth is attempted when a
       ``vendor_token`` cookie is present, or when a Bearer header is present
       and the path contains ``/vendor/``.
    3. If no platform id could be determined (``request.state.admin_platform``
       / ``request.state.vendor`` missing or without a platform), access is
       ALLOWED, because the module check needs a platform to run.

    Authentication failures in steps 1 and 2 are deliberately non-fatal so the
    next strategy can run. They are logged at debug level instead of being
    dropped silently.

    Usage:
        @router.get("/admin/billing/stripe-config")
        async def stripe_config(
            current_user: UserContext = Depends(require_module_access("billing")),
        ):
            ...

    Args:
        module_code: Module code to check (e.g., "billing", "marketplace")

    Returns:
        Dependency function that validates module access and returns the
        authenticated UserContext.

    Raises (from the returned dependency):
        InvalidTokenException: If neither admin nor vendor auth succeeded.
        InsufficientPermissionsException: If the module is disabled for the
            resolved platform.
    """
    from app.modules.service import module_service

    def _check_module_access(
        request: Request,
        credentials: HTTPAuthorizationCredentials | None = Depends(security),
        admin_token: str | None = Cookie(None),
        vendor_token: str | None = Cookie(None),
        db: Session = Depends(get_db),
    ) -> UserContext:
        user_context = None
        platform_id = None

        # Strategy 1: admin request.
        if admin_token or (credentials and request.url.path.startswith("/admin")):
            try:
                user_context = get_current_admin_from_cookie_or_header(
                    request, credentials, admin_token, db
                )
                if user_context.is_super_admin:
                    # Super admins bypass module checks.
                    return user_context
                # Platform admins: the platform is whatever an earlier
                # dependency stored on the request state (if any).
                # Note: token_platform_id is not on UserContext, would need
                # to be added to resolve the platform from the token here.
                platform = getattr(request.state, "admin_platform", None)
                if platform:
                    platform_id = platform.id
            except Exception as exc:
                # Best-effort: fall through to vendor auth, but keep a trace
                # so failed admin attempts are diagnosable.
                logger.debug(
                    f"Admin auth attempt failed during module check "
                    f"'{module_code}' for {request.url.path}: {exc}"
                )

        # Strategy 2: vendor request (only if admin auth yielded no user).
        if not user_context and (
            vendor_token or (credentials and "/vendor/" in request.url.path)
        ):
            try:
                user_context = get_current_vendor_from_cookie_or_header(
                    request, credentials, vendor_token, db
                )
                vendor = getattr(request.state, "vendor", None)
                if vendor and hasattr(vendor, "platform_id") and vendor.platform_id:
                    platform_id = vendor.platform_id
            except Exception as exc:
                # Best-effort as above; the final check below rejects the
                # request if no strategy produced a user.
                logger.debug(
                    f"Vendor auth attempt failed during module check "
                    f"'{module_code}' for {request.url.path}: {exc}"
                )

        if not user_context:
            raise InvalidTokenException("Authentication required")

        # If no platform context, allow access (module checking requires platform)
        if not platform_id:
            logger.debug(f"No platform context for module check: {module_code}")
            return user_context

        if not module_service.is_module_enabled(db, platform_id, module_code):
            logger.warning(
                f"Module access denied: {module_code} disabled for "
                f"platform_id={platform_id}, user={user_context.username}"
            )
            raise InsufficientPermissionsException(
                f"The '{module_code}' module is not enabled for this platform"
            )

        return user_context

    return _check_module_access
# ============================================================================
# MENU-BASED ACCESS CONTROL
# ============================================================================
def require_menu_access(menu_item_id: str, frontend_type: "FrontendType"):
    """
    Build a dependency that gates a page route on menu-item accessibility.

    The returned dependency authenticates the user for the given frontend and
    then runs two checks:
    1. If a platform id is known and the menu item belongs to a module, that
       module must be enabled for the platform.
    2. ``menu_service.can_access_menu_item`` must allow the item, evaluated
       with the platform id (platform admins, vendors) or with the user id
       (super admins).

    When no platform can be determined for a platform admin or a vendor, the
    request is allowed through without either check.

    Usage:
        @router.get("/admin/inventory")
        async def inventory_page(
            current_user: UserContext = Depends(
                require_menu_access("inventory", FrontendType.ADMIN)
            ),
        ):
            ...

    Args:
        menu_item_id: Menu item identifier from the registry.
        frontend_type: Which frontend the route belongs to (ADMIN or VENDOR).

    Returns:
        Dependency function that validates menu access and returns the
        authenticated UserContext.

    Raises (from the returned dependency):
        ValueError: If ``frontend_type`` is neither ADMIN nor VENDOR.
        InsufficientPermissionsException: If the module is disabled or the
            menu item is not accessible.
    """
    from app.modules.registry import get_menu_item_module
    from app.modules.service import module_service
    from app.services.menu_service import menu_service
    from models.database.admin_menu_config import FrontendType as FT

    def _check_menu_access(
        request: Request,
        credentials: HTTPAuthorizationCredentials | None = Depends(security),
        admin_token: str | None = Cookie(None),
        vendor_token: str | None = Cookie(None),
        db: Session = Depends(get_db),
    ) -> UserContext:
        # Step 1: authenticate and work out which scope (platform or user)
        # the menu configuration should be evaluated against.
        if frontend_type == FT.ADMIN:
            user_context = get_current_admin_from_cookie_or_header(
                request, credentials, admin_token, db
            )
            if user_context.is_super_admin:
                # Super admin: visibility is configured per user.
                platform_id, user_id = None, user_context.id
            else:
                # Platform admin: needs the platform stored on request.state.
                platform = getattr(request.state, "admin_platform", None)
                if not platform:
                    # No platform context yet - allow; restricted elsewhere.
                    return user_context
                platform_id, user_id = platform.id, None
        elif frontend_type == FT.VENDOR:
            user_context = get_current_vendor_from_cookie_or_header(
                request, credentials, vendor_token, db
            )
            vendor = getattr(request.state, "vendor", None)
            if not (vendor and hasattr(vendor, "platform_id") and vendor.platform_id):
                # Vendor without a platform association - allow access.
                return user_context
            platform_id, user_id = vendor.platform_id, None
        else:
            raise ValueError(f"Unsupported frontend_type: {frontend_type}")

        # Step 2: the module that provides this menu item must be enabled.
        if platform_id:
            module_code = get_menu_item_module(menu_item_id, frontend_type)
            if module_code:
                if not module_service.is_module_enabled(db, platform_id, module_code):
                    logger.warning(
                        f"Module access denied: {menu_item_id} (module={module_code}) for "
                        f"user={user_context.username}, platform_id={platform_id}"
                    )
                    raise InsufficientPermissionsException(
                        f"The '{module_code}' module is not enabled for this platform. "
                        f"Contact your administrator to enable this feature."
                    )

        # Step 3: the menu item must be visible in the configuration.
        if menu_service.can_access_menu_item(
            db, frontend_type, menu_item_id, platform_id, user_id
        ):
            return user_context

        logger.warning(
            f"Menu visibility denied: {menu_item_id} for "
            f"user={user_context.username}, frontend={frontend_type.value}, "
            f"platform_id={platform_id}, user_id={user_id}"
        )
        raise InsufficientPermissionsException(
            f"Access to '{menu_item_id}' has been restricted. "
            f"Contact your administrator if you need access."
        )

    return _check_menu_access
# ============================================================================
# VENDOR AUTHENTICATION
# ============================================================================
def get_current_vendor_from_cookie_or_header(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    vendor_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Authenticate a vendor user from the Authorization header or vendor_token cookie.

    Intended for vendor HTML pages (/vendor/*). The header is checked first,
    then the cookie. Admins are rejected explicitly with their own message
    before the generic "not a vendor" rejection.

    Args:
        request: FastAPI request (its path is used for logging).
        credentials: Optional Bearer token from the Authorization header.
        vendor_token: Optional token from the ``vendor_token`` cookie.
        db: Database session.

    Returns:
        UserContext: Context for the authenticated vendor user.

    Raises:
        InvalidTokenException: If neither header nor cookie carries a token
            (token validation errors propagate from ``_validate_user_token``).
        InsufficientPermissionsException: If the user's role is ``"admin"`` or
            anything other than ``"vendor"``.
    """
    token, _source = _get_token_from_request(
        credentials, vendor_token, "vendor_token", str(request.url.path)
    )
    if not token:
        logger.warning(f"Vendor auth failed: No token for {request.url.path}")
        raise InvalidTokenException("Vendor authentication required")

    user = _validate_user_token(token, db)
    role = user.role

    # CRITICAL: admins must never enter the vendor portal.
    if role == "admin":
        logger.warning(
            f"Admin user {user.username} attempted vendor route: {request.url.path}"
        )
        raise InsufficientPermissionsException(
            "Vendor access only - admins cannot use vendor portal"
        )

    if role != "vendor":
        logger.warning(
            f"Non-vendor user {user.username} attempted vendor route: {request.url.path}"
        )
        raise InsufficientPermissionsException("Vendor privileges required")

    return UserContext.from_user(user)
def get_current_vendor_api(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Authenticate a vendor user from the Authorization header only.

    For vendor API endpoints that must not accept cookies. Beyond the role
    check it also requires that:
    1. the user object carries a ``token_vendor_id`` attribute, and
    2. ``user.is_member_of(token_vendor_id)`` still holds.

    Args:
        credentials: Bearer token from the Authorization header.
        db: Database session.

    Returns:
        UserContext: Context for the authenticated vendor user, built with
        ``UserContext.from_user(user)``.

    Raises:
        InvalidTokenException: If the header is missing or the user object has
            no ``token_vendor_id`` (token validation errors propagate from
            ``_validate_user_token``).
        InsufficientPermissionsException: If the user is an admin, is not a
            vendor, or is no longer a member of the token's vendor.
    """
    if not credentials:
        raise InvalidTokenException("Authorization header required for API calls")

    user = _validate_user_token(credentials.credentials, db)
    role = user.role

    # Admins are turned away before the generic non-vendor check.
    if role == "admin":
        logger.warning(f"Admin user {user.username} attempted vendor API")
        raise InsufficientPermissionsException("Vendor access only")
    if role != "vendor":
        logger.warning(f"Non-vendor user {user.username} attempted vendor API")
        raise InsufficientPermissionsException("Vendor privileges required")

    # The token must be bound to a vendor.
    if not hasattr(user, "token_vendor_id"):
        raise InvalidTokenException(
            "Token missing vendor information. Please login again."
        )
    vendor_id = user.token_vendor_id

    # Membership may have been revoked after the token was issued.
    if not user.is_member_of(vendor_id):
        logger.warning(f"User {user.username} lost access to vendor_id={vendor_id}")
        raise InsufficientPermissionsException(
            "Access to vendor has been revoked. Please login again."
        )

    logger.debug(
        f"Vendor API access: user={user.username}, vendor_id={vendor_id}, "
        f"vendor_code={getattr(user, 'token_vendor_code', 'N/A')}"
    )
    return UserContext.from_user(user)
# ============================================================================
# CUSTOMER AUTHENTICATION (SHOP)
# ============================================================================
def _validate_customer_token(token: str, request: Request, db: Session):
    """
    Validate a customer JWT and return the matching CustomerContext schema.

    Checks, in order:
    1. The token decodes with the auth manager's secret key and algorithm.
    2. The ``type`` claim is ``"customer"``.
    3. The ``sub`` claim is present and is a valid integer customer id.
    4. The ``exp`` claim, when present, is not in the past.
    5. The customer row exists and is active.
    6. When both ``request.state.vendor`` and the token's ``vendor_id`` claim
       are present, they refer to the same vendor. This prevents a token
       issued for vendor A from being used on vendor B's shop. If either is
       missing the comparison is skipped.

    Args:
        token: JWT token string.
        request: FastAPI request (``request.state.vendor`` supplies the
            vendor context, if set).
        db: Database session.

    Returns:
        CustomerContext: Schema built via ``CustomerContext.from_db_model``.

    Raises:
        InvalidTokenException: If the token cannot be decoded, has the wrong
            type, lacks a usable ``sub``, is expired, or the customer is
            missing or inactive.
        UnauthorizedVendorAccessException: If the token's vendor does not
            match the request's vendor.
    """
    from datetime import datetime

    from jose import JWTError, jwt

    from app.modules.customers.models.customer import Customer
    from app.modules.customers.schemas import CustomerContext

    # Only the decode call can raise JWTError, so keep the try body minimal.
    try:
        payload = jwt.decode(
            token, auth_manager.secret_key, algorithms=[auth_manager.algorithm]
        )
    except JWTError as e:
        logger.warning(f"JWT decode error: {str(e)}")
        # Chain the cause so the original decode failure stays in tracebacks.
        raise InvalidTokenException("Could not validate credentials") from e

    # Verify this is a customer token (not an admin/vendor user token).
    token_type = payload.get("type")
    if token_type != "customer":
        logger.warning(f"Invalid token type for customer route: {token_type}")
        raise InvalidTokenException("Customer authentication required")

    customer_id = payload.get("sub")
    if customer_id is None:
        logger.warning("Token missing 'sub' (customer_id)")
        raise InvalidTokenException("Invalid token")

    # Explicit expiry check, using timezone-aware datetimes on both sides.
    exp = payload.get("exp")
    if exp and datetime.fromtimestamp(exp, tz=UTC) < datetime.now(UTC):
        logger.warning(f"Expired customer token for customer_id={customer_id}")
        raise InvalidTokenException("Token has expired")

    token_vendor_id = payload.get("vendor_id")

    # A signed token with a non-numeric 'sub' must be rejected as an invalid
    # token, not surface as an unhandled ValueError/TypeError.
    try:
        customer_pk = int(customer_id)
    except (TypeError, ValueError):
        logger.warning(f"Token 'sub' is not a valid customer id: {customer_id!r}")
        raise InvalidTokenException("Invalid token") from None

    customer = db.query(Customer).filter(Customer.id == customer_pk).first()
    if not customer:
        logger.warning(f"Customer not found: {customer_id}")
        raise InvalidTokenException("Customer not found")
    if not customer.is_active:
        logger.warning(f"Inactive customer attempted access: {customer.email}")
        raise InvalidTokenException("Customer account is inactive")

    # Validate vendor context matches token.
    request_vendor = getattr(request.state, "vendor", None)
    if request_vendor and token_vendor_id:
        if request_vendor.id != token_vendor_id:
            logger.warning(
                f"Customer {customer.email} token vendor mismatch: "
                f"token={token_vendor_id}, request={request_vendor.id}"
            )
            raise UnauthorizedVendorAccessException(
                vendor_code=request_vendor.vendor_code,
                user_id=customer.id,
            )

    logger.debug(f"Customer authenticated: {customer.email} (ID: {customer.id})")
    # Return the schema rather than the database model.
    return CustomerContext.from_db_model(customer)
def get_current_customer_from_cookie_or_header(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    customer_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
):
    """
    Authenticate a customer from the Authorization header or customer_token cookie.

    Intended for shop account HTML pages (/shop/account/*). Public shop pages
    do not use this dependency. The header is checked first, then the cookie.
    The token itself is verified by ``_validate_customer_token``, including
    its vendor-match check against the request's vendor.

    Args:
        request: FastAPI request.
        credentials: Optional Bearer token from the Authorization header.
        customer_token: Optional token from the ``customer_token`` cookie.
        db: Database session.

    Returns:
        CustomerContext: Authenticated customer context schema.

    Raises:
        InvalidTokenException: If no token or an invalid token is supplied.
        UnauthorizedVendorAccessException: If the token's vendor does not
            match the request's vendor.
    """
    token, _source = _get_token_from_request(
        credentials, customer_token, "customer_token", str(request.url.path)
    )
    if token:
        return _validate_customer_token(token, request, db)

    logger.warning(f"Customer auth failed: No token for {request.url.path}")
    raise InvalidTokenException("Customer authentication required")
def get_current_customer_api(
    request: Request,
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    """
    Authenticate a customer from the Authorization header only.

    For shop API endpoints that must not accept cookies. Token verification,
    including the vendor-match check against the request's vendor, is done by
    ``_validate_customer_token``.

    Args:
        request: FastAPI request (supplies the vendor context).
        credentials: Bearer token from the Authorization header.
        db: Database session.

    Returns:
        CustomerContext: Authenticated customer context schema.

    Raises:
        InvalidTokenException: If the header is missing or the token is invalid.
        UnauthorizedVendorAccessException: If the token's vendor does not
            match the request's vendor.
    """
    if credentials:
        return _validate_customer_token(credentials.credentials, request, db)
    raise InvalidTokenException("Authorization header required for API calls")
# ============================================================================
# GENERIC AUTHENTICATION (for mixed-use endpoints)
# ============================================================================
def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Authenticate any user from the Authorization header, without a role check.

    For endpoints that every authenticated user may call. Cookies are not
    consulted.

    Args:
        credentials: Bearer token from the Authorization header.
        db: Database session.

    Returns:
        UserContext: Context for the authenticated user, whatever the role.

    Raises:
        InvalidTokenException: If the header is missing (token validation
            errors propagate from ``_validate_user_token``).
    """
    if credentials:
        return UserContext.from_user(
            _validate_user_token(credentials.credentials, db)
        )
    raise InvalidTokenException("Authorization header required")
# ============================================================================
# VENDOR OWNERSHIP VERIFICATION
# ============================================================================
def get_user_vendor(
    vendor_code: str,
    current_user: UserContext = Depends(get_current_vendor_from_cookie_or_header),
    db: Session = Depends(get_db),
) -> Vendor:
    """
    Load a vendor by code and verify the current user may access it.

    Access is currently granted only to the owner of the vendor's company
    (``vendor.company.owner_user_id == current_user.id``). Team-member access
    is not implemented yet (see the TODO below). Admins never get here because
    the vendor auth dependency rejects them.

    Args:
        vendor_code: Vendor code to look up; upper-cased before the lookup.
        current_user: Current authenticated vendor user.
        db: Database session.

    Returns:
        Vendor: The vendor row (with its company eagerly loaded).

    Raises:
        VendorNotFoundException: If no vendor has that code.
        UnauthorizedVendorAccessException: If the user is not the owner.
    """
    from sqlalchemy.orm import joinedload

    vendor_query = (
        db.query(Vendor)
        .options(joinedload(Vendor.company))
        .filter(Vendor.vendor_code == vendor_code.upper())
    )
    vendor = vendor_query.first()
    if not vendor:
        raise VendorNotFoundException(vendor_code)

    # Ownership is established through the vendor's company.
    is_owner = vendor.company and vendor.company.owner_user_id == current_user.id

    # TODO: Add team member check when VendorUser relationship is set up
    if not is_owner:
        raise UnauthorizedVendorAccessException(vendor_code, current_user.id)
    return vendor
# ============================================================================
# PERMISSIONS CHECKING
# ============================================================================
def require_vendor_permission(permission: str):
    """
    Build a dependency that requires one specific vendor permission.

    The vendor is identified by ``token_vendor_id`` on the authenticated
    user's context. It is loaded through ``vendor_service`` and stored in
    ``request.state.vendor`` before the permission is checked, so endpoints
    can read it from there.

    Usage:
        @router.get("/products")
        def list_products(
            request: Request,
            user: UserContext = Depends(require_vendor_permission(VendorPermissions.PRODUCTS_VIEW.value))
        ):
            vendor = request.state.vendor  # set by this dependency
            ...

    Args:
        permission: Permission string passed to ``has_vendor_permission``.

    Returns:
        Dependency function returning the authenticated UserContext.

    Raises (from the returned dependency):
        InvalidTokenException: If the context has no ``token_vendor_id``, or
            the user row cannot be found.
        InsufficientVendorPermissionsException: If the user lacks the
            permission for that vendor.
    """

    def permission_checker(
        request: Request,
        db: Session = Depends(get_db),
        current_user: UserContext = Depends(get_current_vendor_from_cookie_or_header),
    ) -> UserContext:
        vendor_id = current_user.token_vendor_id
        if not vendor_id:
            raise InvalidTokenException(
                "Token missing vendor information. Please login again."
            )

        # get_vendor_by_id is documented upstream to raise
        # VendorNotFoundException when the vendor does not exist.
        vendor = vendor_service.get_vendor_by_id(db, vendor_id)
        request.state.vendor = vendor

        # The permission check lives on the database model, not the schema.
        user_model = _get_user_model(current_user, db)
        if user_model.has_vendor_permission(vendor.id, permission):
            return current_user

        raise InsufficientVendorPermissionsException(
            required_permission=permission,
            vendor_code=vendor.vendor_code,
        )

    return permission_checker
def require_vendor_owner(
    request: Request,
    db: Session = Depends(get_db),
    current_user: UserContext = Depends(get_current_vendor_from_cookie_or_header),
) -> UserContext:
    """
    Dependency that only lets the owner of the token's vendor through.

    The vendor is resolved from the ``token_vendor_id`` carried by the
    authenticated user context (authenticated vendor API pattern) and is
    placed on ``request.state.vendor`` before the ownership check runs.

    Usage:
        @router.delete("/team/{user_id}")
        def remove_team_member(
            request: Request,
            user: UserContext = Depends(require_vendor_owner),
        ):
            vendor = request.state.vendor  # populated by this dependency
            ...

    Args:
        request: Incoming request; receives ``state.vendor``.
        db: Database session.
        current_user: Authenticated vendor user context.

    Returns:
        The unchanged ``current_user`` when the user owns the vendor.

    Raises:
        InvalidTokenException: The user context carries no vendor id.
        VendorOwnerOnlyException: The user is not the owner of the vendor.
    """
    token_vendor_id = current_user.token_vendor_id
    if not token_vendor_id:
        raise InvalidTokenException(
            "Token missing vendor information. Please login again."
        )

    # Lookup is delegated to the vendor service (expected to raise
    # VendorNotFoundException when the vendor does not exist).
    vendor = vendor_service.get_vendor_by_id(db, token_vendor_id)

    # Expose the vendor to the endpoint before checking ownership.
    request.state.vendor = vendor

    # is_owner_of() is defined on the User model, not on UserContext.
    db_user = _get_user_model(current_user, db)
    if db_user.is_owner_of(vendor.id):
        return current_user

    raise VendorOwnerOnlyException(
        operation="team management",
        vendor_code=vendor.vendor_code,
    )
def require_any_vendor_permission(*permissions: str):
    """
    Build a dependency that passes when the user holds AT LEAST ONE permission.

    The vendor is resolved from the ``token_vendor_id`` carried by the
    authenticated user context (authenticated vendor API pattern) and is
    placed on ``request.state.vendor`` before the permission check runs.

    Usage:
        @router.get("/dashboard")
        def dashboard(
            request: Request,
            user: UserContext = Depends(require_any_vendor_permission(
                VendorPermissions.DASHBOARD_VIEW.value,
                VendorPermissions.REPORTS_VIEW.value,
            )),
        ):
            vendor = request.state.vendor  # populated by this dependency
            ...

    Args:
        *permissions: Candidate permission values; holding one is sufficient.
            With no candidates the check always fails.

    Returns:
        A dependency callable that yields the current UserContext on success.
    """

    def permission_checker(
        request: Request,
        db: Session = Depends(get_db),
        current_user: UserContext = Depends(get_current_vendor_from_cookie_or_header),
    ) -> UserContext:
        # A vendor token without a vendor id cannot be authorised at all.
        token_vendor_id = current_user.token_vendor_id
        if not token_vendor_id:
            raise InvalidTokenException(
                "Token missing vendor information. Please login again."
            )

        # Lookup is delegated to the vendor service (expected to raise
        # VendorNotFoundException when the vendor does not exist).
        vendor = vendor_service.get_vendor_by_id(db, token_vendor_id)

        # Expose the vendor to the endpoint before checking permissions.
        request.state.vendor = vendor

        # Permission logic lives on the User model, not on UserContext.
        db_user = _get_user_model(current_user, db)

        # Stop at the first permission the user holds.
        for candidate in permissions:
            if db_user.has_vendor_permission(vendor.id, candidate):
                return current_user

        raise InsufficientVendorPermissionsException(
            required_permission=f"Any of: {', '.join(permissions)}",
            vendor_code=vendor.vendor_code,
        )

    return permission_checker
def require_all_vendor_permissions(*permissions: str):
    """
    Build a dependency that passes only when the user holds EVERY permission.

    The vendor is resolved from the ``token_vendor_id`` carried by the
    authenticated user context (authenticated vendor API pattern) and is
    placed on ``request.state.vendor`` before the permission check runs.

    Usage:
        @router.post("/products/bulk-delete")
        def bulk_delete_products(
            request: Request,
            user: UserContext = Depends(require_all_vendor_permissions(
                VendorPermissions.PRODUCTS_VIEW.value,
                VendorPermissions.PRODUCTS_DELETE.value,
            )),
        ):
            vendor = request.state.vendor  # populated by this dependency
            ...

    Args:
        *permissions: Permission values that must all be held. With no
            values the check trivially passes.

    Returns:
        A dependency callable that yields the current UserContext on success.
    """

    def permission_checker(
        request: Request,
        db: Session = Depends(get_db),
        current_user: UserContext = Depends(get_current_vendor_from_cookie_or_header),
    ) -> UserContext:
        # A vendor token without a vendor id cannot be authorised at all.
        token_vendor_id = current_user.token_vendor_id
        if not token_vendor_id:
            raise InvalidTokenException(
                "Token missing vendor information. Please login again."
            )

        # Lookup is delegated to the vendor service (expected to raise
        # VendorNotFoundException when the vendor does not exist).
        vendor = vendor_service.get_vendor_by_id(db, token_vendor_id)

        # Expose the vendor to the endpoint before checking permissions.
        request.state.vendor = vendor

        # Permission logic lives on the User model, not on UserContext.
        db_user = _get_user_model(current_user, db)

        # Every permission is evaluated (no short-circuit); the error below
        # reports the full required set rather than only the missing ones.
        outcomes = [
            db_user.has_vendor_permission(vendor.id, required)
            for required in permissions
        ]
        if all(outcomes):
            return current_user

        raise InsufficientVendorPermissionsException(
            required_permission=f"All of: {', '.join(permissions)}",
            vendor_code=vendor.vendor_code,
        )

    return permission_checker
def get_user_permissions(
    request: Request,
    db: Session = Depends(get_db),
    current_user: UserContext = Depends(get_current_vendor_from_cookie_or_header),
) -> list:
    """
    Collect the permissions the current user holds for the token's vendor.

    The vendor is resolved from the ``token_vendor_id`` carried by the
    authenticated user context (authenticated vendor API pattern). As a side
    effect the loaded vendor is stored on ``request.state.vendor``.

    Args:
        request: Incoming request; receives ``state.vendor``.
        db: Database session.
        current_user: Authenticated vendor user context.

    Returns:
        - ``[]`` when the user context carries no vendor id.
        - Every ``VendorPermissions`` value when the user owns the vendor.
        - The result of ``get_all_permissions()`` on the user's first active
          membership for that vendor, when one exists.
        - ``[]`` otherwise.
    """
    token_vendor_id = current_user.token_vendor_id
    if not token_vendor_id:
        return []

    vendor = vendor_service.get_vendor_by_id(db, token_vendor_id)

    # Expose the vendor to the endpoint before resolving permissions.
    request.state.vendor = vendor

    # Ownership and memberships live on the User model, not on UserContext.
    db_user = _get_user_model(current_user, db)

    # Owners implicitly hold the complete permission set.
    if db_user.is_owner_of(vendor.id):
        from app.core.permissions import VendorPermissions

        return [member.value for member in VendorPermissions]

    # Otherwise take the first active membership tied to this vendor.
    membership = next(
        (
            entry
            for entry in db_user.vendor_memberships
            if entry.vendor_id == vendor.id and entry.is_active
        ),
        None,
    )
    if membership is None:
        return []
    return membership.get_all_permissions()
# ============================================================================
# OPTIONAL AUTHENTICATION (For Login Page Redirects)
# ============================================================================
def get_current_admin_optional(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    admin_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
) -> UserContext | None:
    """
    Get current admin user from admin_token cookie or Authorization header.

    Returns None instead of raising exceptions if not authenticated.
    Used for login pages to check if user is already authenticated.

    Priority:
        1. Authorization header (API calls)
        2. admin_token cookie (page navigation)

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        admin_token: Optional token from admin_token cookie
        db: Database session

    Returns:
        UserContext: Authenticated admin user context if valid token exists
        None: If no token, invalid token, or user is not admin
    """
    # The token source is not needed here; only the token itself matters.
    token, _source = _get_token_from_request(
        credentials, admin_token, "admin_token", str(request.url.path)
    )
    if not token:
        return None

    try:
        # Validate token and get user
        user = _validate_user_token(token, db)

        # Verify user is admin (admin contexts carry no vendor context)
        if user.role == "admin":
            return UserContext.from_user(user, include_vendor_context=False)
    except Exception as exc:
        # Best-effort by design: any failure means "not authenticated".
        # Log at debug level so unexpected failures (e.g. database errors)
        # are not lost without a trace.
        logging.getLogger(__name__).debug(
            "Optional admin authentication failed: %r", exc
        )

    return None
def get_current_vendor_optional(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    vendor_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
) -> UserContext | None:
    """
    Get current vendor user from vendor_token cookie or Authorization header.

    Returns None instead of raising exceptions if not authenticated.
    Used for login pages to check if user is already authenticated.

    Priority:
        1. Authorization header (API calls)
        2. vendor_token cookie (page navigation)

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        vendor_token: Optional token from vendor_token cookie
        db: Database session

    Returns:
        UserContext: Authenticated vendor user context if valid token exists
        None: If no token, invalid token, or user is not vendor
    """
    # The token source is not needed here; only the token itself matters.
    token, _source = _get_token_from_request(
        credentials, vendor_token, "vendor_token", str(request.url.path)
    )
    if not token:
        return None

    try:
        # Validate token and get user
        user = _validate_user_token(token, db)

        # Verify user is vendor
        if user.role == "vendor":
            return UserContext.from_user(user)
    except Exception as exc:
        # Best-effort by design: any failure means "not authenticated".
        # Log at debug level so unexpected failures (e.g. database errors)
        # are not lost without a trace.
        logging.getLogger(__name__).debug(
            "Optional vendor authentication failed: %r", exc
        )

    return None
def get_current_customer_optional(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    customer_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
):
    """
    Get current customer from customer_token cookie or Authorization header.

    Returns None instead of raising exceptions if not authenticated.
    Used for login pages to check if user is already authenticated.

    Priority:
        1. Authorization header (API calls)
        2. customer_token cookie (page navigation)

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        customer_token: Optional token from customer_token cookie
        db: Database session

    Returns:
        CustomerContext: Authenticated customer context if valid token exists
        None: If no token, invalid token, or vendor mismatch
    """
    # The token source is not needed here; only the token itself matters.
    token, _source = _get_token_from_request(
        credentials, customer_token, "customer_token", str(request.url.path)
    )
    if not token:
        return None

    try:
        # Validate customer token (includes vendor validation)
        return _validate_customer_token(token, request, db)
    except Exception as exc:
        # Best-effort by design: invalid token, vendor mismatch, or any other
        # error means "not authenticated". Log at debug level so unexpected
        # failures are not lost without a trace.
        logging.getLogger(__name__).debug(
            "Optional customer authentication failed: %r", exc
        )
        return None