Files
orion/app/api/deps.py

831 lines
25 KiB
Python

# app/api/deps.py
"""
Authentication dependencies for FastAPI routes.
This module provides authentication dependencies for all three contexts in the
multi-tenant application, implementing dual token storage with proper isolation:
ADMIN ROUTES (/admin/*):
- Cookie: admin_token (path=/admin) OR Authorization header
- Role: admin only
- Blocks: vendors, customers
VENDOR ROUTES (/vendor/*):
- Cookie: vendor_token (path=/vendor) OR Authorization header
- Role: vendor only
- Blocks: admins, customers
CUSTOMER/SHOP ROUTES (/shop/account/*):
- Cookie: customer_token (path=/shop) OR Authorization header
- Role: customer only
- Blocks: admins, vendors
- Note: Public shop pages (/shop/products, etc.) don't require auth
This dual authentication approach supports:
- HTML pages: Use cookies (automatic browser behavior)
- API calls: Use Authorization headers (explicit JavaScript control)
The cookie path restrictions prevent cross-context cookie leakage:
- admin_token is NEVER sent to /vendor/* or /shop/*
- vendor_token is NEVER sent to /admin/* or /shop/*
- customer_token is NEVER sent to /admin/* or /vendor/*
"""
import logging
from typing import Optional
from fastapi import Depends, Request, Cookie
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.orm import Session
from app.core.database import get_db
from middleware.auth import AuthManager
from middleware.rate_limiter import RateLimiter
from models.database.vendor import Vendor
from models.database.user import User
from app.exceptions import (
AdminRequiredException,
InvalidTokenException,
InsufficientPermissionsException,
VendorNotFoundException,
UnauthorizedVendorAccessException
)
# Initialize dependencies
security = HTTPBearer(auto_error=False) # auto_error=False prevents automatic 403
auth_manager = AuthManager()
rate_limiter = RateLimiter()
logger = logging.getLogger(__name__)
# ============================================================================
# HELPER FUNCTIONS
# ============================================================================
def _get_token_from_request(
credentials: Optional[HTTPAuthorizationCredentials],
cookie_value: Optional[str],
cookie_name: str,
request_path: str
) -> tuple[Optional[str], Optional[str]]:
"""
Extract token from Authorization header or cookie.
Priority:
1. Authorization header (for API calls from JavaScript)
2. Cookie (for browser page navigation)
Args:
credentials: Optional Bearer token from Authorization header
cookie_value: Optional token from cookie
cookie_name: Name of the cookie (for logging)
request_path: Request URL path (for logging)
Returns:
Tuple of (token, source) where source is "header" or "cookie"
"""
if credentials:
logger.debug(f"Token found in Authorization header for {request_path}")
return credentials.credentials, "header"
elif cookie_value:
logger.debug(f"Token found in {cookie_name} cookie for {request_path}")
return cookie_value, "cookie"
return None, None
def _validate_user_token(token: str, db: Session) -> User:
"""
Validate JWT token and return user.
Args:
token: JWT token string
db: Database session
Returns:
User: Authenticated user object
Raises:
InvalidTokenException: If token is invalid
"""
mock_credentials = HTTPAuthorizationCredentials(
scheme="Bearer",
credentials=token
)
return auth_manager.get_current_user(db, mock_credentials)
# ============================================================================
# ADMIN AUTHENTICATION
# ============================================================================
def get_current_admin_from_cookie_or_header(
request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
admin_token: Optional[str] = Cookie(None),
db: Session = Depends(get_db),
) -> User:
"""
Get current admin user from admin_token cookie or Authorization header.
Used for admin HTML pages (/admin/*) that need cookie-based auth.
Priority:
1. Authorization header (API calls)
2. admin_token cookie (page navigation)
Args:
request: FastAPI request
credentials: Optional Bearer token from header
admin_token: Optional token from admin_token cookie
db: Database session
Returns:
User: Authenticated admin user
Raises:
InvalidTokenException: If no token or invalid token
AdminRequiredException: If user is not admin
"""
token, source = _get_token_from_request(
credentials,
admin_token,
"admin_token",
str(request.url.path)
)
if not token:
logger.warning(f"Admin auth failed: No token for {request.url.path}")
raise InvalidTokenException("Admin authentication required")
# Validate token and get user
user = _validate_user_token(token, db)
# Verify user is admin
if user.role != "admin":
logger.warning(
f"Non-admin user {user.username} attempted admin route: {request.url.path}"
)
raise AdminRequiredException("Admin privileges required")
return user
def get_current_admin_api(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db),
) -> User:
"""
Get current admin user from Authorization header ONLY.
Used for admin API endpoints that should not accept cookies.
This prevents CSRF attacks on API endpoints.
Args:
credentials: Bearer token from Authorization header
db: Database session
Returns:
User: Authenticated admin user
Raises:
InvalidTokenException: If no token or invalid token
AdminRequiredException: If user is not admin
"""
if not credentials:
raise InvalidTokenException("Authorization header required for API calls")
user = _validate_user_token(credentials.credentials, db)
if user.role != "admin":
logger.warning(f"Non-admin user {user.username} attempted admin API")
raise AdminRequiredException("Admin privileges required")
return user
# ============================================================================
# VENDOR AUTHENTICATION
# ============================================================================
def get_current_vendor_from_cookie_or_header(
request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
vendor_token: Optional[str] = Cookie(None),
db: Session = Depends(get_db),
) -> User:
"""
Get current vendor user from vendor_token cookie or Authorization header.
Used for vendor HTML pages (/vendor/*) that need cookie-based auth.
Priority:
1. Authorization header (API calls)
2. vendor_token cookie (page navigation)
Args:
request: FastAPI request
credentials: Optional Bearer token from header
vendor_token: Optional token from vendor_token cookie
db: Database session
Returns:
User: Authenticated vendor user
Raises:
InvalidTokenException: If no token or invalid token
InsufficientPermissionsException: If user is not vendor or is admin
"""
token, source = _get_token_from_request(
credentials,
vendor_token,
"vendor_token",
str(request.url.path)
)
if not token:
logger.warning(f"Vendor auth failed: No token for {request.url.path}")
raise InvalidTokenException("Vendor authentication required")
# Validate token and get user
user = _validate_user_token(token, db)
# CRITICAL: Block admins from vendor routes
if user.role == "admin":
logger.warning(
f"Admin user {user.username} attempted vendor route: {request.url.path}"
)
raise InsufficientPermissionsException(
"Vendor access only - admins cannot use vendor portal"
)
# Verify user is vendor
if user.role != "vendor":
logger.warning(
f"Non-vendor user {user.username} attempted vendor route: {request.url.path}"
)
raise InsufficientPermissionsException("Vendor privileges required")
return user
def get_current_vendor_api(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db),
) -> User:
"""
Get current vendor user from Authorization header ONLY.
Used for vendor API endpoints that should not accept cookies.
Args:
credentials: Bearer token from Authorization header
db: Database session
Returns:
User: Authenticated vendor user
Raises:
InvalidTokenException: If no token or invalid token
InsufficientPermissionsException: If user is not vendor or is admin
"""
if not credentials:
raise InvalidTokenException("Authorization header required for API calls")
user = _validate_user_token(credentials.credentials, db)
# Block admins from vendor API
if user.role == "admin":
logger.warning(f"Admin user {user.username} attempted vendor API")
raise InsufficientPermissionsException("Vendor access only")
if user.role != "vendor":
logger.warning(f"Non-vendor user {user.username} attempted vendor API")
raise InsufficientPermissionsException("Vendor privileges required")
return user
# ============================================================================
# CUSTOMER AUTHENTICATION (SHOP)
# ============================================================================
def get_current_customer_from_cookie_or_header(
request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
customer_token: Optional[str] = Cookie(None),
db: Session = Depends(get_db),
) -> User:
"""
Get current customer user from customer_token cookie or Authorization header.
Used for shop account HTML pages (/shop/account/*) that need cookie-based auth.
Note: Public shop pages (/shop/products, etc.) don't use this dependency.
Priority:
1. Authorization header (API calls)
2. customer_token cookie (page navigation)
Args:
request: FastAPI request
credentials: Optional Bearer token from header
customer_token: Optional token from customer_token cookie
db: Database session
Returns:
User: Authenticated customer user
Raises:
InvalidTokenException: If no token or invalid token
InsufficientPermissionsException: If user is not customer (admin/vendor blocked)
"""
token, source = _get_token_from_request(
credentials,
customer_token,
"customer_token",
str(request.url.path)
)
if not token:
logger.warning(f"Customer auth failed: No token for {request.url.path}")
raise InvalidTokenException("Customer authentication required")
# Validate token and get user
user = _validate_user_token(token, db)
# CRITICAL: Block admins from customer routes
if user.role == "admin":
logger.warning(
f"Admin user {user.username} attempted shop account: {request.url.path}"
)
raise InsufficientPermissionsException(
"Customer access only - admins cannot use shop"
)
# CRITICAL: Block vendors from customer routes
if user.role == "vendor":
logger.warning(
f"Vendor user {user.username} attempted shop account: {request.url.path}"
)
raise InsufficientPermissionsException(
"Customer access only - vendors cannot use shop"
)
# Verify user is customer
if user.role != "customer":
logger.warning(
f"Non-customer user {user.username} attempted shop account: {request.url.path}"
)
raise InsufficientPermissionsException("Customer privileges required")
return user
def get_current_customer_api(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db),
) -> User:
"""
Get current customer user from Authorization header ONLY.
Used for shop API endpoints that should not accept cookies.
Args:
credentials: Bearer token from Authorization header
db: Database session
Returns:
User: Authenticated customer user
Raises:
InvalidTokenException: If no token or invalid token
InsufficientPermissionsException: If user is not customer (admin/vendor blocked)
"""
if not credentials:
raise InvalidTokenException("Authorization header required for API calls")
user = _validate_user_token(credentials.credentials, db)
# Block admins from customer API
if user.role == "admin":
logger.warning(f"Admin user {user.username} attempted customer API")
raise InsufficientPermissionsException("Customer access only")
# Block vendors from customer API
if user.role == "vendor":
logger.warning(f"Vendor user {user.username} attempted customer API")
raise InsufficientPermissionsException("Customer access only")
if user.role != "customer":
logger.warning(f"Non-customer user {user.username} attempted customer API")
raise InsufficientPermissionsException("Customer privileges required")
return user
# ============================================================================
# GENERIC AUTHENTICATION (for mixed-use endpoints)
# ============================================================================
def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
) -> User:
"""
Get current authenticated user from Authorization header only.
Generic authentication without role checking.
Used for endpoints accessible to any authenticated user.
Args:
credentials: Bearer token from Authorization header
db: Database session
Returns:
User: Authenticated user (any role)
Raises:
InvalidTokenException: If no token or invalid token
"""
if not credentials:
raise InvalidTokenException("Authorization header required")
return _validate_user_token(credentials.credentials, db)
# ============================================================================
# VENDOR OWNERSHIP VERIFICATION
# ============================================================================
def get_user_vendor(
vendor_code: str,
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
db: Session = Depends(get_db),
) -> Vendor:
"""
Get vendor and verify user ownership/membership.
Ensures the current user has access to the specified vendor.
- Vendor owners can access their own vendor
- Team members can access their vendor
- Admins are BLOCKED (use admin routes instead)
Args:
vendor_code: Vendor code to look up
current_user: Current authenticated vendor user
db: Database session
Returns:
Vendor: Vendor object if user has access
Raises:
VendorNotFoundException: If vendor doesn't exist
UnauthorizedVendorAccessException: If user doesn't have access
"""
vendor = db.query(Vendor).filter(
Vendor.vendor_code == vendor_code.upper()
).first()
if not vendor:
raise VendorNotFoundException(vendor_code)
# Check if user owns this vendor
if vendor.owner_user_id == current_user.id:
return vendor
# Check if user is team member
# TODO: Add team member check when VendorUser relationship is set up
# User doesn't have access to this vendor
raise UnauthorizedVendorAccessException(vendor_code, current_user.id)
# ============================================================================
# PERMISSIONS CHECKING
# ============================================================================
def require_vendor_permission(permission: str):
"""
Dependency factory to require a specific vendor permission.
Usage:
@router.get("/products")
def list_products(
vendor: Vendor = Depends(get_vendor_from_code),
user: User = Depends(require_vendor_permission(VendorPermissions.PRODUCTS_VIEW.value))
):
...
"""
def permission_checker(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
) -> User:
# Get vendor from request state (set by middleware)
vendor = getattr(request.state, "vendor", None)
if not vendor:
raise VendorAccessDeniedException("No vendor context")
# Check if user has permission
if not current_user.has_vendor_permission(vendor.id, permission):
raise InsufficientVendorPermissionsException(
required_permission=permission,
vendor_code=vendor.vendor_code,
)
return current_user
return permission_checker
def require_vendor_owner(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
) -> User:
"""
Dependency to require vendor owner role.
Usage:
@router.delete("/team/{user_id}")
def remove_team_member(
user: User = Depends(require_vendor_owner)
):
...
"""
vendor = getattr(request.state, "vendor", None)
if not vendor:
raise VendorAccessDeniedException("No vendor context")
if not current_user.is_owner_of(vendor.id):
raise VendorOwnerOnlyException(
operation="team management",
vendor_code=vendor.vendor_code,
)
return current_user
def require_any_vendor_permission(*permissions: str):
"""
Dependency factory to require ANY of the specified permissions.
Usage:
@router.get("/dashboard")
def dashboard(
user: User = Depends(require_any_vendor_permission(
VendorPermissions.DASHBOARD_VIEW.value,
VendorPermissions.REPORTS_VIEW.value
))
):
...
"""
def permission_checker(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
) -> User:
vendor = getattr(request.state, "vendor", None)
if not vendor:
raise VendorAccessDeniedException("No vendor context")
# Check if user has ANY of the required permissions
has_permission = any(
current_user.has_vendor_permission(vendor.id, perm)
for perm in permissions
)
if not has_permission:
raise InsufficientVendorPermissionsException(
required_permission=f"Any of: {', '.join(permissions)}",
vendor_code=vendor.vendor_code,
)
return current_user
return permission_checker
def require_all_vendor_permissions(*permissions: str):
"""
Dependency factory to require ALL of the specified permissions.
Usage:
@router.post("/products/bulk-delete")
def bulk_delete_products(
user: User = Depends(require_all_vendor_permissions(
VendorPermissions.PRODUCTS_VIEW.value,
VendorPermissions.PRODUCTS_DELETE.value
))
):
...
"""
def permission_checker(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
) -> User:
vendor = getattr(request.state, "vendor", None)
if not vendor:
raise VendorAccessDeniedException("No vendor context")
# Check if user has ALL required permissions
missing_permissions = [
perm for perm in permissions
if not current_user.has_vendor_permission(vendor.id, perm)
]
if missing_permissions:
raise InsufficientVendorPermissionsException(
required_permission=f"All of: {', '.join(permissions)}",
vendor_code=vendor.vendor_code,
)
return current_user
return permission_checker
def get_user_permissions(
request: Request,
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
) -> list:
"""
Get all permissions for current user in current vendor.
Returns empty list if no vendor context.
"""
vendor = getattr(request.state, "vendor", None)
if not vendor:
return []
# If owner, return all permissions
if current_user.is_owner_of(vendor.id):
from app.core.permissions import VendorPermissions
return [p.value for p in VendorPermissions]
# Get permissions from vendor membership
for vm in current_user.vendor_memberships:
if vm.vendor_id == vendor.id and vm.is_active:
return vm.get_all_permissions()
return []
# ============================================================================
# OPTIONAL AUTHENTICATION (For Login Page Redirects)
# ============================================================================
def get_current_admin_optional(
request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
admin_token: Optional[str] = Cookie(None),
db: Session = Depends(get_db),
) -> Optional[User]:
"""
Get current admin user from admin_token cookie or Authorization header.
Returns None instead of raising exceptions if not authenticated.
Used for login pages to check if user is already authenticated.
Priority:
1. Authorization header (API calls)
2. admin_token cookie (page navigation)
Args:
request: FastAPI request
credentials: Optional Bearer token from header
admin_token: Optional token from admin_token cookie
db: Database session
Returns:
User: Authenticated admin user if valid token exists
None: If no token, invalid token, or user is not admin
"""
token, source = _get_token_from_request(
credentials,
admin_token,
"admin_token",
str(request.url.path)
)
if not token:
return None
try:
# Validate token and get user
user = _validate_user_token(token, db)
# Verify user is admin
if user.role == "admin":
return user
except Exception:
# Invalid token or other error
pass
return None
def get_current_vendor_optional(
request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
vendor_token: Optional[str] = Cookie(None),
db: Session = Depends(get_db),
) -> Optional[User]:
"""
Get current vendor user from vendor_token cookie or Authorization header.
Returns None instead of raising exceptions if not authenticated.
Used for login pages to check if user is already authenticated.
Priority:
1. Authorization header (API calls)
2. vendor_token cookie (page navigation)
Args:
request: FastAPI request
credentials: Optional Bearer token from header
vendor_token: Optional token from vendor_token cookie
db: Database session
Returns:
User: Authenticated vendor user if valid token exists
None: If no token, invalid token, or user is not vendor
"""
token, source = _get_token_from_request(
credentials,
vendor_token,
"vendor_token",
str(request.url.path)
)
if not token:
return None
try:
# Validate token and get user
user = _validate_user_token(token, db)
# Verify user is vendor
if user.role == "vendor":
return user
except Exception:
# Invalid token or other error
pass
return None
def get_current_customer_optional(
request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
customer_token: Optional[str] = Cookie(None),
db: Session = Depends(get_db),
) -> Optional[User]:
"""
Get current customer user from customer_token cookie or Authorization header.
Returns None instead of raising exceptions if not authenticated.
Used for login pages to check if user is already authenticated.
Priority:
1. Authorization header (API calls)
2. customer_token cookie (page navigation)
Args:
request: FastAPI request
credentials: Optional Bearer token from header
customer_token: Optional token from customer_token cookie
db: Database session
Returns:
User: Authenticated customer user if valid token exists
None: If no token, invalid token, or user is not customer
"""
token, source = _get_token_from_request(
credentials,
customer_token,
"customer_token",
str(request.url.path)
)
if not token:
return None
try:
# Validate token and get user
user = _validate_user_token(token, db)
# Verify user is customer
if user.role == "customer":
return user
except Exception:
# Invalid token or other error
pass
return None