revamped authentication system
This commit is contained in:
484
app/api/deps.py
484
app/api/deps.py
@@ -2,16 +2,38 @@
|
||||
"""
|
||||
Authentication dependencies for FastAPI routes.
|
||||
|
||||
Implements dual token storage pattern:
|
||||
- Checks Authorization header first (for API calls from JavaScript)
|
||||
- Falls back to cookie (for browser page navigation)
|
||||
This module provides authentication dependencies for all three contexts in the
|
||||
multi-tenant application, implementing dual token storage with proper isolation:
|
||||
|
||||
This allows:
|
||||
- JavaScript API calls: Use localStorage + Authorization header
|
||||
- Browser page loads: Use HTTP-only cookies
|
||||
ADMIN ROUTES (/admin/*):
|
||||
- Cookie: admin_token (path=/admin) OR Authorization header
|
||||
- Role: admin only
|
||||
- Blocks: vendors, customers
|
||||
|
||||
VENDOR ROUTES (/vendor/*):
|
||||
- Cookie: vendor_token (path=/vendor) OR Authorization header
|
||||
- Role: vendor only
|
||||
- Blocks: admins, customers
|
||||
|
||||
CUSTOMER/SHOP ROUTES (/shop/account/*):
|
||||
- Cookie: customer_token (path=/shop) OR Authorization header
|
||||
- Role: customer only
|
||||
- Blocks: admins, vendors
|
||||
- Note: Public shop pages (/shop/products, etc.) don't require auth
|
||||
|
||||
This dual authentication approach supports:
|
||||
- HTML pages: Use cookies (automatic browser behavior)
|
||||
- API calls: Use Authorization headers (explicit JavaScript control)
|
||||
|
||||
The cookie path restrictions prevent cross-context cookie leakage:
|
||||
- admin_token is NEVER sent to /vendor/* or /shop/*
|
||||
- vendor_token is NEVER sent to /admin/* or /shop/*
|
||||
- customer_token is NEVER sent to /admin/* or /vendor/*
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import Depends, Request, Cookie
|
||||
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
||||
from sqlalchemy.orm import Session
|
||||
@@ -23,148 +45,434 @@ from models.database.vendor import Vendor
|
||||
from models.database.user import User
|
||||
from app.exceptions import (
|
||||
AdminRequiredException,
|
||||
InvalidTokenException,
|
||||
InsufficientPermissionsException,
|
||||
VendorNotFoundException,
|
||||
UnauthorizedVendorAccessException,
|
||||
InvalidTokenException
|
||||
UnauthorizedVendorAccessException
|
||||
)
|
||||
|
||||
# Set auto_error=False to prevent automatic 403 responses
|
||||
security = HTTPBearer(auto_error=False)
|
||||
# Initialize dependencies
|
||||
security = HTTPBearer(auto_error=False) # auto_error=False prevents automatic 403
|
||||
auth_manager = AuthManager()
|
||||
rate_limiter = RateLimiter()
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_current_user(
|
||||
request: Request,
|
||||
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
|
||||
admin_token: Optional[str] = Cookie(None), # Check admin_token cookie
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
# ============================================================================
|
||||
# HELPER FUNCTIONS
|
||||
# ============================================================================
|
||||
|
||||
def _get_token_from_request(
|
||||
credentials: Optional[HTTPAuthorizationCredentials],
|
||||
cookie_value: Optional[str],
|
||||
cookie_name: str,
|
||||
request_path: str
|
||||
) -> tuple[Optional[str], Optional[str]]:
|
||||
"""
|
||||
Get current authenticated user.
|
||||
Extract token from Authorization header or cookie.
|
||||
|
||||
Checks for token in this priority order:
|
||||
Priority:
|
||||
1. Authorization header (for API calls from JavaScript)
|
||||
2. admin_token cookie (for browser page navigation)
|
||||
|
||||
This dual approach supports:
|
||||
- API calls: JavaScript adds token from localStorage to Authorization header
|
||||
- Page navigation: Browser automatically sends cookie
|
||||
2. Cookie (for browser page navigation)
|
||||
|
||||
Args:
|
||||
request: FastAPI request object
|
||||
credentials: Optional Bearer token from Authorization header
|
||||
admin_token: Optional token from cookie
|
||||
cookie_value: Optional token from cookie
|
||||
cookie_name: Name of the cookie (for logging)
|
||||
request_path: Request URL path (for logging)
|
||||
|
||||
Returns:
|
||||
Tuple of (token, source) where source is "header" or "cookie"
|
||||
"""
|
||||
if credentials:
|
||||
logger.debug(f"Token found in Authorization header for {request_path}")
|
||||
return credentials.credentials, "header"
|
||||
elif cookie_value:
|
||||
logger.debug(f"Token found in {cookie_name} cookie for {request_path}")
|
||||
return cookie_value, "cookie"
|
||||
|
||||
return None, None
|
||||
|
||||
|
||||
def _validate_user_token(token: str, db: Session) -> User:
|
||||
"""
|
||||
Validate JWT token and return user.
|
||||
|
||||
Args:
|
||||
token: JWT token string
|
||||
db: Database session
|
||||
|
||||
Returns:
|
||||
User: Authenticated user object
|
||||
|
||||
Raises:
|
||||
InvalidTokenException: If no token found or token invalid
|
||||
InvalidTokenException: If token is invalid
|
||||
"""
|
||||
token = None
|
||||
token_source = None
|
||||
|
||||
# Priority 1: Authorization header (API calls from JavaScript)
|
||||
if credentials:
|
||||
token = credentials.credentials
|
||||
token_source = "header"
|
||||
|
||||
# Priority 2: Cookie (browser page navigation)
|
||||
elif admin_token:
|
||||
token = admin_token
|
||||
token_source = "cookie"
|
||||
|
||||
# No token found in either location
|
||||
if not token:
|
||||
raise InvalidTokenException("Authorization header or cookie required")
|
||||
|
||||
# Log token source for debugging
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.debug(f"Token found in {token_source} for {request.url.path}")
|
||||
|
||||
# Create a mock credentials object for auth_manager
|
||||
mock_credentials = HTTPAuthorizationCredentials(
|
||||
scheme="Bearer",
|
||||
credentials=token
|
||||
)
|
||||
|
||||
return auth_manager.get_current_user(db, mock_credentials)
|
||||
|
||||
|
||||
def get_current_admin_user(current_user: User = Depends(get_current_user)):
|
||||
"""
|
||||
Require admin user.
|
||||
# ============================================================================
|
||||
# ADMIN AUTHENTICATION
|
||||
# ============================================================================
|
||||
|
||||
This dependency ensures the current user has admin role.
|
||||
Used for protecting admin-only routes.
|
||||
def get_current_admin_from_cookie_or_header(
|
||||
request: Request,
|
||||
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
|
||||
admin_token: Optional[str] = Cookie(None),
|
||||
db: Session = Depends(get_db),
|
||||
) -> User:
|
||||
"""
|
||||
Get current admin user from admin_token cookie or Authorization header.
|
||||
|
||||
Used for admin HTML pages (/admin/*) that need cookie-based auth.
|
||||
|
||||
Priority:
|
||||
1. Authorization header (API calls)
|
||||
2. admin_token cookie (page navigation)
|
||||
|
||||
Args:
|
||||
current_user: User object from get_current_user dependency
|
||||
request: FastAPI request
|
||||
credentials: Optional Bearer token from header
|
||||
admin_token: Optional token from admin_token cookie
|
||||
db: Database session
|
||||
|
||||
Returns:
|
||||
User: Admin user object
|
||||
User: Authenticated admin user
|
||||
|
||||
Raises:
|
||||
AdminRequiredException: If user is not an admin
|
||||
InvalidTokenException: If no token or invalid token
|
||||
AdminRequiredException: If user is not admin
|
||||
"""
|
||||
return auth_manager.require_admin(current_user)
|
||||
token, source = _get_token_from_request(
|
||||
credentials,
|
||||
admin_token,
|
||||
"admin_token",
|
||||
str(request.url.path)
|
||||
)
|
||||
|
||||
if not token:
|
||||
logger.warning(f"Admin auth failed: No token for {request.url.path}")
|
||||
raise InvalidTokenException("Admin authentication required")
|
||||
|
||||
# Validate token and get user
|
||||
user = _validate_user_token(token, db)
|
||||
|
||||
# Verify user is admin
|
||||
if user.role != "admin":
|
||||
logger.warning(
|
||||
f"Non-admin user {user.username} attempted admin route: {request.url.path}"
|
||||
)
|
||||
raise AdminRequiredException("Admin privileges required")
|
||||
|
||||
return user
|
||||
|
||||
|
||||
def get_current_vendor_user(current_user: User = Depends(get_current_user)):
|
||||
def get_current_admin_api(
|
||||
credentials: HTTPAuthorizationCredentials = Depends(security),
|
||||
db: Session = Depends(get_db),
|
||||
) -> User:
|
||||
"""
|
||||
Require vendor user (vendor owner or vendor staff).
|
||||
Get current admin user from Authorization header ONLY.
|
||||
|
||||
This dependency ensures the current user has vendor role.
|
||||
Used for protecting vendor-only routes.
|
||||
Used for admin API endpoints that should not accept cookies.
|
||||
This prevents CSRF attacks on API endpoints.
|
||||
|
||||
Args:
|
||||
current_user: User object from get_current_user dependency
|
||||
credentials: Bearer token from Authorization header
|
||||
db: Database session
|
||||
|
||||
Returns:
|
||||
User: Vendor user object
|
||||
User: Authenticated admin user
|
||||
|
||||
Raises:
|
||||
InsufficientPermissionsException: If user is not a vendor user
|
||||
InvalidTokenException: If no token or invalid token
|
||||
AdminRequiredException: If user is not admin
|
||||
"""
|
||||
return auth_manager.require_vendor(current_user)
|
||||
if not credentials:
|
||||
raise InvalidTokenException("Authorization header required for API calls")
|
||||
|
||||
user = _validate_user_token(credentials.credentials, db)
|
||||
|
||||
if user.role != "admin":
|
||||
logger.warning(f"Non-admin user {user.username} attempted admin API")
|
||||
raise AdminRequiredException("Admin privileges required")
|
||||
|
||||
return user
|
||||
|
||||
|
||||
def get_current_customer_user(current_user: User = Depends(get_current_user)):
|
||||
# ============================================================================
|
||||
# VENDOR AUTHENTICATION
|
||||
# ============================================================================
|
||||
|
||||
def get_current_vendor_from_cookie_or_header(
|
||||
request: Request,
|
||||
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
|
||||
vendor_token: Optional[str] = Cookie(None),
|
||||
db: Session = Depends(get_db),
|
||||
) -> User:
|
||||
"""
|
||||
Require customer user.
|
||||
Get current vendor user from vendor_token cookie or Authorization header.
|
||||
|
||||
This dependency ensures the current user has customer role.
|
||||
Used for protecting customer account routes.
|
||||
Used for vendor HTML pages (/vendor/*) that need cookie-based auth.
|
||||
|
||||
Priority:
|
||||
1. Authorization header (API calls)
|
||||
2. vendor_token cookie (page navigation)
|
||||
|
||||
Args:
|
||||
current_user: User object from get_current_user dependency
|
||||
request: FastAPI request
|
||||
credentials: Optional Bearer token from header
|
||||
vendor_token: Optional token from vendor_token cookie
|
||||
db: Database session
|
||||
|
||||
Returns:
|
||||
User: Customer user object
|
||||
User: Authenticated vendor user
|
||||
|
||||
Raises:
|
||||
InsufficientPermissionsException: If user is not a customer
|
||||
InvalidTokenException: If no token or invalid token
|
||||
InsufficientPermissionsException: If user is not vendor or is admin
|
||||
"""
|
||||
return auth_manager.require_customer(current_user)
|
||||
token, source = _get_token_from_request(
|
||||
credentials,
|
||||
vendor_token,
|
||||
"vendor_token",
|
||||
str(request.url.path)
|
||||
)
|
||||
|
||||
if not token:
|
||||
logger.warning(f"Vendor auth failed: No token for {request.url.path}")
|
||||
raise InvalidTokenException("Vendor authentication required")
|
||||
|
||||
# Validate token and get user
|
||||
user = _validate_user_token(token, db)
|
||||
|
||||
# CRITICAL: Block admins from vendor routes
|
||||
if user.role == "admin":
|
||||
logger.warning(
|
||||
f"Admin user {user.username} attempted vendor route: {request.url.path}"
|
||||
)
|
||||
raise InsufficientPermissionsException(
|
||||
"Vendor access only - admins cannot use vendor portal"
|
||||
)
|
||||
|
||||
# Verify user is vendor
|
||||
if user.role != "vendor":
|
||||
logger.warning(
|
||||
f"Non-vendor user {user.username} attempted vendor route: {request.url.path}"
|
||||
)
|
||||
raise InsufficientPermissionsException("Vendor privileges required")
|
||||
|
||||
return user
|
||||
|
||||
|
||||
def get_current_vendor_api(
|
||||
credentials: HTTPAuthorizationCredentials = Depends(security),
|
||||
db: Session = Depends(get_db),
|
||||
) -> User:
|
||||
"""
|
||||
Get current vendor user from Authorization header ONLY.
|
||||
|
||||
Used for vendor API endpoints that should not accept cookies.
|
||||
|
||||
Args:
|
||||
credentials: Bearer token from Authorization header
|
||||
db: Database session
|
||||
|
||||
Returns:
|
||||
User: Authenticated vendor user
|
||||
|
||||
Raises:
|
||||
InvalidTokenException: If no token or invalid token
|
||||
InsufficientPermissionsException: If user is not vendor or is admin
|
||||
"""
|
||||
if not credentials:
|
||||
raise InvalidTokenException("Authorization header required for API calls")
|
||||
|
||||
user = _validate_user_token(credentials.credentials, db)
|
||||
|
||||
# Block admins from vendor API
|
||||
if user.role == "admin":
|
||||
logger.warning(f"Admin user {user.username} attempted vendor API")
|
||||
raise InsufficientPermissionsException("Vendor access only")
|
||||
|
||||
if user.role != "vendor":
|
||||
logger.warning(f"Non-vendor user {user.username} attempted vendor API")
|
||||
raise InsufficientPermissionsException("Vendor privileges required")
|
||||
|
||||
return user
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# CUSTOMER AUTHENTICATION (SHOP)
|
||||
# ============================================================================
|
||||
|
||||
def get_current_customer_from_cookie_or_header(
|
||||
request: Request,
|
||||
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
|
||||
customer_token: Optional[str] = Cookie(None),
|
||||
db: Session = Depends(get_db),
|
||||
) -> User:
|
||||
"""
|
||||
Get current customer user from customer_token cookie or Authorization header.
|
||||
|
||||
Used for shop account HTML pages (/shop/account/*) that need cookie-based auth.
|
||||
Note: Public shop pages (/shop/products, etc.) don't use this dependency.
|
||||
|
||||
Priority:
|
||||
1. Authorization header (API calls)
|
||||
2. customer_token cookie (page navigation)
|
||||
|
||||
Args:
|
||||
request: FastAPI request
|
||||
credentials: Optional Bearer token from header
|
||||
customer_token: Optional token from customer_token cookie
|
||||
db: Database session
|
||||
|
||||
Returns:
|
||||
User: Authenticated customer user
|
||||
|
||||
Raises:
|
||||
InvalidTokenException: If no token or invalid token
|
||||
InsufficientPermissionsException: If user is not customer (admin/vendor blocked)
|
||||
"""
|
||||
token, source = _get_token_from_request(
|
||||
credentials,
|
||||
customer_token,
|
||||
"customer_token",
|
||||
str(request.url.path)
|
||||
)
|
||||
|
||||
if not token:
|
||||
logger.warning(f"Customer auth failed: No token for {request.url.path}")
|
||||
raise InvalidTokenException("Customer authentication required")
|
||||
|
||||
# Validate token and get user
|
||||
user = _validate_user_token(token, db)
|
||||
|
||||
# CRITICAL: Block admins from customer routes
|
||||
if user.role == "admin":
|
||||
logger.warning(
|
||||
f"Admin user {user.username} attempted shop account: {request.url.path}"
|
||||
)
|
||||
raise InsufficientPermissionsException(
|
||||
"Customer access only - admins cannot use shop"
|
||||
)
|
||||
|
||||
# CRITICAL: Block vendors from customer routes
|
||||
if user.role == "vendor":
|
||||
logger.warning(
|
||||
f"Vendor user {user.username} attempted shop account: {request.url.path}"
|
||||
)
|
||||
raise InsufficientPermissionsException(
|
||||
"Customer access only - vendors cannot use shop"
|
||||
)
|
||||
|
||||
# Verify user is customer
|
||||
if user.role != "customer":
|
||||
logger.warning(
|
||||
f"Non-customer user {user.username} attempted shop account: {request.url.path}"
|
||||
)
|
||||
raise InsufficientPermissionsException("Customer privileges required")
|
||||
|
||||
return user
|
||||
|
||||
|
||||
def get_current_customer_api(
|
||||
credentials: HTTPAuthorizationCredentials = Depends(security),
|
||||
db: Session = Depends(get_db),
|
||||
) -> User:
|
||||
"""
|
||||
Get current customer user from Authorization header ONLY.
|
||||
|
||||
Used for shop API endpoints that should not accept cookies.
|
||||
|
||||
Args:
|
||||
credentials: Bearer token from Authorization header
|
||||
db: Database session
|
||||
|
||||
Returns:
|
||||
User: Authenticated customer user
|
||||
|
||||
Raises:
|
||||
InvalidTokenException: If no token or invalid token
|
||||
InsufficientPermissionsException: If user is not customer (admin/vendor blocked)
|
||||
"""
|
||||
if not credentials:
|
||||
raise InvalidTokenException("Authorization header required for API calls")
|
||||
|
||||
user = _validate_user_token(credentials.credentials, db)
|
||||
|
||||
# Block admins from customer API
|
||||
if user.role == "admin":
|
||||
logger.warning(f"Admin user {user.username} attempted customer API")
|
||||
raise InsufficientPermissionsException("Customer access only")
|
||||
|
||||
# Block vendors from customer API
|
||||
if user.role == "vendor":
|
||||
logger.warning(f"Vendor user {user.username} attempted customer API")
|
||||
raise InsufficientPermissionsException("Customer access only")
|
||||
|
||||
if user.role != "customer":
|
||||
logger.warning(f"Non-customer user {user.username} attempted customer API")
|
||||
raise InsufficientPermissionsException("Customer privileges required")
|
||||
|
||||
return user
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# GENERIC AUTHENTICATION (for mixed-use endpoints)
|
||||
# ============================================================================
|
||||
|
||||
def get_current_user(
|
||||
credentials: HTTPAuthorizationCredentials = Depends(security),
|
||||
db: Session = Depends(get_db)
|
||||
) -> User:
|
||||
"""
|
||||
Get current authenticated user from Authorization header only.
|
||||
|
||||
Generic authentication without role checking.
|
||||
Used for endpoints accessible to any authenticated user.
|
||||
|
||||
Args:
|
||||
credentials: Bearer token from Authorization header
|
||||
db: Database session
|
||||
|
||||
Returns:
|
||||
User: Authenticated user (any role)
|
||||
|
||||
Raises:
|
||||
InvalidTokenException: If no token or invalid token
|
||||
"""
|
||||
if not credentials:
|
||||
raise InvalidTokenException("Authorization header required")
|
||||
|
||||
return _validate_user_token(credentials.credentials, db)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# VENDOR OWNERSHIP VERIFICATION
|
||||
# ============================================================================
|
||||
|
||||
def get_user_vendor(
|
||||
vendor_code: str,
|
||||
current_user: User = Depends(get_current_user),
|
||||
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
) -> Vendor:
|
||||
"""
|
||||
Get vendor and verify user ownership.
|
||||
Get vendor and verify user ownership/membership.
|
||||
|
||||
Ensures the current user has access to the specified vendor.
|
||||
Admin users can access any vendor, regular users only their own.
|
||||
- Vendor owners can access their own vendor
|
||||
- Team members can access their vendor
|
||||
- Admins are BLOCKED (use admin routes instead)
|
||||
|
||||
Args:
|
||||
vendor_code: Vendor code to look up
|
||||
current_user: Current authenticated user
|
||||
current_user: Current authenticated vendor user
|
||||
db: Database session
|
||||
|
||||
Returns:
|
||||
@@ -174,11 +482,19 @@ def get_user_vendor(
|
||||
VendorNotFoundException: If vendor doesn't exist
|
||||
UnauthorizedVendorAccessException: If user doesn't have access
|
||||
"""
|
||||
vendor = db.query(Vendor).filter(Vendor.vendor_code == vendor_code.upper()).first()
|
||||
vendor = db.query(Vendor).filter(
|
||||
Vendor.vendor_code == vendor_code.upper()
|
||||
).first()
|
||||
|
||||
if not vendor:
|
||||
raise VendorNotFoundException(vendor_code)
|
||||
|
||||
if current_user.role != "admin" and vendor.owner_user_id != current_user.id:
|
||||
raise UnauthorizedVendorAccessException(vendor_code, current_user.id)
|
||||
# Check if user owns this vendor
|
||||
if vendor.owner_user_id == current_user.id:
|
||||
return vendor
|
||||
|
||||
return vendor
|
||||
# Check if user is team member
|
||||
# TODO: Add team member check when VendorUser relationship is set up
|
||||
|
||||
# User doesn't have access to this vendor
|
||||
raise UnauthorizedVendorAccessException(vendor_code, current_user.id)
|
||||
|
||||
Reference in New Issue
Block a user