Files
orion/app/api/deps.py

147 lines
4.3 KiB
Python

# app/api/deps.py
"""
Authentication dependencies for FastAPI routes.
Implements dual token storage pattern:
- Checks Authorization header first (for API calls from JavaScript)
- Falls back to cookie (for browser page navigation)
This allows:
- JavaScript API calls: Use localStorage + Authorization header
- Browser page loads: Use HTTP-only cookies
"""
from typing import Optional
from fastapi import Depends, Request, Cookie
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.orm import Session
from app.core.database import get_db
from middleware.auth import AuthManager
from middleware.rate_limiter import RateLimiter
from models.database.vendor import Vendor
from models.database.user import User
from app.exceptions import (
AdminRequiredException,
VendorNotFoundException,
UnauthorizedVendorAccessException,
InvalidTokenException
)
# Set auto_error=False to prevent automatic 403 responses
security = HTTPBearer(auto_error=False)
auth_manager = AuthManager()
rate_limiter = RateLimiter()
def get_current_user(
request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
admin_token: Optional[str] = Cookie(None), # Check admin_token cookie
db: Session = Depends(get_db),
):
"""
Get current authenticated user.
Checks for token in this priority order:
1. Authorization header (for API calls from JavaScript)
2. admin_token cookie (for browser page navigation)
This dual approach supports:
- API calls: JavaScript adds token from localStorage to Authorization header
- Page navigation: Browser automatically sends cookie
Args:
request: FastAPI request object
credentials: Optional Bearer token from Authorization header
admin_token: Optional token from cookie
db: Database session
Returns:
User: Authenticated user object
Raises:
InvalidTokenException: If no token found or token invalid
"""
token = None
token_source = None
# Priority 1: Authorization header (API calls from JavaScript)
if credentials:
token = credentials.credentials
token_source = "header"
# Priority 2: Cookie (browser page navigation)
elif admin_token:
token = admin_token
token_source = "cookie"
# No token found in either location
if not token:
raise InvalidTokenException("Authorization header or cookie required")
# Log token source for debugging
import logging
logger = logging.getLogger(__name__)
logger.debug(f"Token found in {token_source} for {request.url.path}")
# Create a mock credentials object for auth_manager
mock_credentials = HTTPAuthorizationCredentials(
scheme="Bearer",
credentials=token
)
return auth_manager.get_current_user(db, mock_credentials)
def get_current_admin_user(current_user: User = Depends(get_current_user)):
"""
Require admin user.
This dependency ensures the current user has admin role.
Used for protecting admin-only routes.
Args:
current_user: User object from get_current_user dependency
Returns:
User: Admin user object
Raises:
AdminRequiredException: If user is not an admin
"""
return auth_manager.require_admin(current_user)
def get_user_vendor(
vendor_code: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""
Get vendor and verify user ownership.
Ensures the current user has access to the specified vendor.
Admin users can access any vendor, regular users only their own.
Args:
vendor_code: Vendor code to look up
current_user: Current authenticated user
db: Database session
Returns:
Vendor: Vendor object if user has access
Raises:
VendorNotFoundException: If vendor doesn't exist
UnauthorizedVendorAccessException: If user doesn't have access
"""
vendor = db.query(Vendor).filter(Vendor.vendor_code == vendor_code.upper()).first()
if not vendor:
raise VendorNotFoundException(vendor_code)
if current_user.role != "admin" and vendor.owner_user_id != current_user.id:
raise UnauthorizedVendorAccessException(vendor_code, current_user.id)
return vendor