# app/api/deps.py
"""
Authentication dependencies for FastAPI routes.

Implements a dual token storage pattern:
- Checks the Authorization header first (for API calls from JavaScript)
- Falls back to a cookie (for browser page navigation)

This allows:
- JavaScript API calls: use localStorage + Authorization header
- Browser page loads: use HTTP-only cookies
"""

import logging
from typing import Optional

from fastapi import Depends, Request, Cookie
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.orm import Session

from app.core.database import get_db
from middleware.auth import AuthManager
from middleware.rate_limiter import RateLimiter
from models.database.vendor import Vendor
from models.database.user import User
from app.exceptions import (
    AdminRequiredException,
    VendorNotFoundException,
    UnauthorizedVendorAccessException,
    InvalidTokenException,
)

# Created once at import time instead of on every request.
logger = logging.getLogger(__name__)

# auto_error=False makes HTTPBearer return None when the header is missing,
# instead of answering with an automatic 403. That is what lets
# get_current_user fall back to the cookie.
security = HTTPBearer(auto_error=False)
auth_manager = AuthManager()
rate_limiter = RateLimiter()


def get_current_user(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    admin_token: Optional[str] = Cookie(None),  # Check admin_token cookie
    db: Session = Depends(get_db),
) -> User:
    """
    Get the current authenticated user.

    The token is looked up in this priority order:
    1. Authorization header (for API calls from JavaScript)
    2. admin_token cookie (for browser page navigation)

    This dual approach supports:
    - API calls: JavaScript adds the token from localStorage to the
      Authorization header
    - Page navigation: the browser sends the cookie automatically

    Args:
        request: FastAPI request object (only used to log the request path)
        credentials: Optional Bearer token from the Authorization header
        admin_token: Optional token from the admin_token cookie
        db: Database session

    Returns:
        User: Whatever auth_manager.get_current_user resolves the token to

    Raises:
        InvalidTokenException: If neither location supplies a non-empty token.
            Token validation itself is delegated to
            auth_manager.get_current_user, which may raise its own errors.
    """
    token = None
    token_source = None

    if credentials:
        # Priority 1: Authorization header (API calls from JavaScript)
        token = credentials.credentials
        token_source = "header"
    elif admin_token:
        # Priority 2: Cookie (browser page navigation)
        token = admin_token
        token_source = "cookie"

    # Also rejects an empty token string, not just a missing one.
    if not token:
        raise InvalidTokenException("Authorization header or cookie required")

    # Lazy %-style arguments: the message is only built when DEBUG is enabled.
    logger.debug("Token found in %s for %s", token_source, request.url.path)

    # auth_manager expects a credentials object, so wrap the token in one
    # regardless of whether it came from the header or the cookie.
    bearer_credentials = HTTPAuthorizationCredentials(
        scheme="Bearer",
        credentials=token,
    )
    return auth_manager.get_current_user(db, bearer_credentials)


def get_current_admin_user(current_user: User = Depends(get_current_user)) -> User:
    """
    Require an admin user.

    Delegates the role check to auth_manager.require_admin. Used for
    protecting admin-only routes.

    Args:
        current_user: User object from the get_current_user dependency

    Returns:
        User: The value returned by auth_manager.require_admin

    Raises:
        AdminRequiredException: If the user is not an admin (as documented by
            the original author; raised inside auth_manager.require_admin)
    """
    return auth_manager.require_admin(current_user)


def get_current_vendor_user(current_user: User = Depends(get_current_user)) -> User:
    """
    Require a vendor user (vendor owner or vendor staff).

    Delegates the role check to auth_manager.require_vendor. Used for
    protecting vendor-only routes.

    Args:
        current_user: User object from the get_current_user dependency

    Returns:
        User: The value returned by auth_manager.require_vendor

    Raises:
        InsufficientPermissionsException: If the user is not a vendor user.
            NOTE(review): the exception type comes from the original
            docstring; confirm against auth_manager.require_vendor.
    """
    return auth_manager.require_vendor(current_user)


def get_current_customer_user(current_user: User = Depends(get_current_user)) -> User:
    """
    Require a customer user.

    Delegates the role check to auth_manager.require_customer. Used for
    protecting customer account routes.

    Args:
        current_user: User object from the get_current_user dependency

    Returns:
        User: The value returned by auth_manager.require_customer

    Raises:
        InsufficientPermissionsException: If the user is not a customer.
            NOTE(review): the exception type comes from the original
            docstring; confirm against auth_manager.require_customer.
    """
    return auth_manager.require_customer(current_user)


def get_user_vendor(
    vendor_code: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
) -> Vendor:
    """
    Get a vendor and verify the user may access it.

    Users whose role is "admin" can access any vendor; every other user can
    only access a vendor whose owner_user_id matches their own id.

    Args:
        vendor_code: Vendor code to look up (upper-cased before the query)
        current_user: Current authenticated user
        db: Database session

    Returns:
        Vendor: Vendor object if the user has access

    Raises:
        VendorNotFoundException: If no vendor has the given code
        UnauthorizedVendorAccessException: If the user is neither an admin
            nor the vendor's owner
    """
    # The lookup upper-cases the input, so callers may pass any casing.
    vendor = (
        db.query(Vendor)
        .filter(Vendor.vendor_code == vendor_code.upper())
        .first()
    )
    if not vendor:
        raise VendorNotFoundException(vendor_code)

    # Admins bypass the ownership check entirely.
    if current_user.role != "admin" and vendor.owner_user_id != current_user.id:
        raise UnauthorizedVendorAccessException(vendor_code, current_user.id)

    return vendor