# app/api/deps.py
"""
Authentication dependencies for FastAPI routes.

This module provides authentication dependencies for each context of the
multi-tenant application, implementing dual token storage with proper isolation:

ADMIN ROUTES (/admin/*):
- Cookie: admin_token (path=/admin) OR Authorization header
- Role: admin only
- Blocks: stores, customers

STORE ROUTES (/store/*):
- Cookie: store_token (path=/store) OR Authorization header
- Role: store only
- Blocks: admins, customers

MERCHANT ROUTES (/merchants/*):
- Cookie: merchant_token (path=/merchants) OR Authorization header
- Role: store (merchant owners are store-role users who own merchants)
- Validates: User owns the merchant via Merchant.owner_user_id

CUSTOMER/STOREFRONT ROUTES (/storefront/account/*):
- Cookie: customer_token (path=/storefront) OR Authorization header
- Role: customer only
- Blocks: admins, stores
- Note: Public storefront pages (/storefront/products, etc.) don't require auth

This dual authentication approach supports:
- HTML pages: Use cookies (automatic browser behavior)
- API calls: Use Authorization headers (explicit JavaScript control)

The cookie path restrictions prevent cross-context cookie leakage:
- admin_token is NEVER sent to /store/* or /storefront/*
- store_token is NEVER sent to /admin/* or /storefront/*
- customer_token is NEVER sent to /admin/* or /store/*
"""

import logging
from datetime import UTC

from fastapi import Cookie, Depends, HTTPException, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.orm import Session

from app.core.database import get_db
from app.modules.enums import FrontendType
from app.modules.tenancy.exceptions import (
    AdminRequiredException,
    InsufficientPermissionsException,
    InsufficientStorePermissionsException,
    InvalidTokenException,
    StoreNotFoundException,
    StoreOwnerOnlyException,
    UnauthorizedStoreAccessException,
)
from app.modules.tenancy.models import Store
from app.modules.tenancy.models import User as UserModel
from app.modules.tenancy.schemas.auth import UserContext
from app.modules.tenancy.services.store_service import store_service
from middleware.auth import AuthManager
from middleware.rate_limiter import RateLimiter

# Initialize dependencies
security = HTTPBearer(auto_error=False)  # auto_error=False prevents automatic 403
auth_manager = AuthManager()
rate_limiter = RateLimiter()
logger = logging.getLogger(__name__)


# ============================================================================
# HELPER FUNCTIONS
# ============================================================================


async def get_resolved_store_code(request: Request) -> str:
    """
    Resolve the store code for the current request.

    Lookup order:
    1. The ``store_code`` path parameter (path-based mounting,
       /store/{store_code}/...)
    2. ``request.state.store.store_code`` (store attached to the request
       state, e.g. resolved from a subdomain or custom domain)

    Args:
        request: FastAPI request

    Returns:
        str: The resolved store code

    Raises:
        HTTPException: 404 when neither source yields a store
    """
    path_code = request.path_params.get("store_code")
    if path_code:
        return path_code

    resolved_store = getattr(request.state, "store", None)
    if not resolved_store:
        raise HTTPException(status_code=404, detail="Store not found")
    return resolved_store.store_code


def _get_token_from_request(
    credentials: HTTPAuthorizationCredentials | None,
    cookie_value: str | None,
    cookie_name: str,
    request_path: str,
) -> tuple[str | None, str | None]:
    """
    Pick the token to authenticate with, preferring the Authorization header.

    Priority:
    1. Authorization header (for API calls from JavaScript)
    2. Cookie (for browser page navigation)

    Args:
        credentials: Optional Bearer token from Authorization header
        cookie_value: Optional token read from the cookie
        cookie_name: Name of the cookie (only used in the debug log)
        request_path: Request URL path (only used in the debug log)

    Returns:
        Tuple of (token, source) where source is "header" or "cookie";
        (None, None) when neither a header nor a cookie token is present
    """
    # Nothing to authenticate with.
    if not credentials and not cookie_value:
        return None, None

    # The header wins whenever it is present, even if a cookie was also sent.
    if credentials:
        logger.debug(f"Token found in Authorization header for {request_path}")
        return credentials.credentials, "header"

    logger.debug(f"Token found in {cookie_name} cookie for {request_path}")
    return cookie_value, "cookie"
def _validate_user_token(token: str, db: Session) -> UserModel:
    """
    Validate a JWT token string and return the matching user.

    The raw token is wrapped in a Bearer credentials object because
    ``auth_manager.get_current_user`` takes credentials, not a bare string.

    Args:
        token: JWT token string
        db: Database session

    Returns:
        UserModel: Whatever ``auth_manager.get_current_user`` returns for the token

    Raises:
        InvalidTokenException: Expected from ``auth_manager.get_current_user``
            for an invalid token (raised by that call, not in this function)
    """
    bearer = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)
    return auth_manager.get_current_user(db, bearer)


def _get_user_model(user_context: UserContext, db: Session) -> UserModel:
    """
    Load the User database row that corresponds to a UserContext.

    Intended for permission checks that need methods defined on the model
    rather than on the schema.

    Args:
        user_context: UserContext schema instance
        db: Database session

    Returns:
        UserModel: The user row, with any truthy token_store_* values from the
        context copied onto it as attributes

    Raises:
        InvalidTokenException: If no user with ``user_context.id`` exists
    """
    user = db.query(UserModel).filter(UserModel.id == user_context.id).first()
    if not user:
        raise InvalidTokenException("User not found")

    # Mirror the token's store context onto the model; falsy values are
    # skipped, so the attribute stays unset on the model in that case.
    for attr_name in ("token_store_id", "token_store_code", "token_store_role"):
        attr_value = getattr(user_context, attr_name)
        if attr_value:
            setattr(user, attr_name, attr_value)

    return user
# ============================================================================
# PLATFORM CONTEXT
# ============================================================================


def require_platform(request: Request):
    """
    Dependency that requires a platform on ``request.state``.

    Args:
        request: FastAPI request

    Returns:
        The object found at ``request.state.platform``

    Raises:
        HTTPException: 400 when ``request.state.platform`` is missing or falsy
    """
    platform = getattr(request.state, "platform", None)
    if platform:
        return platform
    raise HTTPException(status_code=400, detail="Platform context required")


# ============================================================================
# ADMIN AUTHENTICATION
# ============================================================================


