All checks were successful
- Fix platform-grouped merchant sidebar menu with core items at root level - Add merchant store management (detail page, create store, team page) - Fix store settings 500 error by removing dead stripe/API tab - Move onboarding translations to module-owned locale files - Fix onboarding banner i18n with server-side rendering + context inheritance - Refactor login language selectors to use languageSelector() function (LANG-002) - Move HTTPException handling to global exception handler in merchant routes (API-003) - Add language selector to all login pages and portal headers - Fix customer module: drop order stats from customer model, add to orders module - Fix admin menu config visibility for super admin platform context - Fix storefront auth and layout issues - Add missing i18n translations for onboarding steps (en/fr/de/lb) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1747 lines
57 KiB
Python
1747 lines
57 KiB
Python
# app/api/deps.py
|
|
"""
|
|
Authentication dependencies for FastAPI routes.
|
|
|
|
This module provides authentication dependencies for all three contexts in the
|
|
multi-tenant application, implementing dual token storage with proper isolation:
|
|
|
|
ADMIN ROUTES (/admin/*):
|
|
- Cookie: admin_token (path=/admin) OR Authorization header
|
|
- Role: admin only
|
|
- Blocks: stores, customers
|
|
|
|
STORE ROUTES (/store/*):
|
|
- Cookie: store_token (path=/store) OR Authorization header
|
|
- Role: store only
|
|
- Blocks: admins, customers
|
|
|
|
MERCHANT ROUTES (/merchants/*):
|
|
- Cookie: merchant_token (path=/merchants) OR Authorization header
|
|
- Role: store (merchant owners are store-role users who own merchants)
|
|
- Validates: User owns the merchant via Merchant.owner_user_id
|
|
|
|
CUSTOMER/STOREFRONT ROUTES (/storefront/account/*):
|
|
- Cookie: customer_token (path=/storefront) OR Authorization header
|
|
- Role: customer only
|
|
- Blocks: admins, stores
|
|
- Note: Public storefront pages (/storefront/products, etc.) don't require auth
|
|
|
|
This dual authentication approach supports:
|
|
- HTML pages: Use cookies (automatic browser behavior)
|
|
- API calls: Use Authorization headers (explicit JavaScript control)
|
|
|
|
The cookie path restrictions prevent cross-context cookie leakage:
|
|
- admin_token is NEVER sent to /store/* or /storefront/*
|
|
- store_token is NEVER sent to /admin/* or /storefront/*
|
|
- customer_token is NEVER sent to /admin/* or /store/*
|
|
"""
|
|
|
|
import logging
|
|
from datetime import UTC
|
|
|
|
from fastapi import Cookie, Depends, HTTPException, Request
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.database import get_db
|
|
from app.modules.enums import FrontendType
|
|
from app.modules.tenancy.exceptions import (
|
|
AdminRequiredException,
|
|
InsufficientPermissionsException,
|
|
InsufficientStorePermissionsException,
|
|
InvalidTokenException,
|
|
StoreNotFoundException,
|
|
StoreOwnerOnlyException,
|
|
UnauthorizedStoreAccessException,
|
|
)
|
|
from app.modules.tenancy.models import Store
|
|
from app.modules.tenancy.models import User as UserModel
|
|
from app.modules.tenancy.schemas.auth import UserContext
|
|
from app.modules.tenancy.services.store_service import store_service
|
|
from middleware.auth import AuthManager
|
|
from middleware.rate_limiter import RateLimiter
|
|
|
|
# Initialize dependencies
|
|
security = HTTPBearer(auto_error=False) # auto_error=False prevents automatic 403
|
|
auth_manager = AuthManager()
|
|
rate_limiter = RateLimiter()
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ============================================================================
|
|
# HELPER FUNCTIONS
|
|
# ============================================================================
|
|
|
|
|
|
async def get_resolved_store_code(request: Request) -> str:
|
|
"""Get store code from path parameter (path-based) or middleware (subdomain/custom domain)."""
|
|
# Path parameter from double-mount prefix (/store/{store_code}/...)
|
|
store_code = request.path_params.get("store_code")
|
|
if store_code:
|
|
return store_code
|
|
# Middleware-resolved store (subdomain or custom domain)
|
|
store = getattr(request.state, "store", None)
|
|
if store:
|
|
return store.store_code
|
|
raise HTTPException(status_code=404, detail="Store not found")
|
|
|
|
|
|
def _get_token_from_request(
|
|
credentials: HTTPAuthorizationCredentials | None,
|
|
cookie_value: str | None,
|
|
cookie_name: str,
|
|
request_path: str,
|
|
) -> tuple[str | None, str | None]:
|
|
"""
|
|
Extract token from Authorization header or cookie.
|
|
|
|
Priority:
|
|
1. Authorization header (for API calls from JavaScript)
|
|
2. Cookie (for browser page navigation)
|
|
|
|
Args:
|
|
credentials: Optional Bearer token from Authorization header
|
|
cookie_value: Optional token from cookie
|
|
cookie_name: Name of the cookie (for logging)
|
|
request_path: Request URL path (for logging)
|
|
|
|
Returns:
|
|
Tuple of (token, source) where source is "header" or "cookie"
|
|
"""
|
|
if credentials:
|
|
logger.debug(f"Token found in Authorization header for {request_path}")
|
|
return credentials.credentials, "header"
|
|
if cookie_value:
|
|
logger.debug(f"Token found in {cookie_name} cookie for {request_path}")
|
|
return cookie_value, "cookie"
|
|
|
|
return None, None
|
|
|
|
|
|
def _validate_user_token(token: str, db: Session) -> UserModel:
|
|
"""
|
|
Validate JWT token and return user.
|
|
|
|
Args:
|
|
token: JWT token string
|
|
db: Database session
|
|
|
|
Returns:
|
|
UserModel: Authenticated user object
|
|
|
|
Raises:
|
|
InvalidTokenException: If token is invalid
|
|
"""
|
|
mock_credentials = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)
|
|
return auth_manager.get_current_user(db, mock_credentials)
|
|
|
|
|
|
def _get_user_model(user_context: UserContext, db: Session) -> UserModel:
|
|
"""
|
|
Get User database model from UserContext.
|
|
|
|
Used internally by permission-checking functions that need
|
|
access to User model methods like has_store_permission().
|
|
|
|
Args:
|
|
user_context: UserContext schema instance
|
|
db: Database session
|
|
|
|
Returns:
|
|
UserModel: User database model
|
|
|
|
Raises:
|
|
InvalidTokenException: If user not found
|
|
"""
|
|
user = db.query(UserModel).filter(UserModel.id == user_context.id).first()
|
|
if not user:
|
|
raise InvalidTokenException("User not found")
|
|
|
|
# Copy token attributes from context to model for compatibility
|
|
if user_context.token_store_id:
|
|
user.token_store_id = user_context.token_store_id
|
|
if user_context.token_store_code:
|
|
user.token_store_code = user_context.token_store_code
|
|
if user_context.token_store_role:
|
|
user.token_store_role = user_context.token_store_role
|
|
|
|
return user
|
|
|
|
|
|
# ============================================================================
|
|
# PLATFORM CONTEXT
|
|
# ============================================================================
|
|
|
|
|
|
def require_platform(request: Request):
|
|
"""Dependency that requires platform context from middleware.
|
|
|
|
Raises HTTPException(400) if no platform is set on request.state.
|
|
Use as a FastAPI dependency in endpoints that need platform context.
|
|
"""
|
|
platform = getattr(request.state, "platform", None)
|
|
if not platform:
|
|
raise HTTPException(status_code=400, detail="Platform context required")
|
|
return platform
|
|
|
|
|
|
# ============================================================================
|
|
# ADMIN AUTHENTICATION
|
|
# ============================================================================
|
|
|
|
|
|
def get_current_admin_from_cookie_or_header(
|
|
request: Request,
|
|
credentials: HTTPAuthorizationCredentials | None = Depends(security),
|
|
admin_token: str | None = Cookie(None),
|
|
db: Session = Depends(get_db),
|
|
) -> UserContext:
|
|
"""
|
|
Get current admin user from admin_token cookie or Authorization header.
|
|
|
|
Used for admin HTML pages (/admin/*) that need cookie-based auth.
|
|
|
|
Priority:
|
|
1. Authorization header (API calls)
|
|
2. admin_token cookie (page navigation)
|
|
|
|
Args:
|
|
request: FastAPI request
|
|
credentials: Optional Bearer token from header
|
|
admin_token: Optional token from admin_token cookie
|
|
db: Database session
|
|
|
|
Returns:
|
|
UserContext: Authenticated admin user context
|
|
|
|
Raises:
|
|
InvalidTokenException: If no token or invalid token
|
|
AdminRequiredException: If user is not admin
|
|
"""
|
|
token, source = _get_token_from_request(
|
|
credentials, admin_token, "admin_token", str(request.url.path)
|
|
)
|
|
|
|
if not token:
|
|
logger.warning(f"Admin auth failed: No token for {request.url.path}")
|
|
raise InvalidTokenException("Admin authentication required")
|
|
|
|
# Validate token and get user
|
|
user = _validate_user_token(token, db)
|
|
|
|
# Verify user is admin
|
|
if not user.is_admin:
|
|
logger.warning(
|
|
f"Non-admin user {user.username} attempted admin route: {request.url.path}"
|
|
)
|
|
raise AdminRequiredException("Admin privileges required")
|
|
|
|
return UserContext.from_user(user, include_store_context=False)
|
|
|
|
|
|
def get_current_admin_api(
|
|
credentials: HTTPAuthorizationCredentials = Depends(security),
|
|
db: Session = Depends(get_db),
|
|
) -> UserContext:
|
|
"""
|
|
Get current admin user from Authorization header ONLY.
|
|
|
|
Used for admin API endpoints that should not accept cookies.
|
|
This prevents CSRF attacks on API endpoints.
|
|
|
|
Args:
|
|
credentials: Bearer token from Authorization header
|
|
db: Database session
|
|
|
|
Returns:
|
|
UserContext: Authenticated admin user context
|
|
|
|
Raises:
|
|
InvalidTokenException: If no token or invalid token
|
|
AdminRequiredException: If user is not admin
|
|
"""
|
|
if not credentials:
|
|
raise InvalidTokenException("Authorization header required for API calls")
|
|
|
|
user = _validate_user_token(credentials.credentials, db)
|
|
|
|
if not user.is_admin:
|
|
logger.warning(f"Non-admin user {user.username} attempted admin API")
|
|
raise AdminRequiredException("Admin privileges required")
|
|
|
|
return UserContext.from_user(user, include_store_context=False)
|
|
|
|
|
|
# ============================================================================
|
|
# SUPER ADMIN AUTHENTICATION
|
|
# ============================================================================
|
|
|
|
|
|
def get_current_super_admin(
|
|
request: Request,
|
|
credentials: HTTPAuthorizationCredentials | None = Depends(security),
|
|
admin_token: str | None = Cookie(None),
|
|
db: Session = Depends(get_db),
|
|
) -> UserContext:
|
|
"""
|
|
Require super admin role.
|
|
|
|
Used for: Global settings, user management, platform creation/deletion,
|
|
admin-platform assignment management.
|
|
|
|
Args:
|
|
request: FastAPI request
|
|
credentials: Optional Bearer token from header
|
|
admin_token: Optional token from admin_token cookie
|
|
db: Database session
|
|
|
|
Returns:
|
|
UserContext: Authenticated super admin user context
|
|
|
|
Raises:
|
|
InvalidTokenException: If no token or invalid token
|
|
AdminRequiredException: If user is not admin or not super admin
|
|
"""
|
|
user_context = get_current_admin_from_cookie_or_header(request, credentials, admin_token, db)
|
|
|
|
if not user_context.is_super_admin:
|
|
logger.warning(
|
|
f"Platform admin {user_context.username} attempted super admin route: {request.url.path}"
|
|
)
|
|
raise AdminRequiredException("Super admin privileges required")
|
|
|
|
return user_context
|
|
|
|
|
|
def get_current_super_admin_api(
|
|
credentials: HTTPAuthorizationCredentials = Depends(security),
|
|
db: Session = Depends(get_db),
|
|
) -> UserContext:
|
|
"""
|
|
Require super admin role (API header only).
|
|
|
|
Used for super admin API endpoints that should not accept cookies.
|
|
|
|
Args:
|
|
credentials: Bearer token from Authorization header
|
|
db: Database session
|
|
|
|
Returns:
|
|
UserContext: Authenticated super admin user context
|
|
|
|
Raises:
|
|
InvalidTokenException: If no token or invalid token
|
|
AdminRequiredException: If user is not admin or not super admin
|
|
"""
|
|
user_context = get_current_admin_api(credentials, db)
|
|
|
|
if not user_context.is_super_admin:
|
|
logger.warning(f"Platform admin {user_context.username} attempted super admin API")
|
|
raise AdminRequiredException("Super admin privileges required")
|
|
|
|
return user_context
|
|
|
|
|
|
def require_platform_access(platform_id: int):
|
|
"""
|
|
Dependency factory to require admin access to a specific platform.
|
|
|
|
Super admins can access all platforms.
|
|
Platform admins can only access their assigned platforms.
|
|
|
|
Usage:
|
|
@router.get("/platforms/{platform_id}/stores")
|
|
def list_stores(
|
|
platform_id: int,
|
|
admin: UserContext = Depends(require_platform_access(platform_id))
|
|
):
|
|
...
|
|
"""
|
|
|
|
def _check_platform_access(
|
|
request: Request,
|
|
credentials: HTTPAuthorizationCredentials | None = Depends(security),
|
|
admin_token: str | None = Cookie(None),
|
|
db: Session = Depends(get_db),
|
|
) -> UserContext:
|
|
user_context = get_current_admin_from_cookie_or_header(
|
|
request, credentials, admin_token, db
|
|
)
|
|
|
|
# Super admins (accessible_platform_ids=None) can access all platforms
|
|
# Platform admins can only access their assigned platforms
|
|
can_access = (
|
|
user_context.accessible_platform_ids is None or
|
|
platform_id in (user_context.accessible_platform_ids or [])
|
|
)
|
|
if not can_access:
|
|
logger.warning(
|
|
f"Admin {user_context.username} denied access to platform_id={platform_id}"
|
|
)
|
|
raise InsufficientPermissionsException(
|
|
f"Access denied to platform {platform_id}"
|
|
)
|
|
|
|
return user_context
|
|
|
|
return _check_platform_access
|
|
|
|
|
|
def get_admin_with_platform_context(
|
|
request: Request,
|
|
credentials: HTTPAuthorizationCredentials | None = Depends(security),
|
|
admin_token: str | None = Cookie(None),
|
|
db: Session = Depends(get_db),
|
|
) -> UserContext:
|
|
"""
|
|
Get admin user and verify platform context from token.
|
|
|
|
For platform admins, extracts platform_id from JWT token and verifies access.
|
|
Stores platform in request.state.admin_platform for endpoint use.
|
|
|
|
Super admins bypass platform context check (they can access all platforms).
|
|
|
|
Note: This function needs the raw User model for token attributes and
|
|
platform access checks, so it uses _validate_user_token internally.
|
|
|
|
Args:
|
|
request: FastAPI request
|
|
credentials: Optional Bearer token from header
|
|
admin_token: Optional token from admin_token cookie
|
|
db: Database session
|
|
|
|
Returns:
|
|
UserContext: Authenticated admin with platform context
|
|
|
|
Raises:
|
|
InvalidTokenException: If platform admin token missing platform info
|
|
InsufficientPermissionsException: If platform access revoked
|
|
"""
|
|
from app.modules.tenancy.models import Platform
|
|
|
|
# Get raw token for platform_id extraction
|
|
token, source = _get_token_from_request(
|
|
credentials, admin_token, "admin_token", str(request.url.path)
|
|
)
|
|
|
|
if not token:
|
|
raise InvalidTokenException("Admin authentication required")
|
|
|
|
user = _validate_user_token(token, db)
|
|
|
|
if not user.is_admin:
|
|
raise AdminRequiredException("Admin privileges required")
|
|
|
|
# Super admins bypass platform context
|
|
if user.is_super_admin:
|
|
return UserContext.from_user(user, include_store_context=False)
|
|
|
|
# Platform admins need platform_id in token
|
|
if not hasattr(user, "token_platform_id"):
|
|
raise InvalidTokenException(
|
|
"Token missing platform information. Please select a platform."
|
|
)
|
|
|
|
platform_id = user.token_platform_id
|
|
|
|
# Verify admin still has access to this platform
|
|
if not user.can_access_platform(platform_id):
|
|
logger.warning(
|
|
f"Admin {user.username} lost access to platform_id={platform_id}"
|
|
)
|
|
raise InsufficientPermissionsException(
|
|
"Access to this platform has been revoked. Please login again."
|
|
)
|
|
|
|
# Load platform and store in request state
|
|
platform = db.query(Platform).filter(Platform.id == platform_id).first()
|
|
request.state.admin_platform = platform
|
|
|
|
return UserContext.from_user(user, include_store_context=False)
|
|
|
|
|
|
# ============================================================================
|
|
# MODULE-BASED ACCESS CONTROL
|
|
# ============================================================================
|
|
|
|
|
|
def require_module_access(module_code: str, frontend_type: FrontendType):
|
|
"""
|
|
Dependency factory for module-based route access control.
|
|
|
|
Checks if the specified module is enabled for the current platform.
|
|
Use this for routes that should be gated by module enablement but aren't
|
|
tied to a specific menu item.
|
|
|
|
Usage:
|
|
router = APIRouter(
|
|
dependencies=[Depends(require_module_access("messaging", FrontendType.ADMIN))]
|
|
)
|
|
|
|
router = APIRouter(
|
|
dependencies=[Depends(require_module_access("billing", FrontendType.STORE))]
|
|
)
|
|
|
|
Args:
|
|
module_code: Module code to check (e.g., "billing", "marketplace")
|
|
frontend_type: Frontend type (ADMIN or STORE). Required to determine
|
|
which authentication method to use.
|
|
|
|
Returns:
|
|
Dependency function that validates module access and returns User
|
|
"""
|
|
from app.modules.service import module_service
|
|
|
|
def _check_module_access(
|
|
request: Request,
|
|
credentials: HTTPAuthorizationCredentials | None = Depends(security),
|
|
admin_token: str | None = Cookie(None),
|
|
store_token: str | None = Cookie(None),
|
|
merchant_token: str | None = Cookie(None),
|
|
db: Session = Depends(get_db),
|
|
) -> UserContext:
|
|
user_context = None
|
|
platform_id = None
|
|
|
|
# Use explicit frontend_type to determine authentication method
|
|
is_admin_request = frontend_type == FrontendType.ADMIN
|
|
|
|
if is_admin_request:
|
|
try:
|
|
user_context = get_current_admin_from_cookie_or_header(
|
|
request, credentials, admin_token, db
|
|
)
|
|
# Get platform context for admin
|
|
if user_context.is_super_admin:
|
|
# Super admins bypass module checks
|
|
return user_context
|
|
platform = getattr(request.state, "admin_platform", None)
|
|
if platform:
|
|
platform_id = platform.id
|
|
except Exception:
|
|
pass
|
|
|
|
# Handle store request
|
|
if not user_context and frontend_type == FrontendType.STORE:
|
|
try:
|
|
user_context = get_current_store_from_cookie_or_header(
|
|
request, credentials, store_token, db
|
|
)
|
|
# Get platform from store context
|
|
store = getattr(request.state, "store", None)
|
|
if store and hasattr(store, "platform_id") and store.platform_id:
|
|
platform_id = store.platform_id
|
|
except Exception:
|
|
pass
|
|
|
|
# Handle merchant request
|
|
if not user_context and frontend_type == FrontendType.MERCHANT:
|
|
try:
|
|
user_context = get_current_merchant_from_cookie_or_header(
|
|
request, credentials, merchant_token, db
|
|
)
|
|
# Merchant portal is platform-agnostic; module checks not enforced
|
|
return user_context
|
|
except Exception:
|
|
pass
|
|
|
|
if not user_context:
|
|
raise InvalidTokenException("Authentication required")
|
|
|
|
# If no platform context, allow access (module checking requires platform)
|
|
if not platform_id:
|
|
logger.debug(f"No platform context for module check: {module_code}")
|
|
return user_context
|
|
|
|
# Check if module is enabled
|
|
if not module_service.is_module_enabled(db, platform_id, module_code):
|
|
logger.warning(
|
|
f"Module access denied: {module_code} disabled for "
|
|
f"platform_id={platform_id}, user={user_context.username}"
|
|
)
|
|
raise InsufficientPermissionsException(
|
|
f"The '{module_code}' module is not enabled for this platform"
|
|
)
|
|
|
|
return user_context
|
|
|
|
return _check_module_access
|
|
|
|
|
|
# ============================================================================
|
|
# MENU-BASED ACCESS CONTROL
|
|
# ============================================================================
|
|
|
|
|
|
def require_menu_access(menu_item_id: str, frontend_type: "FrontendType"):
|
|
"""
|
|
Dependency factory for menu-based page route access control.
|
|
|
|
Checks if the specified menu item is visible/accessible for the current user:
|
|
- First checks if the module providing this menu item is enabled
|
|
- Then checks visibility configuration (platform or user level)
|
|
|
|
Access denied reasons:
|
|
- Module disabled: The feature module is not enabled for this platform
|
|
- Menu hidden: The menu item is hidden by platform/user configuration
|
|
|
|
Usage:
|
|
@router.get("/admin/inventory")
|
|
async def inventory_page(
|
|
current_user: User = Depends(
|
|
require_menu_access("inventory", FrontendType.ADMIN)
|
|
),
|
|
):
|
|
...
|
|
|
|
Args:
|
|
menu_item_id: Menu item identifier from registry
|
|
frontend_type: Which frontend (ADMIN or STORE)
|
|
|
|
Returns:
|
|
Dependency function that validates menu access and returns User
|
|
"""
|
|
from app.modules.core.services.menu_service import menu_service
|
|
from app.modules.enums import FrontendType as FT
|
|
from app.modules.registry import get_menu_item_module
|
|
from app.modules.service import module_service
|
|
|
|
def _check_menu_access(
|
|
request: Request,
|
|
credentials: HTTPAuthorizationCredentials | None = Depends(security),
|
|
admin_token: str | None = Cookie(None),
|
|
store_token: str | None = Cookie(None),
|
|
db: Session = Depends(get_db),
|
|
) -> UserContext:
|
|
# Get current user based on frontend type
|
|
if frontend_type == FT.ADMIN:
|
|
user_context = get_current_admin_from_cookie_or_header(
|
|
request, credentials, admin_token, db
|
|
)
|
|
|
|
if user_context.is_super_admin:
|
|
# Super admin: use platform from token if selected, else global (no filtering)
|
|
platform_id = user_context.token_platform_id
|
|
user_id = None
|
|
else:
|
|
# Platform admin: need platform context
|
|
# Try to get from request state
|
|
platform = getattr(request.state, "admin_platform", None)
|
|
if platform:
|
|
platform_id = platform.id
|
|
else:
|
|
# No platform context - allow access (will be restricted elsewhere)
|
|
# This handles routes that don't have platform context yet
|
|
return user_context
|
|
user_id = None
|
|
|
|
elif frontend_type == FT.STORE:
|
|
user_context = get_current_store_from_cookie_or_header(
|
|
request, credentials, store_token, db
|
|
)
|
|
|
|
# Store: get platform from store's platform association
|
|
store = getattr(request.state, "store", None)
|
|
if store and hasattr(store, "platform_id") and store.platform_id:
|
|
platform_id = store.platform_id
|
|
else:
|
|
# No platform context for store - allow access
|
|
# This handles edge cases where store doesn't have platform
|
|
return user_context
|
|
user_id = None
|
|
|
|
else:
|
|
raise ValueError(f"Unsupported frontend_type: {frontend_type}")
|
|
|
|
# First check: Is the module providing this menu item enabled?
|
|
if platform_id:
|
|
module_code = get_menu_item_module(menu_item_id, frontend_type)
|
|
if module_code and not module_service.is_module_enabled(db, platform_id, module_code):
|
|
logger.warning(
|
|
f"Module access denied: {menu_item_id} (module={module_code}) for "
|
|
f"user={user_context.username}, platform_id={platform_id}"
|
|
)
|
|
raise InsufficientPermissionsException(
|
|
f"The '{module_code}' module is not enabled for this platform. "
|
|
f"Contact your administrator to enable this feature."
|
|
)
|
|
|
|
# Second check: Is the menu item visible in configuration?
|
|
can_access = menu_service.can_access_menu_item(
|
|
db, frontend_type, menu_item_id, platform_id, user_id
|
|
)
|
|
|
|
if not can_access:
|
|
logger.warning(
|
|
f"Menu visibility denied: {menu_item_id} for "
|
|
f"user={user_context.username}, frontend={frontend_type.value}, "
|
|
f"platform_id={platform_id}, user_id={user_id}"
|
|
)
|
|
raise InsufficientPermissionsException(
|
|
f"Access to '{menu_item_id}' has been restricted. "
|
|
f"Contact your administrator if you need access."
|
|
)
|
|
|
|
return user_context
|
|
|
|
return _check_menu_access
|
|
|
|
|
|
# ============================================================================
|
|
# STORE AUTHENTICATION
|
|
# ============================================================================
|
|
|
|
|
|
def get_current_store_from_cookie_or_header(
|
|
request: Request,
|
|
credentials: HTTPAuthorizationCredentials | None = Depends(security),
|
|
store_token: str | None = Cookie(None),
|
|
db: Session = Depends(get_db),
|
|
) -> UserContext:
|
|
"""
|
|
Get current store user from store_token cookie or Authorization header.
|
|
|
|
Used for store HTML pages (/store/*) that need cookie-based auth.
|
|
|
|
Priority:
|
|
1. Authorization header (API calls)
|
|
2. store_token cookie (page navigation)
|
|
|
|
Args:
|
|
request: FastAPI request
|
|
credentials: Optional Bearer token from header
|
|
store_token: Optional token from store_token cookie
|
|
db: Database session
|
|
|
|
Returns:
|
|
UserContext: Authenticated store user context
|
|
|
|
Raises:
|
|
InvalidTokenException: If no token or invalid token
|
|
InsufficientPermissionsException: If user is not store or is admin
|
|
"""
|
|
token, source = _get_token_from_request(
|
|
credentials, store_token, "store_token", str(request.url.path)
|
|
)
|
|
|
|
if not token:
|
|
logger.warning(f"Store auth failed: No token for {request.url.path}")
|
|
raise InvalidTokenException("Store authentication required")
|
|
|
|
# Validate token and get user
|
|
user = _validate_user_token(token, db)
|
|
|
|
# CRITICAL: Block admins from store routes
|
|
if user.is_admin:
|
|
logger.warning(
|
|
f"Admin user {user.username} attempted store route: {request.url.path}"
|
|
)
|
|
raise InsufficientPermissionsException(
|
|
"Store access only - admins cannot use store portal"
|
|
)
|
|
|
|
# Verify user is store user (merchant_owner or store_member)
|
|
if not user.is_store_user:
|
|
logger.warning(
|
|
f"Non-store user {user.username} attempted store route: {request.url.path}"
|
|
)
|
|
raise InsufficientPermissionsException("Store privileges required")
|
|
|
|
return UserContext.from_user(user)
|
|
|
|
|
|
def get_current_store_api(
|
|
credentials: HTTPAuthorizationCredentials = Depends(security),
|
|
db: Session = Depends(get_db),
|
|
) -> UserContext:
|
|
"""
|
|
Get current store user from Authorization header ONLY.
|
|
|
|
Used for store API endpoints that should not accept cookies.
|
|
Validates that:
|
|
1. Token contains store context (token_store_id)
|
|
2. User still has access to the store specified in the token
|
|
|
|
Args:
|
|
credentials: Bearer token from Authorization header
|
|
db: Database session
|
|
|
|
Returns:
|
|
UserContext: Authenticated store user context (with token_store_id, token_store_code, token_store_role)
|
|
|
|
Raises:
|
|
InvalidTokenException: If no token, invalid token, or missing store context
|
|
InsufficientPermissionsException: If user is not store or lost access to store
|
|
"""
|
|
if not credentials:
|
|
raise InvalidTokenException("Authorization header required for API calls")
|
|
|
|
user = _validate_user_token(credentials.credentials, db)
|
|
|
|
# Block admins from store API
|
|
if user.is_admin:
|
|
logger.warning(f"Admin user {user.username} attempted store API")
|
|
raise InsufficientPermissionsException("Store access only")
|
|
|
|
if not user.is_store_user:
|
|
logger.warning(f"Non-store user {user.username} attempted store API")
|
|
raise InsufficientPermissionsException("Store privileges required")
|
|
|
|
# Require store context in token
|
|
if not hasattr(user, "token_store_id"):
|
|
raise InvalidTokenException(
|
|
"Token missing store information. Please login again."
|
|
)
|
|
|
|
store_id = user.token_store_id
|
|
|
|
# Verify user still has access to this store
|
|
if not user.is_member_of(store_id):
|
|
logger.warning(f"User {user.username} lost access to store_id={store_id}")
|
|
raise InsufficientPermissionsException(
|
|
"Access to store has been revoked. Please login again."
|
|
)
|
|
|
|
logger.debug(
|
|
f"Store API access: user={user.username}, store_id={store_id}, "
|
|
f"store_code={getattr(user, 'token_store_code', 'N/A')}"
|
|
)
|
|
|
|
return UserContext.from_user(user)
|
|
|
|
|
|
# ============================================================================
|
|
# MERCHANT AUTHENTICATION (Billing Portal)
|
|
# ============================================================================
|
|
|
|
|
|
def get_current_merchant_from_cookie_or_header(
|
|
request: Request,
|
|
credentials: HTTPAuthorizationCredentials | None = Depends(security),
|
|
merchant_token: str | None = Cookie(None),
|
|
db: Session = Depends(get_db),
|
|
) -> UserContext:
|
|
"""
|
|
Get current merchant user from merchant_token cookie or Authorization header.
|
|
|
|
Used for merchant portal HTML pages (/merchants/*) that need cookie-based auth.
|
|
Validates that the user owns at least one merchant.
|
|
|
|
Priority:
|
|
1. Authorization header (API calls)
|
|
2. merchant_token cookie (page navigation)
|
|
|
|
Args:
|
|
request: FastAPI request
|
|
credentials: Optional Bearer token from header
|
|
merchant_token: Optional token from merchant_token cookie
|
|
db: Database session
|
|
|
|
Returns:
|
|
UserContext: Authenticated merchant owner user context
|
|
|
|
Raises:
|
|
InvalidTokenException: If no token or invalid token
|
|
InsufficientPermissionsException: If user doesn't own any merchants
|
|
"""
|
|
token, source = _get_token_from_request(
|
|
credentials, merchant_token, "merchant_token", str(request.url.path)
|
|
)
|
|
|
|
if not token:
|
|
logger.warning(f"Merchant auth failed: No token for {request.url.path}")
|
|
raise InvalidTokenException("Merchant authentication required")
|
|
|
|
# Validate token and get user
|
|
user = _validate_user_token(token, db)
|
|
|
|
# Verify user owns at least one merchant
|
|
from app.modules.tenancy.models import Merchant
|
|
merchant_count = (
|
|
db.query(Merchant)
|
|
.filter(
|
|
Merchant.owner_user_id == user.id,
|
|
Merchant.is_active == True, # noqa: E712
|
|
)
|
|
.count()
|
|
)
|
|
|
|
if merchant_count == 0:
|
|
logger.warning(
|
|
f"User {user.username} attempted merchant route without owning any merchants: "
|
|
f"{request.url.path}"
|
|
)
|
|
raise InsufficientPermissionsException(
|
|
"Merchant owner privileges required"
|
|
)
|
|
|
|
return UserContext.from_user(user)
|
|
|
|
|
|
def get_current_merchant_api(
|
|
request: Request,
|
|
credentials: HTTPAuthorizationCredentials = Depends(security),
|
|
db: Session = Depends(get_db),
|
|
) -> UserContext:
|
|
"""
|
|
Get current merchant user from Authorization header ONLY.
|
|
|
|
Used for merchant API endpoints that should not accept cookies.
|
|
This prevents CSRF attacks on API endpoints.
|
|
|
|
Args:
|
|
request: FastAPI request
|
|
credentials: Bearer token from Authorization header
|
|
db: Database session
|
|
|
|
Returns:
|
|
UserContext: Authenticated merchant owner user context
|
|
|
|
Raises:
|
|
InvalidTokenException: If no token or invalid token
|
|
InsufficientPermissionsException: If user doesn't own any merchants
|
|
"""
|
|
if not credentials:
|
|
raise InvalidTokenException("Authorization header required for API calls")
|
|
|
|
user = _validate_user_token(credentials.credentials, db)
|
|
|
|
# Verify user owns at least one merchant
|
|
from app.modules.tenancy.models import Merchant
|
|
merchant_count = (
|
|
db.query(Merchant)
|
|
.filter(
|
|
Merchant.owner_user_id == user.id,
|
|
Merchant.is_active == True, # noqa: E712
|
|
)
|
|
.count()
|
|
)
|
|
|
|
if merchant_count == 0:
|
|
logger.warning(f"User {user.username} attempted merchant API without owning any merchants")
|
|
raise InsufficientPermissionsException(
|
|
"Merchant owner privileges required"
|
|
)
|
|
|
|
return UserContext.from_user(user)
|
|
|
|
|
|
def get_current_merchant_optional(
|
|
request: Request,
|
|
credentials: HTTPAuthorizationCredentials | None = Depends(security),
|
|
merchant_token: str | None = Cookie(None),
|
|
db: Session = Depends(get_db),
|
|
) -> UserContext | None:
|
|
"""
|
|
Get current merchant user, returning None if not authenticated.
|
|
|
|
Used for login pages to check if user is already authenticated.
|
|
|
|
Returns:
|
|
UserContext: Authenticated merchant owner if valid token exists
|
|
None: If no token, invalid token, or user doesn't own merchants
|
|
"""
|
|
token, source = _get_token_from_request(
|
|
credentials, merchant_token, "merchant_token", str(request.url.path)
|
|
)
|
|
|
|
if not token:
|
|
return None
|
|
|
|
try:
|
|
user = _validate_user_token(token, db)
|
|
|
|
# Verify user owns at least one merchant
|
|
from app.modules.tenancy.models import Merchant
|
|
merchant_count = (
|
|
db.query(Merchant)
|
|
.filter(
|
|
Merchant.owner_user_id == user.id,
|
|
Merchant.is_active == True, # noqa: E712
|
|
)
|
|
.count()
|
|
)
|
|
|
|
if merchant_count > 0:
|
|
return UserContext.from_user(user)
|
|
except Exception:
|
|
pass
|
|
|
|
return None
|
|
|
|
|
|
def get_merchant_for_current_user(
|
|
request: Request,
|
|
current_user: UserContext = Depends(get_current_merchant_api),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""
|
|
Get the active merchant owned by the current API user.
|
|
|
|
Used by merchant API endpoints (header-only auth) that need the Merchant object.
|
|
Stores the merchant on request.state.merchant for endpoint use.
|
|
|
|
Returns:
|
|
Merchant ORM object
|
|
|
|
Raises:
|
|
MerchantNotFoundException: If user owns no active merchants
|
|
"""
|
|
from app.modules.tenancy.exceptions import MerchantNotFoundException
|
|
from app.modules.tenancy.models import Merchant
|
|
|
|
merchant = (
|
|
db.query(Merchant)
|
|
.filter(
|
|
Merchant.owner_user_id == current_user.id,
|
|
Merchant.is_active == True, # noqa: E712
|
|
)
|
|
.order_by(Merchant.id)
|
|
.first()
|
|
)
|
|
|
|
if not merchant:
|
|
raise MerchantNotFoundException(
|
|
str(current_user.id), identifier_type="owner_user_id"
|
|
)
|
|
|
|
request.state.merchant = merchant
|
|
return merchant
|
|
|
|
|
|
def get_merchant_for_current_user_page(
|
|
request: Request,
|
|
current_user: UserContext = Depends(get_current_merchant_from_cookie_or_header),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""
|
|
Get the active merchant owned by the current page user.
|
|
|
|
Used by merchant page routes (cookie+header auth) that need the Merchant object.
|
|
Stores the merchant on request.state.merchant for endpoint use.
|
|
|
|
Returns:
|
|
Merchant ORM object
|
|
|
|
Raises:
|
|
MerchantNotFoundException: If user owns no active merchants
|
|
"""
|
|
from app.modules.tenancy.exceptions import MerchantNotFoundException
|
|
from app.modules.tenancy.models import Merchant
|
|
|
|
merchant = (
|
|
db.query(Merchant)
|
|
.filter(
|
|
Merchant.owner_user_id == current_user.id,
|
|
Merchant.is_active == True, # noqa: E712
|
|
)
|
|
.order_by(Merchant.id)
|
|
.first()
|
|
)
|
|
|
|
if not merchant:
|
|
raise MerchantNotFoundException(
|
|
str(current_user.id), identifier_type="owner_user_id"
|
|
)
|
|
|
|
request.state.merchant = merchant
|
|
return merchant
|
|
|
|
|
|
# ============================================================================
|
|
# CUSTOMER AUTHENTICATION (STOREFRONT)
|
|
# ============================================================================
|
|
|
|
|
|
def _validate_customer_token(token: str, request: Request, db: Session):
|
|
"""
|
|
Validate customer JWT token and return CustomerContext schema.
|
|
|
|
Validates:
|
|
1. Token signature and expiration
|
|
2. Token type is "customer"
|
|
3. Customer exists and is active
|
|
4. Token store_id matches request store (URL-based)
|
|
|
|
Args:
|
|
token: JWT token string
|
|
request: FastAPI request (for store context)
|
|
db: Database session
|
|
|
|
Returns:
|
|
CustomerContext: Authenticated customer context schema
|
|
|
|
Raises:
|
|
InvalidTokenException: If token is invalid or expired
|
|
UnauthorizedStoreAccessException: If store mismatch
|
|
"""
|
|
from datetime import datetime
|
|
|
|
from jose import JWTError, jwt
|
|
|
|
from app.modules.customers.models.customer import Customer
|
|
from app.modules.customers.schemas import CustomerContext
|
|
|
|
# Decode and validate customer JWT token
|
|
try:
|
|
payload = jwt.decode(
|
|
token, auth_manager.secret_key, algorithms=[auth_manager.algorithm]
|
|
)
|
|
|
|
# Verify this is a customer token
|
|
token_type = payload.get("type")
|
|
if token_type != "customer":
|
|
logger.warning(f"Invalid token type for customer route: {token_type}")
|
|
raise InvalidTokenException("Customer authentication required")
|
|
|
|
# Get customer ID from token
|
|
customer_id: str = payload.get("sub")
|
|
if customer_id is None:
|
|
logger.warning("Token missing 'sub' (customer_id)")
|
|
raise InvalidTokenException("Invalid token")
|
|
|
|
# Verify token hasn't expired
|
|
exp = payload.get("exp")
|
|
if exp and datetime.fromtimestamp(exp, tz=UTC) < datetime.now(UTC):
|
|
logger.warning(f"Expired customer token for customer_id={customer_id}")
|
|
raise InvalidTokenException("Token has expired")
|
|
|
|
# Get store_id from token for validation
|
|
token_store_id = payload.get("store_id")
|
|
|
|
except JWTError as e:
|
|
logger.warning(f"JWT decode error: {str(e)}")
|
|
raise InvalidTokenException("Could not validate credentials")
|
|
|
|
# Load customer from database
|
|
customer = db.query(Customer).filter(Customer.id == int(customer_id)).first()
|
|
|
|
if not customer:
|
|
logger.warning(f"Customer not found: {customer_id}")
|
|
raise InvalidTokenException("Customer not found")
|
|
|
|
if not customer.is_active:
|
|
logger.warning(f"Inactive customer attempted access: {customer.email}")
|
|
raise InvalidTokenException("Customer account is inactive")
|
|
|
|
# Validate store context matches token
|
|
# This prevents using a customer token from store A on store B's storefront
|
|
request_store = getattr(request.state, "store", None)
|
|
if request_store and token_store_id:
|
|
if request_store.id != token_store_id:
|
|
logger.warning(
|
|
f"Customer {customer.email} token store mismatch: "
|
|
f"token={token_store_id}, request={request_store.id}"
|
|
)
|
|
raise UnauthorizedStoreAccessException(
|
|
store_code=request_store.store_code,
|
|
user_id=customer.id,
|
|
)
|
|
|
|
logger.debug(f"Customer authenticated: {customer.email} (ID: {customer.id})")
|
|
|
|
# Return CustomerContext schema instead of database model
|
|
return CustomerContext.from_db_model(customer)
|
|
|
|
|
|
def get_current_customer_from_cookie_or_header(
|
|
request: Request,
|
|
credentials: HTTPAuthorizationCredentials | None = Depends(security),
|
|
customer_token: str | None = Cookie(None),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""
|
|
Get current customer from customer_token cookie or Authorization header.
|
|
|
|
Used for storefront account HTML pages (/storefront/account/*) that need cookie-based auth.
|
|
Note: Public storefront pages (/storefront/products, etc.) don't use this dependency.
|
|
|
|
Validates that token store_id matches request store (URL-based detection).
|
|
|
|
Priority:
|
|
1. Authorization header (API calls)
|
|
2. customer_token cookie (page navigation)
|
|
|
|
Args:
|
|
request: FastAPI request
|
|
credentials: Optional Bearer token from header
|
|
customer_token: Optional token from customer_token cookie
|
|
db: Database session
|
|
|
|
Returns:
|
|
CustomerContext: Authenticated customer context schema
|
|
|
|
Raises:
|
|
InvalidTokenException: If no token or invalid token
|
|
UnauthorizedStoreAccessException: If store mismatch
|
|
"""
|
|
token, source = _get_token_from_request(
|
|
credentials, customer_token, "customer_token", str(request.url.path)
|
|
)
|
|
|
|
if not token:
|
|
logger.warning(f"Customer auth failed: No token for {request.url.path}")
|
|
raise InvalidTokenException("Customer authentication required")
|
|
|
|
return _validate_customer_token(token, request, db)
|
|
|
|
|
|
def get_current_customer_api(
|
|
request: Request,
|
|
credentials: HTTPAuthorizationCredentials = Depends(security),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""
|
|
Get current customer from Authorization header ONLY.
|
|
|
|
Used for storefront API endpoints that should not accept cookies.
|
|
Validates that token store_id matches request store (URL-based detection).
|
|
|
|
Args:
|
|
request: FastAPI request (for store context)
|
|
credentials: Bearer token from Authorization header
|
|
db: Database session
|
|
|
|
Returns:
|
|
CustomerContext: Authenticated customer context schema
|
|
|
|
Raises:
|
|
InvalidTokenException: If no token or invalid token
|
|
UnauthorizedStoreAccessException: If store mismatch
|
|
"""
|
|
if not credentials:
|
|
raise InvalidTokenException("Authorization header required for API calls")
|
|
|
|
return _validate_customer_token(credentials.credentials, request, db)
|
|
|
|
|
|
# ============================================================================
|
|
# GENERIC AUTHENTICATION (for mixed-use endpoints)
|
|
# ============================================================================
|
|
|
|
|
|
def get_current_user(
|
|
credentials: HTTPAuthorizationCredentials = Depends(security),
|
|
db: Session = Depends(get_db),
|
|
) -> UserContext:
|
|
"""
|
|
Get current authenticated user from Authorization header only.
|
|
|
|
Generic authentication without role checking.
|
|
Used for endpoints accessible to any authenticated user.
|
|
|
|
Args:
|
|
credentials: Bearer token from Authorization header
|
|
db: Database session
|
|
|
|
Returns:
|
|
UserContext: Authenticated user context (any role)
|
|
|
|
Raises:
|
|
InvalidTokenException: If no token or invalid token
|
|
"""
|
|
if not credentials:
|
|
raise InvalidTokenException("Authorization header required")
|
|
|
|
user = _validate_user_token(credentials.credentials, db)
|
|
return UserContext.from_user(user)
|
|
|
|
|
|
# ============================================================================
|
|
# STORE OWNERSHIP VERIFICATION
|
|
# ============================================================================
|
|
|
|
|
|
def get_user_store(
|
|
store_code: str,
|
|
current_user: UserContext = Depends(get_current_store_from_cookie_or_header),
|
|
db: Session = Depends(get_db),
|
|
) -> Store:
|
|
"""
|
|
Get store and verify user ownership/membership.
|
|
|
|
Ensures the current user has access to the specified store.
|
|
- Store owners can access their own store
|
|
- Team members can access their store
|
|
- Admins are BLOCKED (use admin routes instead)
|
|
|
|
Args:
|
|
store_code: Store code to look up
|
|
current_user: Current authenticated store user
|
|
db: Database session
|
|
|
|
Returns:
|
|
Store: Store object if user has access
|
|
|
|
Raises:
|
|
StoreNotFoundException: If store doesn't exist
|
|
UnauthorizedStoreAccessException: If user doesn't have access
|
|
"""
|
|
from sqlalchemy.orm import joinedload
|
|
|
|
store = (
|
|
db.query(Store)
|
|
.options(joinedload(Store.merchant))
|
|
.filter(Store.store_code == store_code.upper())
|
|
.first()
|
|
)
|
|
|
|
if not store:
|
|
raise StoreNotFoundException(store_code)
|
|
|
|
# Check if user owns this store (via merchant ownership)
|
|
if store.merchant and store.merchant.owner_user_id == current_user.id:
|
|
return store
|
|
|
|
# Check if user is team member
|
|
# TODO: Add team member check when StoreUser relationship is set up
|
|
|
|
# User doesn't have access to this store
|
|
raise UnauthorizedStoreAccessException(store_code, current_user.id)
|
|
|
|
|
|
# ============================================================================
|
|
# PERMISSIONS CHECKING
|
|
# ============================================================================
|
|
|
|
|
|
def require_store_permission(permission: str):
|
|
"""
|
|
Dependency factory to require a specific store permission.
|
|
|
|
Uses token_store_id from JWT token (authenticated store API pattern).
|
|
The store object is loaded and stored in request.state.store for endpoint use.
|
|
|
|
Usage:
|
|
@router.get("/products")
|
|
def list_products(
|
|
request: Request,
|
|
user: UserContext = Depends(require_store_permission("products.view"))
|
|
):
|
|
store = request.state.store # Store is set by this dependency
|
|
...
|
|
"""
|
|
|
|
def permission_checker(
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
current_user: UserContext = Depends(get_current_store_from_cookie_or_header),
|
|
) -> UserContext:
|
|
# Get store ID from JWT token
|
|
if not current_user.token_store_id:
|
|
raise InvalidTokenException(
|
|
"Token missing store information. Please login again."
|
|
)
|
|
|
|
store_id = current_user.token_store_id
|
|
|
|
# Load store from database (raises StoreNotFoundException if not found)
|
|
store = store_service.get_store_by_id(db, store_id)
|
|
|
|
# Store store in request state for endpoint use
|
|
request.state.store = store
|
|
|
|
# Check if user has permission (need User model for this)
|
|
user_model = _get_user_model(current_user, db)
|
|
if not user_model.has_store_permission(store.id, permission):
|
|
raise InsufficientStorePermissionsException(
|
|
required_permission=permission,
|
|
store_code=store.store_code,
|
|
)
|
|
|
|
return current_user
|
|
|
|
return permission_checker
|
|
|
|
|
|
def require_store_owner(
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
current_user: UserContext = Depends(get_current_store_from_cookie_or_header),
|
|
) -> UserContext:
|
|
"""
|
|
Dependency to require store owner role.
|
|
|
|
Uses token_store_id from JWT token (authenticated store API pattern).
|
|
The store object is loaded and stored in request.state.store for endpoint use.
|
|
|
|
Usage:
|
|
@router.delete("/team/{user_id}")
|
|
def remove_team_member(
|
|
request: Request,
|
|
user: UserContext = Depends(require_store_owner)
|
|
):
|
|
store = request.state.store # Store is set by this dependency
|
|
...
|
|
"""
|
|
# Get store ID from JWT token
|
|
if not current_user.token_store_id:
|
|
raise InvalidTokenException(
|
|
"Token missing store information. Please login again."
|
|
)
|
|
|
|
store_id = current_user.token_store_id
|
|
|
|
# Load store from database (raises StoreNotFoundException if not found)
|
|
store = store_service.get_store_by_id(db, store_id)
|
|
|
|
# Store store in request state for endpoint use
|
|
request.state.store = store
|
|
|
|
# Need User model for is_owner_of check
|
|
user_model = _get_user_model(current_user, db)
|
|
if not user_model.is_owner_of(store.id):
|
|
raise StoreOwnerOnlyException(
|
|
operation="team management",
|
|
store_code=store.store_code,
|
|
)
|
|
|
|
return current_user
|
|
|
|
|
|
def require_any_store_permission(*permissions: str):
|
|
"""
|
|
Dependency factory to require ANY of the specified permissions.
|
|
|
|
Uses token_store_id from JWT token (authenticated store API pattern).
|
|
The store object is loaded and stored in request.state.store for endpoint use.
|
|
|
|
Usage:
|
|
@router.get("/dashboard")
|
|
def dashboard(
|
|
request: Request,
|
|
user: UserContext = Depends(require_any_store_permission(
|
|
"dashboard.view",
|
|
"reports.view"
|
|
))
|
|
):
|
|
store = request.state.store # Store is set by this dependency
|
|
...
|
|
"""
|
|
|
|
def permission_checker(
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
current_user: UserContext = Depends(get_current_store_from_cookie_or_header),
|
|
) -> UserContext:
|
|
# Get store ID from JWT token
|
|
if not current_user.token_store_id:
|
|
raise InvalidTokenException(
|
|
"Token missing store information. Please login again."
|
|
)
|
|
|
|
store_id = current_user.token_store_id
|
|
|
|
# Load store from database (raises StoreNotFoundException if not found)
|
|
store = store_service.get_store_by_id(db, store_id)
|
|
|
|
# Store store in request state for endpoint use
|
|
request.state.store = store
|
|
|
|
# Check if user has ANY of the required permissions (need User model)
|
|
user_model = _get_user_model(current_user, db)
|
|
has_permission = any(
|
|
user_model.has_store_permission(store.id, perm) for perm in permissions
|
|
)
|
|
|
|
if not has_permission:
|
|
raise InsufficientStorePermissionsException(
|
|
required_permission=f"Any of: {', '.join(permissions)}",
|
|
store_code=store.store_code,
|
|
)
|
|
|
|
return current_user
|
|
|
|
return permission_checker
|
|
|
|
|
|
def require_all_store_permissions(*permissions: str):
|
|
"""
|
|
Dependency factory to require ALL of the specified permissions.
|
|
|
|
Uses token_store_id from JWT token (authenticated store API pattern).
|
|
The store object is loaded and stored in request.state.store for endpoint use.
|
|
|
|
Usage:
|
|
@router.post("/products/bulk-delete")
|
|
def bulk_delete_products(
|
|
request: Request,
|
|
user: UserContext = Depends(require_all_store_permissions(
|
|
"products.view",
|
|
"products.delete"
|
|
))
|
|
):
|
|
store = request.state.store # Store is set by this dependency
|
|
...
|
|
"""
|
|
|
|
def permission_checker(
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
current_user: UserContext = Depends(get_current_store_from_cookie_or_header),
|
|
) -> UserContext:
|
|
# Get store ID from JWT token
|
|
if not current_user.token_store_id:
|
|
raise InvalidTokenException(
|
|
"Token missing store information. Please login again."
|
|
)
|
|
|
|
store_id = current_user.token_store_id
|
|
|
|
# Load store from database (raises StoreNotFoundException if not found)
|
|
store = store_service.get_store_by_id(db, store_id)
|
|
|
|
# Store store in request state for endpoint use
|
|
request.state.store = store
|
|
|
|
# Check if user has ALL required permissions (need User model)
|
|
user_model = _get_user_model(current_user, db)
|
|
missing_permissions = [
|
|
perm
|
|
for perm in permissions
|
|
if not user_model.has_store_permission(store.id, perm)
|
|
]
|
|
|
|
if missing_permissions:
|
|
raise InsufficientStorePermissionsException(
|
|
required_permission=f"All of: {', '.join(permissions)}",
|
|
store_code=store.store_code,
|
|
)
|
|
|
|
return current_user
|
|
|
|
return permission_checker
|
|
|
|
|
|
def get_user_permissions(
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
current_user: UserContext = Depends(get_current_store_from_cookie_or_header),
|
|
) -> list:
|
|
"""
|
|
Get all permissions for current user in current store.
|
|
|
|
Uses token_store_id from JWT token (authenticated store API pattern).
|
|
Also sets request.state.store for endpoint use.
|
|
|
|
Returns empty list if no store context in token.
|
|
"""
|
|
# Get store ID from JWT token
|
|
if not current_user.token_store_id:
|
|
return []
|
|
|
|
store_id = current_user.token_store_id
|
|
|
|
# Load store from database
|
|
store = store_service.get_store_by_id(db, store_id)
|
|
|
|
# Store store in request state for endpoint use
|
|
request.state.store = store
|
|
|
|
# Need User model for ownership and membership checks
|
|
user_model = _get_user_model(current_user, db)
|
|
|
|
# If owner, return all permissions
|
|
if user_model.is_owner_of(store.id):
|
|
from app.modules.tenancy.services.permission_discovery_service import (
|
|
permission_discovery_service,
|
|
)
|
|
|
|
return list(permission_discovery_service.get_all_permission_ids())
|
|
|
|
# Get permissions from store membership
|
|
for vm in user_model.store_memberships:
|
|
if vm.store_id == store.id and vm.is_active:
|
|
return vm.get_all_permissions()
|
|
|
|
return []
|
|
|
|
|
|
# ============================================================================
|
|
# PAGE-LEVEL PERMISSION GUARDS (For Store Page Routes)
|
|
# ============================================================================
|
|
|
|
|
|
def require_store_page_permission(permission: str):
|
|
"""
|
|
Dependency factory to require a specific store permission for page routes.
|
|
|
|
Same as require_store_permission but raises InsufficientStorePermissionsException
|
|
which the exception handler intercepts for HTML requests (redirecting to login).
|
|
|
|
Usage:
|
|
@router.get("/products", response_class=HTMLResponse)
|
|
def store_products_page(
|
|
request: Request,
|
|
store_code: str = Depends(get_resolved_store_code),
|
|
current_user: User = Depends(require_store_page_permission("products.view")),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
...
|
|
"""
|
|
|
|
def permission_checker(
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
current_user: UserContext = Depends(get_current_store_from_cookie_or_header),
|
|
) -> UserContext:
|
|
if not current_user.token_store_id:
|
|
raise InvalidTokenException(
|
|
"Token missing store information. Please login again."
|
|
)
|
|
|
|
store_id = current_user.token_store_id
|
|
store = store_service.get_store_by_id(db, store_id)
|
|
request.state.store = store
|
|
|
|
user_model = _get_user_model(current_user, db)
|
|
if not user_model.has_store_permission(store.id, permission):
|
|
raise InsufficientStorePermissionsException(
|
|
required_permission=permission,
|
|
store_code=store.store_code,
|
|
)
|
|
|
|
return current_user
|
|
|
|
return permission_checker
|
|
|
|
|
|
# ============================================================================
|
|
# OPTIONAL AUTHENTICATION (For Login Page Redirects)
|
|
# ============================================================================
|
|
|
|
|
|
def get_current_admin_optional(
|
|
request: Request,
|
|
credentials: HTTPAuthorizationCredentials | None = Depends(security),
|
|
admin_token: str | None = Cookie(None),
|
|
db: Session = Depends(get_db),
|
|
) -> UserContext | None:
|
|
"""
|
|
Get current admin user from admin_token cookie or Authorization header.
|
|
|
|
Returns None instead of raising exceptions if not authenticated.
|
|
Used for login pages to check if user is already authenticated.
|
|
|
|
Priority:
|
|
1. Authorization header (API calls)
|
|
2. admin_token cookie (page navigation)
|
|
|
|
Args:
|
|
request: FastAPI request
|
|
credentials: Optional Bearer token from header
|
|
admin_token: Optional token from admin_token cookie
|
|
db: Database session
|
|
|
|
Returns:
|
|
UserContext: Authenticated admin user context if valid token exists
|
|
None: If no token, invalid token, or user is not admin
|
|
"""
|
|
token, source = _get_token_from_request(
|
|
credentials, admin_token, "admin_token", str(request.url.path)
|
|
)
|
|
|
|
if not token:
|
|
return None
|
|
|
|
try:
|
|
# Validate token and get user
|
|
user = _validate_user_token(token, db)
|
|
|
|
# Verify user is admin
|
|
if user.is_admin:
|
|
return UserContext.from_user(user, include_store_context=False)
|
|
except Exception:
|
|
# Invalid token or other error
|
|
pass
|
|
|
|
return None
|
|
|
|
|
|
def get_current_store_optional(
|
|
request: Request,
|
|
credentials: HTTPAuthorizationCredentials | None = Depends(security),
|
|
store_token: str | None = Cookie(None),
|
|
db: Session = Depends(get_db),
|
|
) -> UserContext | None:
|
|
"""
|
|
Get current store user from store_token cookie or Authorization header.
|
|
|
|
Returns None instead of raising exceptions if not authenticated.
|
|
Used for login pages to check if user is already authenticated.
|
|
|
|
Priority:
|
|
1. Authorization header (API calls)
|
|
2. store_token cookie (page navigation)
|
|
|
|
Args:
|
|
request: FastAPI request
|
|
credentials: Optional Bearer token from header
|
|
store_token: Optional token from store_token cookie
|
|
db: Database session
|
|
|
|
Returns:
|
|
UserContext: Authenticated store user context if valid token exists
|
|
None: If no token, invalid token, or user is not store
|
|
"""
|
|
token, source = _get_token_from_request(
|
|
credentials, store_token, "store_token", str(request.url.path)
|
|
)
|
|
|
|
if not token:
|
|
return None
|
|
|
|
try:
|
|
# Validate token and get user
|
|
user = _validate_user_token(token, db)
|
|
|
|
# Verify user is a store user
|
|
if user.is_store_user:
|
|
return UserContext.from_user(user)
|
|
except Exception:
|
|
# Invalid token or other error
|
|
pass
|
|
|
|
return None
|
|
|
|
|
|
def get_current_customer_optional(
|
|
request: Request,
|
|
credentials: HTTPAuthorizationCredentials | None = Depends(security),
|
|
customer_token: str | None = Cookie(None),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""
|
|
Get current customer from customer_token cookie or Authorization header.
|
|
|
|
Returns None instead of raising exceptions if not authenticated.
|
|
Used for login pages to check if user is already authenticated.
|
|
|
|
Priority:
|
|
1. Authorization header (API calls)
|
|
2. customer_token cookie (page navigation)
|
|
|
|
Args:
|
|
request: FastAPI request
|
|
credentials: Optional Bearer token from header
|
|
customer_token: Optional token from customer_token cookie
|
|
db: Database session
|
|
|
|
Returns:
|
|
CustomerContext: Authenticated customer context if valid token exists
|
|
None: If no token, invalid token, or store mismatch
|
|
"""
|
|
token, source = _get_token_from_request(
|
|
credentials, customer_token, "customer_token", str(request.url.path)
|
|
)
|
|
|
|
if not token:
|
|
return None
|
|
|
|
try:
|
|
# Validate customer token (includes store validation)
|
|
return _validate_customer_token(token, request, db)
|
|
except Exception:
|
|
# Invalid token, store mismatch, or other error
|
|
return None
|