orion/app/api/deps.py
Samir Boulahtit d7a0ff8818 refactor: complete module-driven architecture migration
This commit completes the migration to a fully module-driven architecture:

## Models Migration
- Moved all domain models from models/database/ to their respective modules:
  - tenancy: User, Admin, Vendor, Company, Platform, VendorDomain, etc.
  - cms: MediaFile, VendorTheme
  - messaging: Email, VendorEmailSettings, VendorEmailTemplate
  - core: AdminMenuConfig
- models/database/ now only contains Base and TimestampMixin (infrastructure)

## Schemas Migration
- Moved all domain schemas from models/schema/ to their respective modules:
  - tenancy: company, vendor, admin, team, vendor_domain
  - cms: media, image, vendor_theme
  - messaging: email
- models/schema/ now only contains base.py and auth.py (infrastructure)

## Routes Migration
- Moved admin routes from app/api/v1/admin/ to modules:
  - menu_config.py -> core module
  - modules.py -> tenancy module
  - module_config.py -> tenancy module
- app/api/v1/admin/ now only aggregates auto-discovered module routes

## Menu System
- Implemented module-driven menu system with MenuDiscoveryService
- Extended FrontendType enum: PLATFORM, ADMIN, VENDOR, STOREFRONT
- Added MenuItemDefinition and MenuSectionDefinition dataclasses
- Each module now defines its own menu items in definition.py
- MenuService integrates with MenuDiscoveryService for template rendering
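
As a rough illustration of the module-driven menu idea, the sketch below shows how a module's definition.py might declare its menu and how a discovery step could collect the items for one frontend. Only the names `FrontendType`, `MenuItemDefinition`, and `MenuSectionDefinition` come from this commit. Every field name, the enum string values, `BILLING_MENUS`, and `discover_menu_items` are assumptions made for the example, not the project's actual schema or the real `MenuDiscoveryService` API.

```python
from dataclasses import dataclass
from enum import Enum


class FrontendType(str, Enum):
    """The four frontends named in this commit; the string values are assumed."""

    PLATFORM = "platform"
    ADMIN = "admin"
    VENDOR = "vendor"
    STOREFRONT = "storefront"


@dataclass(frozen=True)
class MenuItemDefinition:
    """One menu entry. All field names are hypothetical."""

    id: str
    label: str
    route: str
    order: int = 0


@dataclass(frozen=True)
class MenuSectionDefinition:
    """A labelled group of menu items for one frontend. Fields are hypothetical."""

    id: str
    label: str
    frontend_type: FrontendType
    items: tuple[MenuItemDefinition, ...] = ()


# What a module's definition.py might declare (hypothetical "billing" module).
BILLING_MENUS = [
    MenuSectionDefinition(
        id="billing",
        label="Billing",
        frontend_type=FrontendType.ADMIN,
        items=(
            MenuItemDefinition("invoices", "Invoices", "/admin/billing/invoices", order=20),
            MenuItemDefinition("stripe-config", "Stripe", "/admin/billing/stripe-config", order=10),
        ),
    ),
]


def discover_menu_items(
    sections: list[MenuSectionDefinition], frontend_type: FrontendType
) -> list[MenuItemDefinition]:
    """Collect the items declared for one frontend, sorted by their order field."""
    items = [
        item
        for section in sections
        if section.frontend_type is frontend_type
        for item in section.items
    ]
    return sorted(items, key=lambda item: item.order)
```

Keeping the definitions as frozen dataclasses means a module's menu is plain, immutable data that a discovery service can read without importing any route or template code.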

## Documentation
- Updated docs/architecture/models-structure.md
- Updated docs/architecture/menu-management.md
- Updated architecture validation rules for new exceptions

## Architecture Validation
- Updated MOD-019 rule to allow base.py in models/schema/
- Created core module exceptions.py and schemas/ directory
- All validation errors resolved (only warnings remain)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 21:02:56 +01:00

1407 lines
47 KiB
Python

# app/api/deps.py
"""
Authentication dependencies for FastAPI routes.

This module provides authentication dependencies for all three contexts in the
multi-tenant application, implementing dual token storage with proper isolation:

ADMIN ROUTES (/admin/*):
    - Cookie: admin_token (path=/admin) OR Authorization header
    - Role: admin only
    - Blocks: vendors, customers

VENDOR ROUTES (/vendor/*):
    - Cookie: vendor_token (path=/vendor) OR Authorization header
    - Role: vendor only
    - Blocks: admins, customers

CUSTOMER/SHOP ROUTES (/shop/account/*):
    - Cookie: customer_token (path=/shop) OR Authorization header
    - Role: customer only
    - Blocks: admins, vendors
    - Note: Public shop pages (/shop/products, etc.) don't require auth

This dual authentication approach supports:
    - HTML pages: Use cookies (automatic browser behavior)
    - API calls: Use Authorization headers (explicit JavaScript control)

The cookie path restrictions prevent cross-context cookie leakage:
    - admin_token is NEVER sent to /vendor/* or /shop/*
    - vendor_token is NEVER sent to /admin/* or /shop/*
    - customer_token is NEVER sent to /admin/* or /vendor/*
"""

import logging
from datetime import UTC

from fastapi import Cookie, Depends, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.orm import Session

from app.core.database import get_db
from app.modules.tenancy.exceptions import (
    AdminRequiredException,
    InsufficientPermissionsException,
    InsufficientVendorPermissionsException,
    InvalidTokenException,
    UnauthorizedVendorAccessException,
    VendorNotFoundException,
    VendorOwnerOnlyException,
)
from app.modules.tenancy.services.vendor_service import vendor_service
from middleware.auth import AuthManager
from middleware.rate_limiter import RateLimiter
from app.modules.tenancy.models import User as UserModel
from app.modules.tenancy.models import Vendor
from models.schema.auth import UserContext

# Initialize dependencies
security = HTTPBearer(auto_error=False)  # auto_error=False prevents automatic 403
auth_manager = AuthManager()
rate_limiter = RateLimiter()
logger = logging.getLogger(__name__)

# ============================================================================
# HELPER FUNCTIONS
# ============================================================================


def _get_token_from_request(
    credentials: HTTPAuthorizationCredentials | None,
    cookie_value: str | None,
    cookie_name: str,
    request_path: str,
) -> tuple[str | None, str | None]:
    """
    Extract token from Authorization header or cookie.

    Priority:
        1. Authorization header (for API calls from JavaScript)
        2. Cookie (for browser page navigation)

    Args:
        credentials: Optional Bearer token from Authorization header
        cookie_value: Optional token from cookie
        cookie_name: Name of the cookie (for logging)
        request_path: Request URL path (for logging)

    Returns:
        Tuple of (token, source) where source is "header" or "cookie",
        or (None, None) if neither is present
    """
    if credentials:
        logger.debug(f"Token found in Authorization header for {request_path}")
        return credentials.credentials, "header"

    if cookie_value:
        logger.debug(f"Token found in {cookie_name} cookie for {request_path}")
        return cookie_value, "cookie"

    return None, None


def _validate_user_token(token: str, db: Session) -> UserModel:
    """
    Validate JWT token and return user.

    Args:
        token: JWT token string
        db: Database session

    Returns:
        UserModel: Authenticated user object

    Raises:
        InvalidTokenException: If token is invalid
    """
    mock_credentials = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)
    return auth_manager.get_current_user(db, mock_credentials)


def _get_user_model(user_context: UserContext, db: Session) -> UserModel:
    """
    Get User database model from UserContext.

    Used internally by permission-checking functions that need
    access to User model methods like has_vendor_permission().

    Args:
        user_context: UserContext schema instance
        db: Database session

    Returns:
        UserModel: User database model

    Raises:
        InvalidTokenException: If user not found
    """
    user = db.query(UserModel).filter(UserModel.id == user_context.id).first()
    if not user:
        raise InvalidTokenException("User not found")

    # Copy token attributes from context to model for compatibility
    if user_context.token_vendor_id:
        user.token_vendor_id = user_context.token_vendor_id
    if user_context.token_vendor_code:
        user.token_vendor_code = user_context.token_vendor_code
    if user_context.token_vendor_role:
        user.token_vendor_role = user_context.token_vendor_role

    return user


# ============================================================================
# ADMIN AUTHENTICATION
# ============================================================================


def get_current_admin_from_cookie_or_header(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    admin_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Get current admin user from admin_token cookie or Authorization header.

    Used for admin HTML pages (/admin/*) that need cookie-based auth.

    Priority:
        1. Authorization header (API calls)
        2. admin_token cookie (page navigation)

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        admin_token: Optional token from admin_token cookie
        db: Database session

    Returns:
        UserContext: Authenticated admin user context

    Raises:
        InvalidTokenException: If no token or invalid token
        AdminRequiredException: If user is not admin
    """
    token, source = _get_token_from_request(
        credentials, admin_token, "admin_token", str(request.url.path)
    )

    if not token:
        logger.warning(f"Admin auth failed: No token for {request.url.path}")
        raise InvalidTokenException("Admin authentication required")

    # Validate token and get user
    user = _validate_user_token(token, db)

    # Verify user is admin
    if user.role != "admin":
        logger.warning(
            f"Non-admin user {user.username} attempted admin route: {request.url.path}"
        )
        raise AdminRequiredException("Admin privileges required")

    return UserContext.from_user(user, include_vendor_context=False)


def get_current_admin_api(
    # Optional because HTTPBearer(auto_error=False) yields None when the header is absent
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Get current admin user from Authorization header ONLY.

    Used for admin API endpoints that should not accept cookies.
    This prevents CSRF attacks on API endpoints.

    Args:
        credentials: Bearer token from Authorization header
        db: Database session

    Returns:
        UserContext: Authenticated admin user context

    Raises:
        InvalidTokenException: If no token or invalid token
        AdminRequiredException: If user is not admin
    """
    if not credentials:
        raise InvalidTokenException("Authorization header required for API calls")

    user = _validate_user_token(credentials.credentials, db)

    if user.role != "admin":
        logger.warning(f"Non-admin user {user.username} attempted admin API")
        raise AdminRequiredException("Admin privileges required")

    return UserContext.from_user(user, include_vendor_context=False)


# ============================================================================
# SUPER ADMIN AUTHENTICATION
# ============================================================================


def get_current_super_admin(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    admin_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Require super admin role.

    Used for: Global settings, user management, platform creation/deletion,
    admin-platform assignment management.

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        admin_token: Optional token from admin_token cookie
        db: Database session

    Returns:
        UserContext: Authenticated super admin user context

    Raises:
        InvalidTokenException: If no token or invalid token
        AdminRequiredException: If user is not admin or not super admin
    """
    user_context = get_current_admin_from_cookie_or_header(
        request, credentials, admin_token, db
    )

    if not user_context.is_super_admin:
        logger.warning(
            f"Platform admin {user_context.username} attempted super admin route: "
            f"{request.url.path}"
        )
        raise AdminRequiredException("Super admin privileges required")

    return user_context


def get_current_super_admin_api(
    # Optional because HTTPBearer(auto_error=False) yields None when the header is absent
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Require super admin role (API header only).

    Used for super admin API endpoints that should not accept cookies.

    Args:
        credentials: Bearer token from Authorization header
        db: Database session

    Returns:
        UserContext: Authenticated super admin user context

    Raises:
        InvalidTokenException: If no token or invalid token
        AdminRequiredException: If user is not admin or not super admin
    """
    user_context = get_current_admin_api(credentials, db)

    if not user_context.is_super_admin:
        logger.warning(f"Platform admin {user_context.username} attempted super admin API")
        raise AdminRequiredException("Super admin privileges required")

    return user_context


def require_platform_access(platform_id: int):
    """
    Dependency factory to require admin access to a specific platform.

    Super admins can access all platforms.
    Platform admins can only access their assigned platforms.

    Usage:
        @router.get("/platforms/{platform_id}/vendors")
        def list_vendors(
            platform_id: int,
            admin: UserContext = Depends(require_platform_access(platform_id))
        ):
            ...
    """

    def _check_platform_access(
        request: Request,
        credentials: HTTPAuthorizationCredentials | None = Depends(security),
        admin_token: str | None = Cookie(None),
        db: Session = Depends(get_db),
    ) -> UserContext:
        user_context = get_current_admin_from_cookie_or_header(
            request, credentials, admin_token, db
        )

        # Super admins (accessible_platform_ids=None) can access all platforms
        # Platform admins can only access their assigned platforms
        can_access = (
            user_context.accessible_platform_ids is None
            or platform_id in (user_context.accessible_platform_ids or [])
        )
        if not can_access:
            logger.warning(
                f"Admin {user_context.username} denied access to platform_id={platform_id}"
            )
            raise InsufficientPermissionsException(
                f"Access denied to platform {platform_id}"
            )

        return user_context

    return _check_platform_access


def get_admin_with_platform_context(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    admin_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Get admin user and verify platform context from token.

    For platform admins, extracts platform_id from JWT token and verifies access.
    Stores platform in request.state.admin_platform for endpoint use.

    Super admins bypass platform context check (they can access all platforms).

    Note: This function needs the raw User model for token attributes and
    platform access checks, so it uses _validate_user_token internally.

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        admin_token: Optional token from admin_token cookie
        db: Database session

    Returns:
        UserContext: Authenticated admin with platform context

    Raises:
        InvalidTokenException: If platform admin token missing platform info
        InsufficientPermissionsException: If platform access revoked
    """
    from app.modules.tenancy.models import Platform

    # Get raw token for platform_id extraction
    token, source = _get_token_from_request(
        credentials, admin_token, "admin_token", str(request.url.path)
    )

    if not token:
        raise InvalidTokenException("Admin authentication required")

    user = _validate_user_token(token, db)

    if user.role != "admin":
        raise AdminRequiredException("Admin privileges required")

    # Super admins bypass platform context
    if user.is_super_admin:
        return UserContext.from_user(user, include_vendor_context=False)

    # Platform admins need platform_id in token
    if not hasattr(user, "token_platform_id"):
        raise InvalidTokenException(
            "Token missing platform information. Please select a platform."
        )

    platform_id = user.token_platform_id

    # Verify admin still has access to this platform
    if not user.can_access_platform(platform_id):
        logger.warning(
            f"Admin {user.username} lost access to platform_id={platform_id}"
        )
        raise InsufficientPermissionsException(
            "Access to this platform has been revoked. Please login again."
        )

    # Load platform and store in request state
    platform = db.query(Platform).filter(Platform.id == platform_id).first()
    request.state.admin_platform = platform

    return UserContext.from_user(user, include_vendor_context=False)


# ============================================================================
# MODULE-BASED ACCESS CONTROL
# ============================================================================


def require_module_access(module_code: str):
    """
    Dependency factory for module-based route access control.

    Checks if the specified module is enabled for the current platform.
    Use this for routes that should be gated by module enablement but aren't
    tied to a specific menu item.

    Usage:
        @router.get("/admin/billing/stripe-config")
        async def stripe_config(
            current_user: UserContext = Depends(require_module_access("billing")),
        ):
            ...

    Args:
        module_code: Module code to check (e.g., "billing", "marketplace")

    Returns:
        Dependency function that validates module access and returns UserContext
    """
    from app.modules.service import module_service

    def _check_module_access(
        request: Request,
        credentials: HTTPAuthorizationCredentials | None = Depends(security),
        admin_token: str | None = Cookie(None),
        vendor_token: str | None = Cookie(None),
        db: Session = Depends(get_db),
    ) -> UserContext:
        # Try admin auth first, then vendor
        user_context = None
        platform_id = None

        # Check if this is an admin request
        if admin_token or (credentials and request.url.path.startswith("/admin")):
            try:
                user_context = get_current_admin_from_cookie_or_header(
                    request, credentials, admin_token, db
                )
                # Get platform context for admin
                if user_context.is_super_admin:
                    # Super admins bypass module checks
                    return user_context
                else:
                    platform = getattr(request.state, "admin_platform", None)
                    if platform:
                        platform_id = platform.id
                    # Note: token_platform_id is not on UserContext, would need to be added
            except Exception:
                # Admin auth failed; fall through to the vendor check below
                pass

        # Check if this is a vendor request
        if not user_context and (
            vendor_token or (credentials and "/vendor/" in request.url.path)
        ):
            try:
                user_context = get_current_vendor_from_cookie_or_header(
                    request, credentials, vendor_token, db
                )
                # Get platform from vendor context
                vendor = getattr(request.state, "vendor", None)
                if vendor and hasattr(vendor, "platform_id") and vendor.platform_id:
                    platform_id = vendor.platform_id
            except Exception:
                # Vendor auth failed; handled by the user_context check below
                pass

        if not user_context:
            raise InvalidTokenException("Authentication required")

        # If no platform context, allow access (module checking requires platform)
        if not platform_id:
            logger.debug(f"No platform context for module check: {module_code}")
            return user_context

        # Check if module is enabled
        if not module_service.is_module_enabled(db, platform_id, module_code):
            logger.warning(
                f"Module access denied: {module_code} disabled for "
                f"platform_id={platform_id}, user={user_context.username}"
            )
            raise InsufficientPermissionsException(
                f"The '{module_code}' module is not enabled for this platform"
            )

        return user_context

    return _check_module_access


# ============================================================================
# MENU-BASED ACCESS CONTROL
# ============================================================================


def require_menu_access(menu_item_id: str, frontend_type: "FrontendType"):
    """
    Dependency factory for menu-based page route access control.

    Checks if the specified menu item is visible/accessible for the current user:
    - First checks if the module providing this menu item is enabled
    - Then checks visibility configuration (platform or user level)

    Access denied reasons:
    - Module disabled: The feature module is not enabled for this platform
    - Menu hidden: The menu item is hidden by platform/user configuration

    Usage:
        @router.get("/admin/inventory")
        async def inventory_page(
            current_user: UserContext = Depends(
                require_menu_access("inventory", FrontendType.ADMIN)
            ),
        ):
            ...

    Args:
        menu_item_id: Menu item identifier from registry
        frontend_type: Which frontend (ADMIN or VENDOR)

    Returns:
        Dependency function that validates menu access and returns UserContext
    """
    from app.modules.registry import get_menu_item_module
    from app.modules.service import module_service
    from app.modules.core.services.menu_service import menu_service
    from app.modules.enums import FrontendType as FT

    def _check_menu_access(
        request: Request,
        credentials: HTTPAuthorizationCredentials | None = Depends(security),
        admin_token: str | None = Cookie(None),
        vendor_token: str | None = Cookie(None),
        db: Session = Depends(get_db),
    ) -> UserContext:
        # Get current user based on frontend type
        if frontend_type == FT.ADMIN:
            user_context = get_current_admin_from_cookie_or_header(
                request, credentials, admin_token, db
            )

            if user_context.is_super_admin:
                # Super admin: check user-level config
                platform_id = None
                user_id = user_context.id
            else:
                # Platform admin: need platform context
                # Try to get from request state
                platform = getattr(request.state, "admin_platform", None)
                if platform:
                    platform_id = platform.id
                else:
                    # No platform context - allow access (will be restricted elsewhere)
                    # This handles routes that don't have platform context yet
                    return user_context
                user_id = None

        elif frontend_type == FT.VENDOR:
            user_context = get_current_vendor_from_cookie_or_header(
                request, credentials, vendor_token, db
            )

            # Vendor: get platform from vendor's platform association
            vendor = getattr(request.state, "vendor", None)
            if vendor and hasattr(vendor, "platform_id") and vendor.platform_id:
                platform_id = vendor.platform_id
            else:
                # No platform context for vendor - allow access
                # This handles edge cases where vendor doesn't have platform
                return user_context
            user_id = None

        else:
            raise ValueError(f"Unsupported frontend_type: {frontend_type}")

        # First check: Is the module providing this menu item enabled?
        if platform_id:
            module_code = get_menu_item_module(menu_item_id, frontend_type)
            if module_code and not module_service.is_module_enabled(
                db, platform_id, module_code
            ):
                logger.warning(
                    f"Module access denied: {menu_item_id} (module={module_code}) for "
                    f"user={user_context.username}, platform_id={platform_id}"
                )
                raise InsufficientPermissionsException(
                    f"The '{module_code}' module is not enabled for this platform. "
                    f"Contact your administrator to enable this feature."
                )

        # Second check: Is the menu item visible in configuration?
        can_access = menu_service.can_access_menu_item(
            db, frontend_type, menu_item_id, platform_id, user_id
        )

        if not can_access:
            logger.warning(
                f"Menu visibility denied: {menu_item_id} for "
                f"user={user_context.username}, frontend={frontend_type.value}, "
                f"platform_id={platform_id}, user_id={user_id}"
            )
            raise InsufficientPermissionsException(
                f"Access to '{menu_item_id}' has been restricted. "
                f"Contact your administrator if you need access."
            )

        return user_context

    return _check_menu_access


# ============================================================================
# VENDOR AUTHENTICATION
# ============================================================================


def get_current_vendor_from_cookie_or_header(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    vendor_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Get current vendor user from vendor_token cookie or Authorization header.

    Used for vendor HTML pages (/vendor/*) that need cookie-based auth.

    Priority:
        1. Authorization header (API calls)
        2. vendor_token cookie (page navigation)

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        vendor_token: Optional token from vendor_token cookie
        db: Database session

    Returns:
        UserContext: Authenticated vendor user context

    Raises:
        InvalidTokenException: If no token or invalid token
        InsufficientPermissionsException: If user is not vendor or is admin
    """
    token, source = _get_token_from_request(
        credentials, vendor_token, "vendor_token", str(request.url.path)
    )

    if not token:
        logger.warning(f"Vendor auth failed: No token for {request.url.path}")
        raise InvalidTokenException("Vendor authentication required")

    # Validate token and get user
    user = _validate_user_token(token, db)

    # CRITICAL: Block admins from vendor routes
    if user.role == "admin":
        logger.warning(
            f"Admin user {user.username} attempted vendor route: {request.url.path}"
        )
        raise InsufficientPermissionsException(
            "Vendor access only - admins cannot use vendor portal"
        )

    # Verify user is vendor
    if user.role != "vendor":
        logger.warning(
            f"Non-vendor user {user.username} attempted vendor route: {request.url.path}"
        )
        raise InsufficientPermissionsException("Vendor privileges required")

    return UserContext.from_user(user)


def get_current_vendor_api(
    # Optional because HTTPBearer(auto_error=False) yields None when the header is absent
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Get current vendor user from Authorization header ONLY.

    Used for vendor API endpoints that should not accept cookies.

    Validates that:
        1. Token contains vendor context (token_vendor_id)
        2. User still has access to the vendor specified in the token

    Args:
        credentials: Bearer token from Authorization header
        db: Database session

    Returns:
        UserContext: Authenticated vendor user context
            (with token_vendor_id, token_vendor_code, token_vendor_role)

    Raises:
        InvalidTokenException: If no token, invalid token, or missing vendor context
        InsufficientPermissionsException: If user is not vendor or lost access to vendor
    """
    if not credentials:
        raise InvalidTokenException("Authorization header required for API calls")

    user = _validate_user_token(credentials.credentials, db)

    # Block admins from vendor API
    if user.role == "admin":
        logger.warning(f"Admin user {user.username} attempted vendor API")
        raise InsufficientPermissionsException("Vendor access only")

    if user.role != "vendor":
        logger.warning(f"Non-vendor user {user.username} attempted vendor API")
        raise InsufficientPermissionsException("Vendor privileges required")

    # Require vendor context in token
    if not hasattr(user, "token_vendor_id"):
        raise InvalidTokenException(
            "Token missing vendor information. Please login again."
        )

    vendor_id = user.token_vendor_id

    # Verify user still has access to this vendor
    if not user.is_member_of(vendor_id):
        logger.warning(f"User {user.username} lost access to vendor_id={vendor_id}")
        raise InsufficientPermissionsException(
            "Access to vendor has been revoked. Please login again."
        )

    logger.debug(
        f"Vendor API access: user={user.username}, vendor_id={vendor_id}, "
        f"vendor_code={getattr(user, 'token_vendor_code', 'N/A')}"
    )

    return UserContext.from_user(user)


# ============================================================================
# CUSTOMER AUTHENTICATION (SHOP)
# ============================================================================


def _validate_customer_token(token: str, request: Request, db: Session):
    """
    Validate customer JWT token and return CustomerContext schema.

    Validates:
        1. Token signature and expiration
        2. Token type is "customer"
        3. Customer exists and is active
        4. Token vendor_id matches request vendor (URL-based)

    Args:
        token: JWT token string
        request: FastAPI request (for vendor context)
        db: Database session

    Returns:
        CustomerContext: Authenticated customer context schema

    Raises:
        InvalidTokenException: If token is invalid or expired
        UnauthorizedVendorAccessException: If vendor mismatch
    """
    from datetime import datetime

    from jose import JWTError, jwt

    from app.modules.customers.models.customer import Customer
    from app.modules.customers.schemas import CustomerContext

    # Decode and validate customer JWT token
    try:
        payload = jwt.decode(
            token, auth_manager.secret_key, algorithms=[auth_manager.algorithm]
        )

        # Verify this is a customer token
        token_type = payload.get("type")
        if token_type != "customer":
            logger.warning(f"Invalid token type for customer route: {token_type}")
            raise InvalidTokenException("Customer authentication required")

        # Get customer ID from token
        customer_id: str | None = payload.get("sub")
        if customer_id is None:
            logger.warning("Token missing 'sub' (customer_id)")
            raise InvalidTokenException("Invalid token")

        # Verify token hasn't expired
        exp = payload.get("exp")
        if exp and datetime.fromtimestamp(exp, tz=UTC) < datetime.now(UTC):
            logger.warning(f"Expired customer token for customer_id={customer_id}")
            raise InvalidTokenException("Token has expired")

        # Get vendor_id from token for validation
        token_vendor_id = payload.get("vendor_id")

    except JWTError as e:
        logger.warning(f"JWT decode error: {e}")
        # Chain the original error so the decode failure stays in the traceback
        raise InvalidTokenException("Could not validate credentials") from e

    # Load customer from database
    customer = db.query(Customer).filter(Customer.id == int(customer_id)).first()

    if not customer:
        logger.warning(f"Customer not found: {customer_id}")
        raise InvalidTokenException("Customer not found")

    if not customer.is_active:
        logger.warning(f"Inactive customer attempted access: {customer.email}")
        raise InvalidTokenException("Customer account is inactive")

    # Validate vendor context matches token
    # This prevents using a customer token from vendor A on vendor B's shop
    request_vendor = getattr(request.state, "vendor", None)
    if request_vendor and token_vendor_id:
        if request_vendor.id != token_vendor_id:
            logger.warning(
                f"Customer {customer.email} token vendor mismatch: "
                f"token={token_vendor_id}, request={request_vendor.id}"
            )
            raise UnauthorizedVendorAccessException(
                vendor_code=request_vendor.vendor_code,
                user_id=customer.id,
            )

    logger.debug(f"Customer authenticated: {customer.email} (ID: {customer.id})")

    # Return CustomerContext schema instead of database model
    return CustomerContext.from_db_model(customer)


def get_current_customer_from_cookie_or_header(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    customer_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
):
    """
    Get current customer from customer_token cookie or Authorization header.

    Used for shop account HTML pages (/shop/account/*) that need cookie-based auth.
    Note: Public shop pages (/shop/products, etc.) don't use this dependency.

    Validates that token vendor_id matches request vendor (URL-based detection).

    Priority:
        1. Authorization header (API calls)
        2. customer_token cookie (page navigation)

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        customer_token: Optional token from customer_token cookie
        db: Database session

    Returns:
        CustomerContext: Authenticated customer context schema

    Raises:
        InvalidTokenException: If no token or invalid token
        UnauthorizedVendorAccessException: If vendor mismatch
    """
    token, source = _get_token_from_request(
        credentials, customer_token, "customer_token", str(request.url.path)
    )

    if not token:
        logger.warning(f"Customer auth failed: No token for {request.url.path}")
        raise InvalidTokenException("Customer authentication required")

    return _validate_customer_token(token, request, db)


def get_current_customer_api(
    request: Request,
    # Optional because HTTPBearer(auto_error=False) yields None when the header is absent
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    db: Session = Depends(get_db),
):
    """
    Get current customer from Authorization header ONLY.

    Used for shop API endpoints that should not accept cookies.
    Validates that token vendor_id matches request vendor (URL-based detection).

    Args:
        request: FastAPI request (for vendor context)
        credentials: Bearer token from Authorization header
        db: Database session

    Returns:
        CustomerContext: Authenticated customer context schema

    Raises:
        InvalidTokenException: If no token or invalid token
        UnauthorizedVendorAccessException: If vendor mismatch
    """
    if not credentials:
        raise InvalidTokenException("Authorization header required for API calls")

    return _validate_customer_token(credentials.credentials, request, db)


# ============================================================================
# GENERIC AUTHENTICATION (for mixed-use endpoints)
# ============================================================================


def get_current_user(
    # Optional because HTTPBearer(auto_error=False) yields None when the header is absent
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Get current authenticated user from Authorization header only.

    Generic authentication without role checking.
    Used for endpoints accessible to any authenticated user.

    Args:
        credentials: Bearer token from Authorization header
        db: Database session

    Returns:
        UserContext: Authenticated user context (any role)

    Raises:
        InvalidTokenException: If no token or invalid token
    """
    if not credentials:
        raise InvalidTokenException("Authorization header required")

    user = _validate_user_token(credentials.credentials, db)
    return UserContext.from_user(user)


# ============================================================================
# VENDOR OWNERSHIP VERIFICATION
# ============================================================================


def get_user_vendor(
    vendor_code: str,
    current_user: UserContext = Depends(get_current_vendor_from_cookie_or_header),
    db: Session = Depends(get_db),
) -> Vendor:
    """
    Get vendor and verify user ownership/membership.

    Ensures the current user has access to the specified vendor.
    - Vendor owners can access their own vendor
    - Team members can access their vendor
    - Admins are BLOCKED (use admin routes instead)

    Args:
        vendor_code: Vendor code to look up
        current_user: Current authenticated vendor user
        db: Database session

    Returns:
        Vendor: Vendor object if user has access

    Raises:
        VendorNotFoundException: If vendor doesn't exist
        UnauthorizedVendorAccessException: If user doesn't have access
    """
    from sqlalchemy.orm import joinedload

    vendor = (
        db.query(Vendor)
        .options(joinedload(Vendor.company))
        .filter(Vendor.vendor_code == vendor_code.upper())
        .first()
    )

    if not vendor:
        raise VendorNotFoundException(vendor_code)

    # Check if user owns this vendor (via company ownership)
    if vendor.company and vendor.company.owner_user_id == current_user.id:
        return vendor

    # Check if user is team member
    # TODO: Add team member check when VendorUser relationship is set up

    # User doesn't have access to this vendor
    raise UnauthorizedVendorAccessException(vendor_code, current_user.id)


# ============================================================================
# PERMISSIONS CHECKING
# ============================================================================


def require_vendor_permission(permission: str):
    """
    Dependency factory to require a specific vendor permission.

    Uses token_vendor_id from JWT token (authenticated vendor API pattern).
    The vendor object is loaded and stored in request.state.vendor for endpoint use.

    Usage:
        @router.get("/products")
        def list_products(
            request: Request,
            user: UserContext = Depends(
                require_vendor_permission(VendorPermissions.PRODUCTS_VIEW.value)
            ),
        ):
            vendor = request.state.vendor  # Vendor is set by this dependency
            ...
    """

    def permission_checker(
        request: Request,
        db: Session = Depends(get_db),
        current_user: UserContext = Depends(get_current_vendor_from_cookie_or_header),
    ) -> UserContext:
        # Get vendor ID from JWT token
        if not current_user.token_vendor_id:
            raise InvalidTokenException(
                "Token missing vendor information. Please login again."
            )

        vendor_id = current_user.token_vendor_id

        # Load vendor from database (raises VendorNotFoundException if not found)
        vendor = vendor_service.get_vendor_by_id(db, vendor_id)

        # Store vendor in request state for endpoint use
        request.state.vendor = vendor

        # Check if user has permission (need User model for this)
        user_model = _get_user_model(current_user, db)
        if not user_model.has_vendor_permission(vendor.id, permission):
            raise InsufficientVendorPermissionsException(
                required_permission=permission,
                vendor_code=vendor.vendor_code,
            )

        return current_user

    return permission_checker


def require_vendor_owner(
    request: Request,
    db: Session = Depends(get_db),
    current_user: UserContext = Depends(get_current_vendor_from_cookie_or_header),
) -> UserContext:
    """
    Dependency to require vendor owner role.

    Uses token_vendor_id from JWT token (authenticated vendor API pattern).
    The vendor object is loaded and stored in request.state.vendor for endpoint use.

    Usage:
        @router.delete("/team/{user_id}")
        def remove_team_member(
            request: Request,
            user: UserContext = Depends(require_vendor_owner)
        ):
            vendor = request.state.vendor  # Vendor is set by this dependency
            ...
    """
    # Get vendor ID from JWT token
    if not current_user.token_vendor_id:
        raise InvalidTokenException(
            "Token missing vendor information. Please login again."
        )

    vendor_id = current_user.token_vendor_id

    # Load vendor from database (raises VendorNotFoundException if not found)
    vendor = vendor_service.get_vendor_by_id(db, vendor_id)

    # Store vendor in request state for endpoint use
    request.state.vendor = vendor

    # Need User model for is_owner_of check
    user_model = _get_user_model(current_user, db)
    if not user_model.is_owner_of(vendor.id):
        raise VendorOwnerOnlyException(
            operation="team management",
            vendor_code=vendor.vendor_code,
        )

    return current_user


def require_any_vendor_permission(*permissions: str):
    """
    Dependency factory to require ANY of the specified permissions.

    Uses token_vendor_id from JWT token (authenticated vendor API pattern).
    The vendor object is loaded and stored in request.state.vendor for endpoint use.

    Usage:
        @router.get("/dashboard")
        def dashboard(
            request: Request,
            user: UserContext = Depends(require_any_vendor_permission(
                VendorPermissions.DASHBOARD_VIEW.value,
                VendorPermissions.REPORTS_VIEW.value
            ))
        ):
            vendor = request.state.vendor  # Vendor is set by this dependency
            ...
    """

    def permission_checker(
        request: Request,
        db: Session = Depends(get_db),
        current_user: UserContext = Depends(get_current_vendor_from_cookie_or_header),
    ) -> UserContext:
        # Get vendor ID from JWT token
        if not current_user.token_vendor_id:
            raise InvalidTokenException(
                "Token missing vendor information. Please login again."
            )

        vendor_id = current_user.token_vendor_id

        # Load vendor from database (raises VendorNotFoundException if not found)
        vendor = vendor_service.get_vendor_by_id(db, vendor_id)

        # Store vendor in request state for endpoint use
        request.state.vendor = vendor

        # Check if user has ANY of the required permissions (need User model)
        user_model = _get_user_model(current_user, db)
        has_permission = any(
            user_model.has_vendor_permission(vendor.id, perm) for perm in permissions
        )

        if not has_permission:
            raise InsufficientVendorPermissionsException(
                required_permission=f"Any of: {', '.join(permissions)}",
                vendor_code=vendor.vendor_code,
            )

        return current_user

    return permission_checker


def require_all_vendor_permissions(*permissions: str):
    """
    Dependency factory to require ALL of the specified permissions.

    Uses token_vendor_id from JWT token (authenticated vendor API pattern).
    The vendor object is loaded and stored in request.state.vendor for endpoint use.

    Usage:
        @router.post("/products/bulk-delete")
        def bulk_delete_products(
            request: Request,
            user: UserContext = Depends(require_all_vendor_permissions(
                VendorPermissions.PRODUCTS_VIEW.value,
                VendorPermissions.PRODUCTS_DELETE.value
            ))
        ):
            vendor = request.state.vendor  # Vendor is set by this dependency
            ...
    """

    def permission_checker(
        request: Request,
        db: Session = Depends(get_db),
        current_user: UserContext = Depends(get_current_vendor_from_cookie_or_header),
    ) -> UserContext:
        # Get vendor ID from JWT token
        if not current_user.token_vendor_id:
            raise InvalidTokenException(
                "Token missing vendor information. Please login again."
            )

        vendor_id = current_user.token_vendor_id

        # Load vendor from database (raises VendorNotFoundException if not found)
        vendor = vendor_service.get_vendor_by_id(db, vendor_id)

        # Store vendor in request state for endpoint use
        request.state.vendor = vendor

        # Check if user has ALL required permissions (need User model)
        user_model = _get_user_model(current_user, db)
        missing_permissions = [
            perm
            for perm in permissions
            if not user_model.has_vendor_permission(vendor.id, perm)
        ]

        if missing_permissions:
            raise InsufficientVendorPermissionsException(
                required_permission=f"All of: {', '.join(permissions)}",
                vendor_code=vendor.vendor_code,
            )

        return current_user

    return permission_checker
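

# ----------------------------------------------------------------------------
# Illustrative sketch (not part of the original module).
# The ANY and ALL checkers above differ only in how they combine the results
# of per-permission lookups. A minimal way to express both in terms of one
# list of missing permissions, assuming a `has_permission` callable that wraps
# something like user_model.has_vendor_permission(vendor.id, perm). The helper
# name `_missing_permissions` is hypothetical.
#
#   ALL passes when the result is empty.
#   ANY passes when at least one requested permission is absent from the result.
# ----------------------------------------------------------------------------
from collections.abc import Callable, Iterable


def _missing_permissions(
    has_permission: Callable[[str], bool], permissions: Iterable[str]
) -> list[str]:
    """Return the requested permissions the user lacks, in request order."""
    return [perm for perm in permissions if not has_permission(perm)]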


def get_user_permissions(
    request: Request,
    db: Session = Depends(get_db),
    current_user: UserContext = Depends(get_current_vendor_from_cookie_or_header),
) -> list[str]:
    """
    Get all permissions for current user in current vendor.

    Uses token_vendor_id from JWT token (authenticated vendor API pattern).
    Also sets request.state.vendor for endpoint use.

    Returns empty list if there is no vendor context in the token, or if the
    user is neither the owner nor an active member of the vendor.
    """
    # Get vendor ID from JWT token
    if not current_user.token_vendor_id:
        return []

    vendor_id = current_user.token_vendor_id

    # Load vendor from database
    vendor = vendor_service.get_vendor_by_id(db, vendor_id)

    # Store vendor in request state for endpoint use
    request.state.vendor = vendor

    # Need User model for ownership and membership checks
    user_model = _get_user_model(current_user, db)

    # If owner, return all permissions
    if user_model.is_owner_of(vendor.id):
        from app.core.permissions import VendorPermissions

        return [p.value for p in VendorPermissions]

    # Get permissions from the first active membership for this vendor
    for vm in user_model.vendor_memberships:
        if vm.vendor_id == vendor.id and vm.is_active:
            return vm.get_all_permissions()

    return []


# ============================================================================
# OPTIONAL AUTHENTICATION (For Login Page Redirects)
# ============================================================================


def get_current_admin_optional(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    admin_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
) -> UserContext | None:
    """
    Get current admin user from admin_token cookie or Authorization header.

    Returns None instead of raising exceptions if not authenticated.
    Used for login pages to check if user is already authenticated.

    Priority:
    1. Authorization header (API calls)
    2. admin_token cookie (page navigation)

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        admin_token: Optional token from admin_token cookie
        db: Database session

    Returns:
        UserContext: Authenticated admin user context if valid token exists
        None: If no token, invalid token, or user is not admin
    """
    # The token source (header vs. cookie) is not needed here
    token, _ = _get_token_from_request(
        credentials, admin_token, "admin_token", str(request.url.path)
    )

    if not token:
        return None

    try:
        # Validate token and get user
        user = _validate_user_token(token, db)

        # Verify user is admin
        if user.role == "admin":
            return UserContext.from_user(user, include_vendor_context=False)
    except Exception:
        # Invalid token or other error: treat as unauthenticated
        pass

    return None


def get_current_vendor_optional(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    vendor_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
) -> UserContext | None:
    """
    Get current vendor user from vendor_token cookie or Authorization header.

    Returns None instead of raising exceptions if not authenticated.
    Used for login pages to check if user is already authenticated.

    Priority:
    1. Authorization header (API calls)
    2. vendor_token cookie (page navigation)

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        vendor_token: Optional token from vendor_token cookie
        db: Database session

    Returns:
        UserContext: Authenticated vendor user context if valid token exists
        None: If no token, invalid token, or user is not vendor
    """
    # The token source (header vs. cookie) is not needed here
    token, _ = _get_token_from_request(
        credentials, vendor_token, "vendor_token", str(request.url.path)
    )

    if not token:
        return None

    try:
        # Validate token and get user
        user = _validate_user_token(token, db)

        # Verify user is vendor
        if user.role == "vendor":
            return UserContext.from_user(user)
    except Exception:
        # Invalid token or other error: treat as unauthenticated
        pass

    return None


def get_current_customer_optional(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    customer_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
):
    """
    Get current customer from customer_token cookie or Authorization header.

    Returns None instead of raising exceptions if not authenticated.
    Used for login pages to check if user is already authenticated.

    Priority:
    1. Authorization header (API calls)
    2. customer_token cookie (page navigation)

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        customer_token: Optional token from customer_token cookie
        db: Database session

    Returns:
        CustomerContext: Authenticated customer context if valid token exists
        None: If no token, invalid token, or vendor mismatch
    """
    # The token source (header vs. cookie) is not needed here
    token, _ = _get_token_from_request(
        credentials, customer_token, "customer_token", str(request.url.path)
    )

    if not token:
        return None

    try:
        # Validate customer token (includes vendor validation)
        return _validate_customer_token(token, request, db)
    except Exception:
        # Invalid token, vendor mismatch, or other error: treat as unauthenticated
        return None