def get_current_admin_from_cookie_or_header(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    admin_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Authenticate an admin from the Authorization header or the admin_token cookie.

    Used for admin HTML pages (/admin/*) that need cookie-based auth.

    Priority:
    1. Authorization header (API calls)
    2. admin_token cookie (page navigation)

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        admin_token: Optional token from admin_token cookie
        db: Database session

    Returns:
        UserContext: Admin user context, built without store context

    Raises:
        InvalidTokenException: If no token was supplied (token validation
            itself is delegated to ``_validate_user_token``)
        AdminRequiredException: If the authenticated user is not an admin
    """
    path = request.url.path
    token, _source = _get_token_from_request(
        credentials, admin_token, "admin_token", str(path)
    )
    if not token:
        logger.warning(f"Admin auth failed: No token for {path}")
        raise InvalidTokenException("Admin authentication required")

    user = _validate_user_token(token, db)
    if user.is_admin:
        return UserContext.from_user(user, include_store_context=False)

    # Authenticated, but not an admin.
    logger.warning(f"Non-admin user {user.username} attempted admin route: {path}")
    raise AdminRequiredException("Admin privileges required")
def get_current_admin_api(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Authenticate an admin from the Authorization header ONLY.

    Used for admin API endpoints that should not accept cookies; no cookie is
    read here, which is what keeps these endpoints out of reach of CSRF.

    Args:
        credentials: Bearer token from Authorization header (falsy when the
            header is absent)
        db: Database session

    Returns:
        UserContext: Admin user context, built without store context

    Raises:
        InvalidTokenException: If the Authorization header is missing
        AdminRequiredException: If the authenticated user is not an admin
    """
    if not credentials:
        raise InvalidTokenException("Authorization header required for API calls")

    user = _validate_user_token(credentials.credentials, db)
    if user.is_admin:
        return UserContext.from_user(user, include_store_context=False)

    logger.warning(f"Non-admin user {user.username} attempted admin API")
    raise AdminRequiredException("Admin privileges required")
# ============================================================================
# SUPER ADMIN AUTHENTICATION
# ============================================================================


def get_current_super_admin(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    admin_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Require the super admin role (cookie or header auth).

    Used for: Global settings, user management, platform creation/deletion,
    admin-platform assignment management.

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        admin_token: Optional token from admin_token cookie
        db: Database session

    Returns:
        UserContext: Authenticated super admin user context

    Raises:
        InvalidTokenException: If no token was supplied
        AdminRequiredException: If user is not admin or not super admin
    """
    # Regular admin authentication first; it raises for non-admins.
    admin_context = get_current_admin_from_cookie_or_header(
        request, credentials, admin_token, db
    )
    if admin_context.is_super_admin:
        return admin_context

    logger.warning(
        f"Platform admin {admin_context.username} attempted super admin route: {request.url.path}"
    )
    raise AdminRequiredException("Super admin privileges required")


def get_current_super_admin_api(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Require the super admin role (Authorization header only).

    Used for super admin API endpoints that should not accept cookies.

    Args:
        credentials: Bearer token from Authorization header
        db: Database session

    Returns:
        UserContext: Authenticated super admin user context

    Raises:
        InvalidTokenException: If the Authorization header is missing
        AdminRequiredException: If user is not admin or not super admin
    """
    admin_context = get_current_admin_api(credentials, db)
    if admin_context.is_super_admin:
        return admin_context

    logger.warning(f"Platform admin {admin_context.username} attempted super admin API")
    raise AdminRequiredException("Super admin privileges required")
def require_platform_access(platform_id: int):
    """
    Dependency factory to require admin access to a specific platform.

    Admins whose ``accessible_platform_ids`` is None may access every platform
    (the super admin case); all others must have ``platform_id`` in that list.

    NOTE(review): ``platform_id`` is bound when this factory is called, i.e.
    when the route is declared. A path parameter of the same name is not
    forwarded into the factory.

    Usage:
        @router.get("/platforms/1/stores")
        def list_stores(
            admin: UserContext = Depends(require_platform_access(1))
        ):
            ...

    Args:
        platform_id: ID of the platform the returned dependency guards

    Returns:
        Dependency function that authenticates the admin and returns UserContext
    """

    def _check_platform_access(
        request: Request,
        credentials: HTTPAuthorizationCredentials | None = Depends(security),
        admin_token: str | None = Cookie(None),
        db: Session = Depends(get_db),
    ) -> UserContext:
        user_context = get_current_admin_from_cookie_or_header(
            request, credentials, admin_token, db
        )

        # None means "no restriction"; otherwise the ID must be listed.
        allowed_ids = user_context.accessible_platform_ids
        if allowed_ids is None or platform_id in (allowed_ids or []):
            return user_context

        logger.warning(
            f"Admin {user_context.username} denied access to platform_id={platform_id}"
        )
        raise InsufficientPermissionsException(
            f"Access denied to platform {platform_id}"
        )

    return _check_platform_access


def get_admin_with_platform_context(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    admin_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Authenticate an admin and establish the platform context from the token.

    Super admins are returned immediately, without any platform lookup.
    For every other admin the user object must carry ``token_platform_id``;
    access to that platform is re-checked and the Platform row is stored on
    ``request.state.admin_platform`` for endpoint use.

    This works on the raw User model (via ``_validate_user_token``) because the
    token attributes and ``can_access_platform`` are read from that object.

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        admin_token: Optional token from admin_token cookie
        db: Database session

    Returns:
        UserContext: Authenticated admin, built without store context

    Raises:
        InvalidTokenException: If no token was supplied, or the user object
            has no ``token_platform_id`` attribute
        AdminRequiredException: If the user is not an admin
        InsufficientPermissionsException: If ``can_access_platform`` rejects
            the platform from the token
    """
    from app.modules.tenancy.models import Platform

    token, _source = _get_token_from_request(
        credentials, admin_token, "admin_token", str(request.url.path)
    )
    if not token:
        raise InvalidTokenException("Admin authentication required")

    user = _validate_user_token(token, db)
    if not user.is_admin:
        raise AdminRequiredException("Admin privileges required")

    # Super admins bypass platform context entirely.
    if user.is_super_admin:
        return UserContext.from_user(user, include_store_context=False)

    # Only the presence of the attribute is tested here, not its value.
    if not hasattr(user, "token_platform_id"):
        raise InvalidTokenException(
            "Token missing platform information. Please select a platform."
        )
    platform_id = user.token_platform_id

    # The token may predate a permission change, so re-check access.
    if not user.can_access_platform(platform_id):
        logger.warning(
            f"Admin {user.username} lost access to platform_id={platform_id}"
        )
        raise InsufficientPermissionsException(
            "Access to this platform has been revoked. Please login again."
        )

    # .first() yields None when no such row exists; that value is stored as-is.
    request.state.admin_platform = (
        db.query(Platform).filter(Platform.id == platform_id).first()
    )
    return UserContext.from_user(user, include_store_context=False)
# ============================================================================
# MODULE-BASED ACCESS CONTROL
# ============================================================================


def require_module_access(module_code: str, frontend_type: FrontendType):
    """
    Dependency factory for module-based route access control.

    Checks if the specified module is enabled for the current platform.
    Use this for routes that should be gated by module enablement but aren't
    tied to a specific menu item.

    Usage:
        router = APIRouter(
            dependencies=[Depends(require_module_access("messaging", FrontendType.ADMIN))]
        )

        router = APIRouter(
            dependencies=[Depends(require_module_access("billing", FrontendType.STORE))]
        )

    Behavior per frontend type:
    - ADMIN: super admins are returned without a module check; other admins
      are checked against ``request.state.admin_platform`` when it is set
    - STORE: checked against ``request.state.store.platform_id`` when it is set
    - MERCHANT: authenticated only; no module check is performed
    - anything else: rejected with InvalidTokenException

    When no platform ID can be determined, access is allowed.

    Args:
        module_code: Module code to check (e.g., "billing", "marketplace")
        frontend_type: Frontend type (ADMIN, STORE or MERCHANT). Determines
            which authentication dependency is used.

    Returns:
        Dependency function that validates module access and returns UserContext
    """
    from app.modules.service import module_service

    def _check_module_access(
        request: Request,
        credentials: HTTPAuthorizationCredentials | None = Depends(security),
        admin_token: str | None = Cookie(None),
        store_token: str | None = Cookie(None),
        merchant_token: str | None = Cookie(None),
        db: Session = Depends(get_db),
    ) -> UserContext:
        user_context = None
        platform_id = None

        # frontend_type is fixed per dependency, so exactly one branch can run.
        # Any failure inside a branch is reported to the caller as
        # "Authentication required" below; it is logged here so the underlying
        # cause (wrong role, database error, ...) is not lost.
        if frontend_type == FrontendType.ADMIN:
            try:
                user_context = get_current_admin_from_cookie_or_header(
                    request, credentials, admin_token, db
                )
                if user_context.is_super_admin:
                    # Super admins bypass module checks
                    return user_context
                platform = getattr(request.state, "admin_platform", None)
                if platform:
                    platform_id = platform.id
            except Exception as exc:
                logger.debug(
                    f"Admin auth failed during module check '{module_code}': {exc!r}"
                )
        elif frontend_type == FrontendType.STORE:
            try:
                user_context = get_current_store_from_cookie_or_header(
                    request, credentials, store_token, db
                )
                store = getattr(request.state, "store", None)
                if store and hasattr(store, "platform_id") and store.platform_id:
                    platform_id = store.platform_id
            except Exception as exc:
                logger.debug(
                    f"Store auth failed during module check '{module_code}': {exc!r}"
                )
        elif frontend_type == FrontendType.MERCHANT:
            try:
                # Merchant portal is platform-agnostic; module checks not enforced
                return get_current_merchant_from_cookie_or_header(
                    request, credentials, merchant_token, db
                )
            except Exception as exc:
                logger.debug(
                    f"Merchant auth failed during module check '{module_code}': {exc!r}"
                )

        if not user_context:
            raise InvalidTokenException("Authentication required")

        # If no platform context, allow access (module checking requires platform)
        if not platform_id:
            logger.debug(f"No platform context for module check: {module_code}")
            return user_context

        if not module_service.is_module_enabled(db, platform_id, module_code):
            logger.warning(
                f"Module access denied: {module_code} disabled for "
                f"platform_id={platform_id}, user={user_context.username}"
            )
            raise InsufficientPermissionsException(
                f"The '{module_code}' module is not enabled for this platform"
            )

        return user_context

    return _check_module_access
# ============================================================================
# MENU-BASED ACCESS CONTROL
# ============================================================================


def require_menu_access(menu_item_id: str, frontend_type: "FrontendType"):
    """
    Dependency factory for menu-based page route access control.

    The returned dependency authenticates the user for the given frontend and
    then applies two checks:
    - Module check: the module that provides the menu item must be enabled for
      the platform (skipped when no module is registered for the item or the
      platform ID is falsy)
    - Visibility check: ``menu_service.can_access_menu_item`` must allow it

    Access is granted without either check when a non-super admin has no
    ``request.state.admin_platform``, or when a store request has no
    platform ID.

    Usage:
        @router.get("/admin/inventory")
        async def inventory_page(
            current_user: User = Depends(
                require_menu_access("inventory", FrontendType.ADMIN)
            ),
        ):
            ...

    Args:
        menu_item_id: Menu item identifier from registry
        frontend_type: Which frontend (ADMIN or STORE)

    Returns:
        Dependency function that validates menu access and returns UserContext.
        The dependency raises ValueError for any other frontend type and
        InsufficientPermissionsException when either check fails.
    """
    from app.modules.core.services.menu_service import menu_service
    from app.modules.enums import FrontendType as FT
    from app.modules.registry import get_menu_item_module
    from app.modules.service import module_service

    def _check_menu_access(
        request: Request,
        credentials: HTTPAuthorizationCredentials | None = Depends(security),
        admin_token: str | None = Cookie(None),
        store_token: str | None = Cookie(None),
        db: Session = Depends(get_db),
    ) -> UserContext:
        # Every path that reaches the visibility check passes user_id=None.
        user_id = None

        if frontend_type == FT.ADMIN:
            user_context = get_current_admin_from_cookie_or_header(
                request, credentials, admin_token, db
            )
            if user_context.is_super_admin:
                # Platform selected in the token, if any (may be None).
                platform_id = user_context.token_platform_id
            else:
                admin_platform = getattr(request.state, "admin_platform", None)
                if not admin_platform:
                    # No platform context - allow access (restricted elsewhere)
                    return user_context
                platform_id = admin_platform.id
        elif frontend_type == FT.STORE:
            user_context = get_current_store_from_cookie_or_header(
                request, credentials, store_token, db
            )
            store = getattr(request.state, "store", None)
            has_platform = (
                store and hasattr(store, "platform_id") and store.platform_id
            )
            if not has_platform:
                # No platform context for store - allow access
                return user_context
            platform_id = store.platform_id
        else:
            raise ValueError(f"Unsupported frontend_type: {frontend_type}")

        # First check: is the module providing this menu item enabled?
        if platform_id:
            module_code = get_menu_item_module(menu_item_id, frontend_type)
            module_disabled = module_code and not module_service.is_module_enabled(
                db, platform_id, module_code
            )
            if module_disabled:
                logger.warning(
                    f"Module access denied: {menu_item_id} (module={module_code}) for "
                    f"user={user_context.username}, platform_id={platform_id}"
                )
                raise InsufficientPermissionsException(
                    f"The '{module_code}' module is not enabled for this platform. "
                    f"Contact your administrator to enable this feature."
                )

        # Second check: is the menu item visible in configuration?
        if menu_service.can_access_menu_item(
            db, frontend_type, menu_item_id, platform_id, user_id
        ):
            return user_context

        logger.warning(
            f"Menu visibility denied: {menu_item_id} for "
            f"user={user_context.username}, frontend={frontend_type.value}, "
            f"platform_id={platform_id}, user_id={user_id}"
        )
        raise InsufficientPermissionsException(
            f"Access to '{menu_item_id}' has been restricted. "
            f"Contact your administrator if you need access."
        )

    return _check_menu_access
# ============================================================================
# STORE AUTHENTICATION
# ============================================================================


def get_current_store_from_cookie_or_header(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    store_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Authenticate a store user from the Authorization header or store_token cookie.

    Used for store HTML pages (/store/*) that need cookie-based auth.

    Priority:
    1. Authorization header (API calls)
    2. store_token cookie (page navigation)

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        store_token: Optional token from store_token cookie
        db: Database session

    Returns:
        UserContext: Authenticated store user context

    Raises:
        InvalidTokenException: If no token was supplied
        InsufficientPermissionsException: If the user is an admin, or is not
            a store user
    """
    path = request.url.path
    token, _source = _get_token_from_request(
        credentials, store_token, "store_token", str(path)
    )
    if not token:
        logger.warning(f"Store auth failed: No token for {path}")
        raise InvalidTokenException("Store authentication required")

    user = _validate_user_token(token, db)

    # CRITICAL: admins are rejected before the store-role check.
    if user.is_admin:
        logger.warning(f"Admin user {user.username} attempted store route: {path}")
        raise InsufficientPermissionsException(
            "Store access only - admins cannot use store portal"
        )

    if user.is_store_user:
        return UserContext.from_user(user)

    logger.warning(f"Non-store user {user.username} attempted store route: {path}")
    raise InsufficientPermissionsException("Store privileges required")
def get_current_store_api(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Authenticate a store user from the Authorization header ONLY.

    Used for store API endpoints that should not accept cookies.

    Checks, in order:
    1. The user is not an admin
    2. The user is a store user
    3. The user object carries a ``token_store_id`` attribute
    4. ``user.is_member_of`` still accepts that store

    Args:
        credentials: Bearer token from Authorization header
        db: Database session

    Returns:
        UserContext: Authenticated store user context

    Raises:
        InvalidTokenException: If the header is missing or the user object
            has no ``token_store_id`` attribute
        InsufficientPermissionsException: If the user is an admin, is not a
            store user, or is no longer a member of the store
    """
    if not credentials:
        raise InvalidTokenException("Authorization header required for API calls")

    user = _validate_user_token(credentials.credentials, db)
    username = user.username

    # Admins are blocked from the store API.
    if user.is_admin:
        logger.warning(f"Admin user {username} attempted store API")
        raise InsufficientPermissionsException("Store access only")

    if not user.is_store_user:
        logger.warning(f"Non-store user {username} attempted store API")
        raise InsufficientPermissionsException("Store privileges required")

    # Only the presence of the attribute is tested here, not its value.
    if not hasattr(user, "token_store_id"):
        raise InvalidTokenException(
            "Token missing store information. Please login again."
        )
    store_id = user.token_store_id

    # The token may predate a membership change, so re-check access.
    if not user.is_member_of(store_id):
        logger.warning(f"User {username} lost access to store_id={store_id}")
        raise InsufficientPermissionsException(
            "Access to store has been revoked. Please login again."
        )

    logger.debug(
        f"Store API access: user={username}, store_id={store_id}, "
        f"store_code={getattr(user, 'token_store_code', 'N/A')}"
    )
    return UserContext.from_user(user)
# ============================================================================
# MERCHANT AUTHENTICATION (Billing Portal)
# ============================================================================


def get_current_merchant_from_cookie_or_header(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    merchant_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Authenticate a merchant owner from the Authorization header or merchant_token cookie.

    Used for merchant portal HTML pages (/merchants/*) that need cookie-based
    auth. The user must own at least one active merchant.

    Priority:
    1. Authorization header (API calls)
    2. merchant_token cookie (page navigation)

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        merchant_token: Optional token from merchant_token cookie
        db: Database session

    Returns:
        UserContext: Authenticated merchant owner user context

    Raises:
        InvalidTokenException: If no token was supplied
        InsufficientPermissionsException: If the user owns no active merchant
    """
    from app.modules.tenancy.models import Merchant

    path = request.url.path
    token, _source = _get_token_from_request(
        credentials, merchant_token, "merchant_token", str(path)
    )
    if not token:
        logger.warning(f"Merchant auth failed: No token for {path}")
        raise InvalidTokenException("Merchant authentication required")

    user = _validate_user_token(token, db)

    # Ownership is what grants access here: count the user's active merchants.
    owned_active = (
        db.query(Merchant)
        .filter(
            Merchant.owner_user_id == user.id,
            Merchant.is_active == True,  # noqa: E712
        )
        .count()
    )
    if owned_active == 0:
        logger.warning(
            f"User {user.username} attempted merchant route without owning any merchants: "
            f"{path}"
        )
        raise InsufficientPermissionsException(
            "Merchant owner privileges required"
        )

    return UserContext.from_user(user)
def get_current_merchant_api(
    request: Request,
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Authenticate a merchant owner from the Authorization header ONLY.

    Used for merchant API endpoints that should not accept cookies; no cookie
    is read here, which is what keeps these endpoints out of reach of CSRF.

    Args:
        request: FastAPI request (not read by this function)
        credentials: Bearer token from Authorization header
        db: Database session

    Returns:
        UserContext: Authenticated merchant owner user context

    Raises:
        InvalidTokenException: If the Authorization header is missing
        InsufficientPermissionsException: If the user owns no active merchant
    """
    from app.modules.tenancy.models import Merchant

    if not credentials:
        raise InvalidTokenException("Authorization header required for API calls")

    user = _validate_user_token(credentials.credentials, db)

    # Ownership is what grants access here: count the user's active merchants.
    merchant_count = (
        db.query(Merchant)
        .filter(
            Merchant.owner_user_id == user.id,
            Merchant.is_active == True,  # noqa: E712
        )
        .count()
    )
    if merchant_count == 0:
        logger.warning(f"User {user.username} attempted merchant API without owning any merchants")
        raise InsufficientPermissionsException(
            "Merchant owner privileges required"
        )

    return UserContext.from_user(user)


def get_current_merchant_optional(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    merchant_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
) -> UserContext | None:
    """
    Authenticate a merchant owner if possible, returning None otherwise.

    Used for login pages to check if the user is already authenticated.
    This dependency never raises for an authentication problem: a missing
    token, a failing validation, or a user without active merchants all
    yield None.

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        merchant_token: Optional token from merchant_token cookie
        db: Database session

    Returns:
        UserContext: Authenticated merchant owner if a valid token exists
        None: If no token, validation failed, or the user owns no active merchant
    """
    token, _ = _get_token_from_request(
        credentials, merchant_token, "merchant_token", str(request.url.path)
    )
    if not token:
        return None

    try:
        user = _validate_user_token(token, db)

        from app.modules.tenancy.models import Merchant

        merchant_count = (
            db.query(Merchant)
            .filter(
                Merchant.owner_user_id == user.id,
                Merchant.is_active == True,  # noqa: E712
            )
            .count()
        )
        if merchant_count > 0:
            return UserContext.from_user(user)
    except Exception as exc:
        # Best-effort check: still answer "not authenticated", but record why,
        # so an unexpected failure (e.g. a database error) can be diagnosed.
        logger.debug(f"Optional merchant auth failed for {request.url.path}: {exc!r}")

    return None
def _load_active_merchant_for_owner(request: Request, owner: UserContext, db: Session):
    """
    Fetch the owner's active merchant with the lowest ID and remember it.

    Shared by the API and page variants below, which differ only in how the
    owner was authenticated.

    Args:
        request: FastAPI request; the merchant is stored on request.state.merchant
        owner: Authenticated user whose ``id`` is matched against
            ``Merchant.owner_user_id``
        db: Database session

    Returns:
        Merchant ORM object

    Raises:
        MerchantNotFoundException: If the user owns no active merchants
    """
    from app.modules.tenancy.exceptions import MerchantNotFoundException
    from app.modules.tenancy.models import Merchant

    merchant = (
        db.query(Merchant)
        .filter(
            Merchant.owner_user_id == owner.id,
            Merchant.is_active == True,  # noqa: E712
        )
        # Ordering by ID makes the choice deterministic when several are owned.
        .order_by(Merchant.id)
        .first()
    )
    if not merchant:
        raise MerchantNotFoundException(
            str(owner.id), identifier_type="owner_user_id"
        )

    request.state.merchant = merchant
    return merchant


def get_merchant_for_current_user(
    request: Request,
    current_user: UserContext = Depends(get_current_merchant_api),
    db: Session = Depends(get_db),
):
    """
    Get the active merchant owned by the current API user.

    Used by merchant API endpoints (header-only auth) that need the Merchant
    object. Stores the merchant on request.state.merchant for endpoint use.

    Args:
        request: FastAPI request
        current_user: Merchant owner authenticated via get_current_merchant_api
        db: Database session

    Returns:
        Merchant ORM object

    Raises:
        MerchantNotFoundException: If user owns no active merchants
    """
    return _load_active_merchant_for_owner(request, current_user, db)


def get_merchant_for_current_user_page(
    request: Request,
    current_user: UserContext = Depends(get_current_merchant_from_cookie_or_header),
    db: Session = Depends(get_db),
):
    """
    Get the active merchant owned by the current page user.

    Used by merchant page routes (cookie+header auth) that need the Merchant
    object. Stores the merchant on request.state.merchant for endpoint use.

    Args:
        request: FastAPI request
        current_user: Merchant owner authenticated via
            get_current_merchant_from_cookie_or_header
        db: Database session

    Returns:
        Merchant ORM object

    Raises:
        MerchantNotFoundException: If user owns no active merchants
    """
    return _load_active_merchant_for_owner(request, current_user, db)
# ============================================================================
# CUSTOMER AUTHENTICATION (STOREFRONT)
# ============================================================================


def _validate_customer_token(token: str, request: Request, db: Session):
    """
    Validate a customer JWT token and return a CustomerContext schema.

    Validates, in order:
    1. The token decodes with the configured secret key and algorithm
    2. The ``type`` claim is "customer"
    3. The ``sub`` claim is present and is an integer customer ID
    4. The ``exp`` claim, when present, is not in the past
    5. The customer exists and is active
    6. The token's ``store_id`` equals ``request.state.store.id`` -- only when
       both the request store and the claim are present (truthy)

    Args:
        token: JWT token string
        request: FastAPI request (for store context)
        db: Database session

    Returns:
        CustomerContext: Authenticated customer context schema

    Raises:
        InvalidTokenException: If the token cannot be decoded, has the wrong
            type, has a missing or non-numeric ``sub``, has expired, or the
            customer is missing or inactive
        UnauthorizedStoreAccessException: If the token's store does not match
            the request's store
    """
    from datetime import datetime

    from jose import JWTError, jwt

    from app.modules.customers.models.customer import Customer
    from app.modules.customers.schemas import CustomerContext

    # Only the decode call can raise JWTError, so only it sits in the try.
    try:
        payload = jwt.decode(
            token, auth_manager.secret_key, algorithms=[auth_manager.algorithm]
        )
    except JWTError as e:
        logger.warning(f"JWT decode error: {str(e)}")
        raise InvalidTokenException("Could not validate credentials") from e

    # Verify this is a customer token
    token_type = payload.get("type")
    if token_type != "customer":
        logger.warning(f"Invalid token type for customer route: {token_type}")
        raise InvalidTokenException("Customer authentication required")

    # Get customer ID from token
    customer_id = payload.get("sub")
    if customer_id is None:
        logger.warning("Token missing 'sub' (customer_id)")
        raise InvalidTokenException("Invalid token")

    # Verify token hasn't expired
    exp = payload.get("exp")
    if exp and datetime.fromtimestamp(exp, tz=UTC) < datetime.now(UTC):
        logger.warning(f"Expired customer token for customer_id={customer_id}")
        raise InvalidTokenException("Token has expired")

    token_store_id = payload.get("store_id")

    # A non-numeric subject must be rejected as an invalid token rather than
    # surface as an unhandled ValueError from int().
    try:
        customer_pk = int(customer_id)
    except (TypeError, ValueError) as e:
        logger.warning(f"Token 'sub' is not a valid customer id: {customer_id!r}")
        raise InvalidTokenException("Invalid token") from e

    # Load customer from database
    customer = db.query(Customer).filter(Customer.id == customer_pk).first()
    if not customer:
        logger.warning(f"Customer not found: {customer_id}")
        raise InvalidTokenException("Customer not found")

    if not customer.is_active:
        logger.warning(f"Inactive customer attempted access: {customer.email}")
        raise InvalidTokenException("Customer account is inactive")

    # Validate store context matches token.
    # This prevents using a customer token from store A on store B's storefront.
    request_store = getattr(request.state, "store", None)
    if request_store and token_store_id and request_store.id != token_store_id:
        logger.warning(
            f"Customer {customer.email} token store mismatch: "
            f"token={token_store_id}, request={request_store.id}"
        )
        raise UnauthorizedStoreAccessException(
            store_code=request_store.store_code,
            user_id=customer.id,
        )

    logger.debug(f"Customer authenticated: {customer.email} (ID: {customer.id})")

    # Return CustomerContext schema instead of database model
    return CustomerContext.from_db_model(customer)
def get_current_customer_from_cookie_or_header(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    customer_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
):
    """
    Authenticate a customer from the Authorization header or customer_token cookie.

    Used for storefront account HTML pages (/storefront/account/*) that need
    cookie-based auth.
    Note: Public storefront pages (/storefront/products, etc.) don't use this
    dependency.

    Token validation, including the store match against the request, is
    delegated to ``_validate_customer_token``.

    Priority:
    1. Authorization header (API calls)
    2. customer_token cookie (page navigation)

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        customer_token: Optional token from customer_token cookie
        db: Database session

    Returns:
        CustomerContext: Authenticated customer context schema

    Raises:
        InvalidTokenException: If no token or invalid token
        UnauthorizedStoreAccessException: If store mismatch
    """
    path = request.url.path
    token, _source = _get_token_from_request(
        credentials, customer_token, "customer_token", str(path)
    )
    if token:
        return _validate_customer_token(token, request, db)

    logger.warning(f"Customer auth failed: No token for {path}")
    raise InvalidTokenException("Customer authentication required")
def get_current_customer_from_cookie_or_header(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    customer_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
):
    """
    Resolve the current customer from the Authorization header or the
    customer_token cookie.

    Intended for storefront account HTML pages (/storefront/account/*) that
    rely on cookie-based auth. Public storefront pages
    (/storefront/products, etc.) don't use this dependency.

    Token lookup is delegated to _get_token_from_request, which is given the
    header credentials, the cookie value, the cookie name and the request
    path. The resolved token is then checked by _validate_customer_token,
    which also compares the token's store with the request store.

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        customer_token: Optional token from customer_token cookie
        db: Database session

    Returns:
        CustomerContext: Authenticated customer context schema

    Raises:
        InvalidTokenException: If no token or invalid token
        UnauthorizedStoreAccessException: If store mismatch
    """
    resolved = _get_token_from_request(
        credentials, customer_token, "customer_token", str(request.url.path)
    )
    # Only the token itself is needed here; the reported source is ignored.
    token = resolved[0]

    if token:
        return _validate_customer_token(token, request, db)

    logger.warning(f"Customer auth failed: No token for {request.url.path}")
    raise InvalidTokenException("Customer authentication required")
def get_current_customer_api(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    db: Session = Depends(get_db),
):
    """
    Get current customer from Authorization header ONLY.

    Used for storefront API endpoints that should not accept cookies.
    Validates that token store_id matches request store (URL-based detection).

    Args:
        request: FastAPI request (for store context)
        credentials: Bearer token from Authorization header; None when the
            header is absent, because the module-level HTTPBearer is created
            with auto_error=False
        db: Database session

    Returns:
        CustomerContext: Authenticated customer context schema

    Raises:
        InvalidTokenException: If no token or invalid token
        UnauthorizedStoreAccessException: If store mismatch
    """
    if not credentials:
        raise InvalidTokenException("Authorization header required for API calls")

    return _validate_customer_token(credentials.credentials, request, db)


# ============================================================================
# GENERIC AUTHENTICATION (for mixed-use endpoints)
# ============================================================================


def get_current_user(
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    db: Session = Depends(get_db),
) -> UserContext:
    """
    Get current authenticated user from Authorization header only.

    Generic authentication without role checking.
    Used for endpoints accessible to any authenticated user.

    Args:
        credentials: Bearer token from Authorization header; None when the
            header is absent, because the module-level HTTPBearer is created
            with auto_error=False
        db: Database session

    Returns:
        UserContext: Authenticated user context (any role)

    Raises:
        InvalidTokenException: If no token or invalid token
    """
    if not credentials:
        raise InvalidTokenException("Authorization header required")

    user = _validate_user_token(credentials.credentials, db)
    return UserContext.from_user(user)
def get_user_store(
    store_code: str,
    current_user: UserContext = Depends(get_current_store_from_cookie_or_header),
    db: Session = Depends(get_db),
) -> Store:
    """
    Look up a store by code and verify the current user may access it.

    Access rules as implemented here:
    - The user who owns the store's merchant (Merchant.owner_user_id) is allowed
    - Everyone else is rejected; team-member access is not implemented yet
      (see the TODO below)

    The lookup uses the upper-cased store code and eagerly loads the store's
    merchant.

    Args:
        store_code: Store code to look up
        current_user: Current authenticated store user
        db: Database session

    Returns:
        Store: Store object if user has access

    Raises:
        StoreNotFoundException: If store doesn't exist
        UnauthorizedStoreAccessException: If user doesn't have access
    """
    from sqlalchemy.orm import joinedload

    normalized_code = store_code.upper()
    store_query = db.query(Store).options(joinedload(Store.merchant))
    store = store_query.filter(Store.store_code == normalized_code).first()

    if not store:
        raise StoreNotFoundException(store_code)

    # Ownership is established through the merchant that owns the store.
    # TODO: Add team member check when StoreUser relationship is set up
    merchant = store.merchant
    if not merchant or merchant.owner_user_id != current_user.id:
        # User doesn't have access to this store
        raise UnauthorizedStoreAccessException(store_code, current_user.id)

    return store
def require_store_permission(permission: str):
    """
    Build a dependency that enforces a single store permission.

    The store is identified by token_store_id from the authenticated user's
    context (authenticated store API pattern). The loaded store is placed on
    request.state.store so the endpoint can use it.

    Usage:
        @router.get("/products")
        def list_products(
            request: Request,
            user: UserContext = Depends(require_store_permission("products.view"))
        ):
            store = request.state.store  # Store is set by this dependency
            ...
    """

    def permission_checker(
        request: Request,
        db: Session = Depends(get_db),
        current_user: UserContext = Depends(get_current_store_from_cookie_or_header),
    ) -> UserContext:
        # The token must carry a store ID
        token_store_id = current_user.token_store_id
        if not token_store_id:
            raise InvalidTokenException(
                "Token missing store information. Please login again."
            )

        # get_store_by_id raises StoreNotFoundException if not found
        store = store_service.get_store_by_id(db, token_store_id)

        # Expose the store to the endpoint
        request.state.store = store

        # The permission check lives on the User model, not on UserContext
        account = _get_user_model(current_user, db)
        if account.has_store_permission(store.id, permission):
            return current_user

        raise InsufficientStorePermissionsException(
            required_permission=permission,
            store_code=store.store_code,
        )

    return permission_checker


def require_store_owner(
    request: Request,
    db: Session = Depends(get_db),
    current_user: UserContext = Depends(get_current_store_from_cookie_or_header),
) -> UserContext:
    """
    Dependency that only lets the owner of the token's store through.

    The store is identified by token_store_id from the authenticated user's
    context (authenticated store API pattern). The loaded store is placed on
    request.state.store so the endpoint can use it.

    Usage:
        @router.delete("/team/{user_id}")
        def remove_team_member(
            request: Request,
            user: UserContext = Depends(require_store_owner)
        ):
            store = request.state.store  # Store is set by this dependency
            ...
    """
    # The token must carry a store ID
    token_store_id = current_user.token_store_id
    if not token_store_id:
        raise InvalidTokenException(
            "Token missing store information. Please login again."
        )

    # get_store_by_id raises StoreNotFoundException if not found
    store = store_service.get_store_by_id(db, token_store_id)

    # Expose the store to the endpoint
    request.state.store = store

    # is_owner_of lives on the User model, not on UserContext
    account = _get_user_model(current_user, db)
    if account.is_owner_of(store.id):
        return current_user

    raise StoreOwnerOnlyException(
        operation="team management",
        store_code=store.store_code,
    )
def require_any_store_permission(*permissions: str):
    """
    Build a dependency that passes when the user holds at least one of the
    given permissions.

    The store is identified by token_store_id from the authenticated user's
    context (authenticated store API pattern). The loaded store is placed on
    request.state.store so the endpoint can use it.

    Usage:
        @router.get("/dashboard")
        def dashboard(
            request: Request,
            user: UserContext = Depends(require_any_store_permission(
                "dashboard.view",
                "reports.view"
            ))
        ):
            store = request.state.store  # Store is set by this dependency
            ...
    """

    def permission_checker(
        request: Request,
        db: Session = Depends(get_db),
        current_user: UserContext = Depends(get_current_store_from_cookie_or_header),
    ) -> UserContext:
        # The token must carry a store ID
        token_store_id = current_user.token_store_id
        if not token_store_id:
            raise InvalidTokenException(
                "Token missing store information. Please login again."
            )

        # get_store_by_id raises StoreNotFoundException if not found
        store = store_service.get_store_by_id(db, token_store_id)

        # Expose the store to the endpoint
        request.state.store = store

        # Stop at the first permission the user holds (User model needed)
        account = _get_user_model(current_user, db)
        for candidate in permissions:
            if account.has_store_permission(store.id, candidate):
                return current_user

        raise InsufficientStorePermissionsException(
            required_permission=f"Any of: {', '.join(permissions)}",
            store_code=store.store_code,
        )

    return permission_checker


def require_all_store_permissions(*permissions: str):
    """
    Build a dependency that passes only when the user holds every one of the
    given permissions.

    The store is identified by token_store_id from the authenticated user's
    context (authenticated store API pattern). The loaded store is placed on
    request.state.store so the endpoint can use it.

    Usage:
        @router.post("/products/bulk-delete")
        def bulk_delete_products(
            request: Request,
            user: UserContext = Depends(require_all_store_permissions(
                "products.view",
                "products.delete"
            ))
        ):
            store = request.state.store  # Store is set by this dependency
            ...
    """

    def permission_checker(
        request: Request,
        db: Session = Depends(get_db),
        current_user: UserContext = Depends(get_current_store_from_cookie_or_header),
    ) -> UserContext:
        # The token must carry a store ID
        token_store_id = current_user.token_store_id
        if not token_store_id:
            raise InvalidTokenException(
                "Token missing store information. Please login again."
            )

        # get_store_by_id raises StoreNotFoundException if not found
        store = store_service.get_store_by_id(db, token_store_id)

        # Expose the store to the endpoint
        request.state.store = store

        # Every permission is checked (no early exit); User model needed
        account = _get_user_model(current_user, db)
        lacking = []
        for candidate in permissions:
            if not account.has_store_permission(store.id, candidate):
                lacking.append(candidate)

        if not lacking:
            return current_user

        # The error lists the full required set, not only the missing entries
        raise InsufficientStorePermissionsException(
            required_permission=f"All of: {', '.join(permissions)}",
            store_code=store.store_code,
        )

    return permission_checker


def get_user_permissions(
    request: Request,
    db: Session = Depends(get_db),
    current_user: UserContext = Depends(get_current_store_from_cookie_or_header),
) -> list:
    """
    Return the current user's permissions for the store named in the token.

    The store is identified by token_store_id from the authenticated user's
    context (authenticated store API pattern). When a store ID is present the
    loaded store is also placed on request.state.store.

    Results:
    - no store ID in the token: empty list (nothing is loaded or stored)
    - user owns the store: everything returned by
      permission_discovery_service.get_all_permission_ids(), as a list
    - otherwise: get_all_permissions() of the first active membership for
      this store, or an empty list when there is none
    """
    token_store_id = current_user.token_store_id
    if not token_store_id:
        return []

    # Load store from database and expose it to the endpoint
    store = store_service.get_store_by_id(db, token_store_id)
    request.state.store = store

    # Ownership and membership checks need the User model
    account = _get_user_model(current_user, db)

    if not account.is_owner_of(store.id):
        # Non-owner: use the first active membership for this store, if any
        membership = next(
            (
                candidate
                for candidate in account.store_memberships
                if candidate.store_id == store.id and candidate.is_active
            ),
            None,
        )
        if membership is None:
            return []
        return membership.get_all_permissions()

    # Owner: return all permissions
    from app.modules.tenancy.services.permission_discovery_service import (
        permission_discovery_service,
    )

    return list(permission_discovery_service.get_all_permission_ids())
def require_store_page_permission(permission: str):
    """
    Dependency factory to require a specific store permission for page routes.

    Same as require_store_permission: the checker reads token_store_id from
    the authenticated store user, loads the store into request.state.store,
    and raises InsufficientStorePermissionsException when the permission is
    missing. Per the original note, the exception handler intercepts that
    exception for HTML requests (redirecting to login).

    The checker used to be a line-for-line copy of the one built by
    require_store_permission; it now delegates to that factory so the two
    cannot drift apart.

    Usage:
        @router.get("/products", response_class=HTMLResponse)
        def store_products_page(
            request: Request,
            store_code: str = Depends(get_resolved_store_code),
            current_user: UserContext = Depends(require_store_page_permission("products.view")),
            db: Session = Depends(get_db),
        ):
            ...

    Args:
        permission: Permission identifier the user must hold for the store

    Returns:
        A FastAPI dependency function that returns the authenticated
        UserContext when the permission check passes

    Raises (from the returned dependency):
        InvalidTokenException: If the token carries no store ID
        InsufficientStorePermissionsException: If the permission is missing
    """
    # Each call yields a fresh checker closure, as the duplicated code did.
    return require_store_permission(permission)
def get_current_admin_optional(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    admin_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
) -> UserContext | None:
    """
    Get current admin user from admin_token cookie or Authorization header.

    Returns None instead of raising exceptions if not authenticated.
    Used for login pages to check if user is already authenticated.

    Token lookup is delegated to _get_token_from_request, which is given the
    header credentials, the cookie value, the cookie name and the request path.

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        admin_token: Optional token from admin_token cookie
        db: Database session

    Returns:
        UserContext: Authenticated admin user context if valid token exists
        None: If no token, invalid token, or user is not admin
    """
    token, _ = _get_token_from_request(
        credentials, admin_token, "admin_token", str(request.url.path)
    )

    if not token:
        return None

    try:
        # Validate token and get user
        user = _validate_user_token(token, db)

        # Verify user is admin
        if user.is_admin:
            return UserContext.from_user(user, include_store_context=False)
    except Exception as exc:
        # Best-effort check: any failure means "not authenticated", but the
        # reason is logged instead of being dropped silently.
        logger.debug(
            f"Optional admin auth rejected for {request.url.path}: {exc}"
        )

    return None


def get_current_store_optional(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    store_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
) -> UserContext | None:
    """
    Get current store user from store_token cookie or Authorization header.

    Returns None instead of raising exceptions if not authenticated.
    Used for login pages to check if user is already authenticated.

    Token lookup is delegated to _get_token_from_request, which is given the
    header credentials, the cookie value, the cookie name and the request path.

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        store_token: Optional token from store_token cookie
        db: Database session

    Returns:
        UserContext: Authenticated store user context if valid token exists
        None: If no token, invalid token, or user is not store
    """
    token, _ = _get_token_from_request(
        credentials, store_token, "store_token", str(request.url.path)
    )

    if not token:
        return None

    try:
        # Validate token and get user
        user = _validate_user_token(token, db)

        # Verify user is a store user
        if user.is_store_user:
            return UserContext.from_user(user)
    except Exception as exc:
        # Best-effort check: any failure means "not authenticated", but the
        # reason is logged instead of being dropped silently.
        logger.debug(
            f"Optional store auth rejected for {request.url.path}: {exc}"
        )

    return None


def get_current_customer_optional(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
    customer_token: str | None = Cookie(None),
    db: Session = Depends(get_db),
):
    """
    Get current customer from customer_token cookie or Authorization header.

    Returns None instead of raising exceptions if not authenticated.
    Used for login pages to check if user is already authenticated.

    Token lookup is delegated to _get_token_from_request, which is given the
    header credentials, the cookie value, the cookie name and the request path.

    Args:
        request: FastAPI request
        credentials: Optional Bearer token from header
        customer_token: Optional token from customer_token cookie
        db: Database session

    Returns:
        CustomerContext: Authenticated customer context if valid token exists
        None: If no token, invalid token, or store mismatch
    """
    token, _ = _get_token_from_request(
        credentials, customer_token, "customer_token", str(request.url.path)
    )

    if not token:
        return None

    try:
        # Validate customer token (includes store validation)
        return _validate_customer_token(token, request, db)
    except Exception as exc:
        # Best-effort check: invalid token, store mismatch, or any other
        # error means "not authenticated"; log the reason for diagnosis.
        logger.debug(
            f"Optional customer auth rejected for {request.url.path}: {exc}"
        )
        return None
def make_storefront_module_gate(module_code: str):
    """
    Build a FastAPI dependency that hides storefront routes of a disabled module.

    Used by main.py at route registration time: each non-core module's
    storefront router gets this dependency injected automatically. The
    framework already knows which module owns each route via
    RouteInfo.module_code — no hardcoded path map.

    The returned dependency reads request.state.platform. When no platform is
    set it does nothing; otherwise it asks module_service whether the module
    is enabled for that platform and answers 404 "Page not found" when it is
    not.

    Args:
        module_code: The module code to check (e.g. "catalog", "orders", "loyalty")

    Returns:
        A FastAPI dependency function
    """

    async def _check_module_enabled(
        request: Request,
        db: Session = Depends(get_db),
    ) -> None:
        from app.modules.service import module_service

        platform = getattr(request.state, "platform", None)

        # No platform context — let other middleware handle it
        if platform and not module_service.is_module_enabled(
            db, platform.id, module_code
        ):
            raise HTTPException(status_code=404, detail="Page not found")

    return _check_module_enabled