refactor(api): introduce UserContext schema for API dependency injection

Replace direct User database model imports in API endpoints with UserContext
schema, following the architecture principle that API routes should not import
database models directly.

Changes:
- Create UserContext schema in models/schema/auth.py with from_user() factory
- Update app/api/deps.py to return UserContext from all auth dependencies
- Add _get_user_model() helper for functions needing User model access
- Update 58 API endpoint files to use UserContext instead of User
- Add noqa comments for 4 legitimate edge cases (enums, internal helpers)

Architecture validation: 0 errors (down from 61), 11 warnings remain

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-30 20:47:33 +01:00
parent 1ad30bd77e
commit cad862f469
60 changed files with 755 additions and 589 deletions

View File

@@ -51,8 +51,9 @@ from app.exceptions import (
from app.services.vendor_service import vendor_service
from middleware.auth import AuthManager
from middleware.rate_limiter import RateLimiter
from models.database.user import User
from models.database.user import User as UserModel
from models.database.vendor import Vendor
from models.schema.auth import UserContext
# Initialize dependencies
security = HTTPBearer(auto_error=False) # auto_error=False prevents automatic 403
@@ -98,7 +99,7 @@ def _get_token_from_request(
return None, None
def _validate_user_token(token: str, db: Session) -> User:
def _validate_user_token(token: str, db: Session) -> UserModel:
"""
Validate JWT token and return user.
@@ -107,7 +108,7 @@ def _validate_user_token(token: str, db: Session) -> User:
db: Database session
Returns:
User: Authenticated user object
UserModel: Authenticated user object
Raises:
InvalidTokenException: If token is invalid
@@ -116,6 +117,38 @@ def _validate_user_token(token: str, db: Session) -> User:
return auth_manager.get_current_user(db, mock_credentials)
def _get_user_model(user_context: UserContext, db: Session) -> UserModel:
"""
Get User database model from UserContext.
Used internally by permission-checking functions that need
access to User model methods like has_vendor_permission().
Args:
user_context: UserContext schema instance
db: Database session
Returns:
UserModel: User database model
Raises:
InvalidTokenException: If user not found
"""
user = db.query(UserModel).filter(UserModel.id == user_context.id).first()
if not user:
raise InvalidTokenException("User not found")
# Copy token attributes from context to model for compatibility
if user_context.token_vendor_id:
user.token_vendor_id = user_context.token_vendor_id
if user_context.token_vendor_code:
user.token_vendor_code = user_context.token_vendor_code
if user_context.token_vendor_role:
user.token_vendor_role = user_context.token_vendor_role
return user
# ============================================================================
# ADMIN AUTHENTICATION
# ============================================================================
@@ -126,7 +159,7 @@ def get_current_admin_from_cookie_or_header(
credentials: HTTPAuthorizationCredentials | None = Depends(security),
admin_token: str | None = Cookie(None),
db: Session = Depends(get_db),
) -> User:
) -> UserContext:
"""
Get current admin user from admin_token cookie or Authorization header.
@@ -143,7 +176,7 @@ def get_current_admin_from_cookie_or_header(
db: Database session
Returns:
User: Authenticated admin user
UserContext: Authenticated admin user context
Raises:
InvalidTokenException: If no token or invalid token
@@ -167,13 +200,13 @@ def get_current_admin_from_cookie_or_header(
)
raise AdminRequiredException("Admin privileges required")
return user
return UserContext.from_user(user, include_vendor_context=False)
def get_current_admin_api(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db),
) -> User:
) -> UserContext:
"""
Get current admin user from Authorization header ONLY.
@@ -185,7 +218,7 @@ def get_current_admin_api(
db: Database session
Returns:
User: Authenticated admin user
UserContext: Authenticated admin user context
Raises:
InvalidTokenException: If no token or invalid token
@@ -200,7 +233,7 @@ def get_current_admin_api(
logger.warning(f"Non-admin user {user.username} attempted admin API")
raise AdminRequiredException("Admin privileges required")
return user
return UserContext.from_user(user, include_vendor_context=False)
# ============================================================================
@@ -213,7 +246,7 @@ def get_current_super_admin(
credentials: HTTPAuthorizationCredentials | None = Depends(security),
admin_token: str | None = Cookie(None),
db: Session = Depends(get_db),
) -> User:
) -> UserContext:
"""
Require super admin role.
@@ -227,27 +260,27 @@ def get_current_super_admin(
db: Database session
Returns:
User: Authenticated super admin user
UserContext: Authenticated super admin user context
Raises:
InvalidTokenException: If no token or invalid token
AdminRequiredException: If user is not admin or not super admin
"""
user = get_current_admin_from_cookie_or_header(request, credentials, admin_token, db)
user_context = get_current_admin_from_cookie_or_header(request, credentials, admin_token, db)
if not user.is_super_admin:
if not user_context.is_super_admin:
logger.warning(
f"Platform admin {user.username} attempted super admin route: {request.url.path}"
f"Platform admin {user_context.username} attempted super admin route: {request.url.path}"
)
raise AdminRequiredException("Super admin privileges required")
return user
return user_context
def get_current_super_admin_api(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db),
) -> User:
) -> UserContext:
"""
Require super admin role (API header only).
@@ -258,19 +291,19 @@ def get_current_super_admin_api(
db: Database session
Returns:
User: Authenticated super admin user
UserContext: Authenticated super admin user context
Raises:
InvalidTokenException: If no token or invalid token
AdminRequiredException: If user is not admin or not super admin
"""
user = get_current_admin_api(credentials, db)
user_context = get_current_admin_api(credentials, db)
if not user.is_super_admin:
logger.warning(f"Platform admin {user.username} attempted super admin API")
if not user_context.is_super_admin:
logger.warning(f"Platform admin {user_context.username} attempted super admin API")
raise AdminRequiredException("Super admin privileges required")
return user
return user_context
def require_platform_access(platform_id: int):
@@ -284,7 +317,7 @@ def require_platform_access(platform_id: int):
@router.get("/platforms/{platform_id}/vendors")
def list_vendors(
platform_id: int,
admin: User = Depends(require_platform_access(platform_id))
admin: UserContext = Depends(require_platform_access(platform_id))
):
...
"""
@@ -294,20 +327,26 @@ def require_platform_access(platform_id: int):
credentials: HTTPAuthorizationCredentials | None = Depends(security),
admin_token: str | None = Cookie(None),
db: Session = Depends(get_db),
) -> User:
user = get_current_admin_from_cookie_or_header(
) -> UserContext:
user_context = get_current_admin_from_cookie_or_header(
request, credentials, admin_token, db
)
if not user.can_access_platform(platform_id):
# Super admins (accessible_platform_ids=None) can access all platforms
# Platform admins can only access their assigned platforms
can_access = (
user_context.accessible_platform_ids is None or
platform_id in (user_context.accessible_platform_ids or [])
)
if not can_access:
logger.warning(
f"Admin {user.username} denied access to platform_id={platform_id}"
f"Admin {user_context.username} denied access to platform_id={platform_id}"
)
raise InsufficientPermissionsException(
f"Access denied to platform {platform_id}"
)
return user
return user_context
return _check_platform_access
@@ -317,7 +356,7 @@ def get_admin_with_platform_context(
credentials: HTTPAuthorizationCredentials | None = Depends(security),
admin_token: str | None = Cookie(None),
db: Session = Depends(get_db),
) -> User:
) -> UserContext:
"""
Get admin user and verify platform context from token.
@@ -326,6 +365,9 @@ def get_admin_with_platform_context(
Super admins bypass platform context check (they can access all platforms).
Note: This function needs the raw User model for token attributes and
platform access checks, so it uses _validate_user_token internally.
Args:
request: FastAPI request
credentials: Optional Bearer token from header
@@ -333,7 +375,7 @@ def get_admin_with_platform_context(
db: Database session
Returns:
User: Authenticated admin with platform context
UserContext: Authenticated admin with platform context
Raises:
InvalidTokenException: If platform admin token missing platform info
@@ -341,11 +383,22 @@ def get_admin_with_platform_context(
"""
from models.database.platform import Platform
user = get_current_admin_from_cookie_or_header(request, credentials, admin_token, db)
# Get raw token for platform_id extraction
token, source = _get_token_from_request(
credentials, admin_token, "admin_token", str(request.url.path)
)
if not token:
raise InvalidTokenException("Admin authentication required")
user = _validate_user_token(token, db)
if user.role != "admin":
raise AdminRequiredException("Admin privileges required")
# Super admins bypass platform context
if user.is_super_admin:
return user
return UserContext.from_user(user, include_vendor_context=False)
# Platform admins need platform_id in token
if not hasattr(user, "token_platform_id"):
@@ -368,7 +421,7 @@ def get_admin_with_platform_context(
platform = db.query(Platform).filter(Platform.id == platform_id).first()
request.state.admin_platform = platform
return user
return UserContext.from_user(user, include_vendor_context=False)
# ============================================================================
@@ -405,34 +458,33 @@ def require_module_access(module_code: str):
admin_token: str | None = Cookie(None),
vendor_token: str | None = Cookie(None),
db: Session = Depends(get_db),
) -> User:
) -> UserContext:
# Try admin auth first, then vendor
user = None
user_context = None
platform_id = None
# Check if this is an admin request
if admin_token or (credentials and request.url.path.startswith("/admin")):
try:
user = get_current_admin_from_cookie_or_header(
user_context = get_current_admin_from_cookie_or_header(
request, credentials, admin_token, db
)
# Get platform context for admin
if user.is_super_admin:
if user_context.is_super_admin:
# Super admins bypass module checks
return user
return user_context
else:
platform = getattr(request.state, "admin_platform", None)
if platform:
platform_id = platform.id
elif hasattr(user, "token_platform_id"):
platform_id = user.token_platform_id
# Note: token_platform_id is not on UserContext, would need to be added
except Exception:
pass
# Check if this is a vendor request
if not user and (vendor_token or (credentials and "/vendor/" in request.url.path)):
if not user_context and (vendor_token or (credentials and "/vendor/" in request.url.path)):
try:
user = get_current_vendor_from_cookie_or_header(
user_context = get_current_vendor_from_cookie_or_header(
request, credentials, vendor_token, db
)
# Get platform from vendor context
@@ -442,25 +494,25 @@ def require_module_access(module_code: str):
except Exception:
pass
if not user:
if not user_context:
raise InvalidTokenException("Authentication required")
# If no platform context, allow access (module checking requires platform)
if not platform_id:
logger.debug(f"No platform context for module check: {module_code}")
return user
return user_context
# Check if module is enabled
if not module_service.is_module_enabled(db, platform_id, module_code):
logger.warning(
f"Module access denied: {module_code} disabled for "
f"platform_id={platform_id}, user={user.username}"
f"platform_id={platform_id}, user={user_context.username}"
)
raise InsufficientPermissionsException(
f"The '{module_code}' module is not enabled for this platform"
)
return user
return user_context
return _check_module_access
@@ -509,33 +561,31 @@ def require_menu_access(menu_item_id: str, frontend_type: "FrontendType"):
admin_token: str | None = Cookie(None),
vendor_token: str | None = Cookie(None),
db: Session = Depends(get_db),
) -> User:
) -> UserContext:
# Get current user based on frontend type
if frontend_type == FT.ADMIN:
user = get_current_admin_from_cookie_or_header(
user_context = get_current_admin_from_cookie_or_header(
request, credentials, admin_token, db
)
if user.is_super_admin:
if user_context.is_super_admin:
# Super admin: check user-level config
platform_id = None
user_id = user.id
user_id = user_context.id
else:
# Platform admin: need platform context
# Try to get from request state or token
# Try to get from request state
platform = getattr(request.state, "admin_platform", None)
if platform:
platform_id = platform.id
elif hasattr(user, "token_platform_id"):
platform_id = user.token_platform_id
else:
# No platform context - allow access (will be restricted elsewhere)
# This handles routes that don't have platform context yet
return user
return user_context
user_id = None
elif frontend_type == FT.VENDOR:
user = get_current_vendor_from_cookie_or_header(
user_context = get_current_vendor_from_cookie_or_header(
request, credentials, vendor_token, db
)
@@ -546,7 +596,7 @@ def require_menu_access(menu_item_id: str, frontend_type: "FrontendType"):
else:
# No platform context for vendor - allow access
# This handles edge cases where vendor doesn't have platform
return user
return user_context
user_id = None
else:
@@ -558,7 +608,7 @@ def require_menu_access(menu_item_id: str, frontend_type: "FrontendType"):
if module_code and not module_service.is_module_enabled(db, platform_id, module_code):
logger.warning(
f"Module access denied: {menu_item_id} (module={module_code}) for "
f"user={user.username}, platform_id={platform_id}"
f"user={user_context.username}, platform_id={platform_id}"
)
raise InsufficientPermissionsException(
f"The '{module_code}' module is not enabled for this platform. "
@@ -573,7 +623,7 @@ def require_menu_access(menu_item_id: str, frontend_type: "FrontendType"):
if not can_access:
logger.warning(
f"Menu visibility denied: {menu_item_id} for "
f"user={user.username}, frontend={frontend_type.value}, "
f"user={user_context.username}, frontend={frontend_type.value}, "
f"platform_id={platform_id}, user_id={user_id}"
)
raise InsufficientPermissionsException(
@@ -581,7 +631,7 @@ def require_menu_access(menu_item_id: str, frontend_type: "FrontendType"):
f"Contact your administrator if you need access."
)
return user
return user_context
return _check_menu_access
@@ -596,7 +646,7 @@ def get_current_vendor_from_cookie_or_header(
credentials: HTTPAuthorizationCredentials | None = Depends(security),
vendor_token: str | None = Cookie(None),
db: Session = Depends(get_db),
) -> User:
) -> UserContext:
"""
Get current vendor user from vendor_token cookie or Authorization header.
@@ -613,7 +663,7 @@ def get_current_vendor_from_cookie_or_header(
db: Database session
Returns:
User: Authenticated vendor user
UserContext: Authenticated vendor user context
Raises:
InvalidTokenException: If no token or invalid token
@@ -646,13 +696,13 @@ def get_current_vendor_from_cookie_or_header(
)
raise InsufficientPermissionsException("Vendor privileges required")
return user
return UserContext.from_user(user)
def get_current_vendor_api(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db),
) -> User:
) -> UserContext:
"""
Get current vendor user from Authorization header ONLY.
@@ -666,7 +716,7 @@ def get_current_vendor_api(
db: Database session
Returns:
User: Authenticated vendor user (with token_vendor_id, token_vendor_code, token_vendor_role)
UserContext: Authenticated vendor user context (with token_vendor_id, token_vendor_code, token_vendor_role)
Raises:
InvalidTokenException: If no token, invalid token, or missing vendor context
@@ -706,7 +756,7 @@ def get_current_vendor_api(
f"vendor_code={getattr(user, 'token_vendor_code', 'N/A')}"
)
return user
return UserContext.from_user(user)
# ============================================================================
@@ -884,7 +934,7 @@ def get_current_customer_api(
def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db),
) -> User:
) -> UserContext:
"""
Get current authenticated user from Authorization header only.
@@ -896,7 +946,7 @@ def get_current_user(
db: Database session
Returns:
User: Authenticated user (any role)
UserContext: Authenticated user context (any role)
Raises:
InvalidTokenException: If no token or invalid token
@@ -904,7 +954,8 @@ def get_current_user(
if not credentials:
raise InvalidTokenException("Authorization header required")
return _validate_user_token(credentials.credentials, db)
user = _validate_user_token(credentials.credentials, db)
return UserContext.from_user(user)
# ============================================================================
@@ -914,7 +965,7 @@ def get_current_user(
def get_user_vendor(
vendor_code: str,
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
current_user: UserContext = Depends(get_current_vendor_from_cookie_or_header),
db: Session = Depends(get_db),
) -> Vendor:
"""
@@ -976,7 +1027,7 @@ def require_vendor_permission(permission: str):
@router.get("/products")
def list_products(
request: Request,
user: User = Depends(require_vendor_permission(VendorPermissions.PRODUCTS_VIEW.value))
user: UserContext = Depends(require_vendor_permission(VendorPermissions.PRODUCTS_VIEW.value))
):
vendor = request.state.vendor # Vendor is set by this dependency
...
@@ -985,10 +1036,10 @@ def require_vendor_permission(permission: str):
def permission_checker(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
) -> User:
current_user: UserContext = Depends(get_current_vendor_from_cookie_or_header),
) -> UserContext:
# Get vendor ID from JWT token
if not hasattr(current_user, "token_vendor_id"):
if not current_user.token_vendor_id:
raise InvalidTokenException(
"Token missing vendor information. Please login again."
)
@@ -1001,8 +1052,9 @@ def require_vendor_permission(permission: str):
# Store vendor in request state for endpoint use
request.state.vendor = vendor
# Check if user has permission
if not current_user.has_vendor_permission(vendor.id, permission):
# Check if user has permission (need User model for this)
user_model = _get_user_model(current_user, db)
if not user_model.has_vendor_permission(vendor.id, permission):
raise InsufficientVendorPermissionsException(
required_permission=permission,
vendor_code=vendor.vendor_code,
@@ -1016,8 +1068,8 @@ def require_vendor_permission(permission: str):
def require_vendor_owner(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
) -> User:
current_user: UserContext = Depends(get_current_vendor_from_cookie_or_header),
) -> UserContext:
"""
Dependency to require vendor owner role.
@@ -1028,13 +1080,13 @@ def require_vendor_owner(
@router.delete("/team/{user_id}")
def remove_team_member(
request: Request,
user: User = Depends(require_vendor_owner)
user: UserContext = Depends(require_vendor_owner)
):
vendor = request.state.vendor # Vendor is set by this dependency
...
"""
# Get vendor ID from JWT token
if not hasattr(current_user, "token_vendor_id"):
if not current_user.token_vendor_id:
raise InvalidTokenException(
"Token missing vendor information. Please login again."
)
@@ -1047,7 +1099,9 @@ def require_vendor_owner(
# Store vendor in request state for endpoint use
request.state.vendor = vendor
if not current_user.is_owner_of(vendor.id):
# Need User model for is_owner_of check
user_model = _get_user_model(current_user, db)
if not user_model.is_owner_of(vendor.id):
raise VendorOwnerOnlyException(
operation="team management",
vendor_code=vendor.vendor_code,
@@ -1067,7 +1121,7 @@ def require_any_vendor_permission(*permissions: str):
@router.get("/dashboard")
def dashboard(
request: Request,
user: User = Depends(require_any_vendor_permission(
user: UserContext = Depends(require_any_vendor_permission(
VendorPermissions.DASHBOARD_VIEW.value,
VendorPermissions.REPORTS_VIEW.value
))
@@ -1079,10 +1133,10 @@ def require_any_vendor_permission(*permissions: str):
def permission_checker(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
) -> User:
current_user: UserContext = Depends(get_current_vendor_from_cookie_or_header),
) -> UserContext:
# Get vendor ID from JWT token
if not hasattr(current_user, "token_vendor_id"):
if not current_user.token_vendor_id:
raise InvalidTokenException(
"Token missing vendor information. Please login again."
)
@@ -1095,9 +1149,10 @@ def require_any_vendor_permission(*permissions: str):
# Store vendor in request state for endpoint use
request.state.vendor = vendor
# Check if user has ANY of the required permissions
# Check if user has ANY of the required permissions (need User model)
user_model = _get_user_model(current_user, db)
has_permission = any(
current_user.has_vendor_permission(vendor.id, perm) for perm in permissions
user_model.has_vendor_permission(vendor.id, perm) for perm in permissions
)
if not has_permission:
@@ -1122,7 +1177,7 @@ def require_all_vendor_permissions(*permissions: str):
@router.post("/products/bulk-delete")
def bulk_delete_products(
request: Request,
user: User = Depends(require_all_vendor_permissions(
user: UserContext = Depends(require_all_vendor_permissions(
VendorPermissions.PRODUCTS_VIEW.value,
VendorPermissions.PRODUCTS_DELETE.value
))
@@ -1134,10 +1189,10 @@ def require_all_vendor_permissions(*permissions: str):
def permission_checker(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
) -> User:
current_user: UserContext = Depends(get_current_vendor_from_cookie_or_header),
) -> UserContext:
# Get vendor ID from JWT token
if not hasattr(current_user, "token_vendor_id"):
if not current_user.token_vendor_id:
raise InvalidTokenException(
"Token missing vendor information. Please login again."
)
@@ -1150,11 +1205,12 @@ def require_all_vendor_permissions(*permissions: str):
# Store vendor in request state for endpoint use
request.state.vendor = vendor
# Check if user has ALL required permissions
# Check if user has ALL required permissions (need User model)
user_model = _get_user_model(current_user, db)
missing_permissions = [
perm
for perm in permissions
if not current_user.has_vendor_permission(vendor.id, perm)
if not user_model.has_vendor_permission(vendor.id, perm)
]
if missing_permissions:
@@ -1171,7 +1227,7 @@ def require_all_vendor_permissions(*permissions: str):
def get_user_permissions(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
current_user: UserContext = Depends(get_current_vendor_from_cookie_or_header),
) -> list:
"""
Get all permissions for current user in current vendor.
@@ -1182,7 +1238,7 @@ def get_user_permissions(
Returns empty list if no vendor context in token.
"""
# Get vendor ID from JWT token
if not hasattr(current_user, "token_vendor_id"):
if not current_user.token_vendor_id:
return []
vendor_id = current_user.token_vendor_id
@@ -1193,14 +1249,17 @@ def get_user_permissions(
# Store vendor in request state for endpoint use
request.state.vendor = vendor
# Need User model for ownership and membership checks
user_model = _get_user_model(current_user, db)
# If owner, return all permissions
if current_user.is_owner_of(vendor.id):
if user_model.is_owner_of(vendor.id):
from app.core.permissions import VendorPermissions
return [p.value for p in VendorPermissions]
# Get permissions from vendor membership
for vm in current_user.vendor_memberships:
for vm in user_model.vendor_memberships:
if vm.vendor_id == vendor.id and vm.is_active:
return vm.get_all_permissions()
@@ -1217,7 +1276,7 @@ def get_current_admin_optional(
credentials: HTTPAuthorizationCredentials | None = Depends(security),
admin_token: str | None = Cookie(None),
db: Session = Depends(get_db),
) -> User | None:
) -> UserContext | None:
"""
Get current admin user from admin_token cookie or Authorization header.
@@ -1235,7 +1294,7 @@ def get_current_admin_optional(
db: Database session
Returns:
User: Authenticated admin user if valid token exists
UserContext: Authenticated admin user context if valid token exists
None: If no token, invalid token, or user is not admin
"""
token, source = _get_token_from_request(
@@ -1251,7 +1310,7 @@ def get_current_admin_optional(
# Verify user is admin
if user.role == "admin":
return user
return UserContext.from_user(user, include_vendor_context=False)
except Exception:
# Invalid token or other error
pass
@@ -1264,7 +1323,7 @@ def get_current_vendor_optional(
credentials: HTTPAuthorizationCredentials | None = Depends(security),
vendor_token: str | None = Cookie(None),
db: Session = Depends(get_db),
) -> User | None:
) -> UserContext | None:
"""
Get current vendor user from vendor_token cookie or Authorization header.
@@ -1282,7 +1341,7 @@ def get_current_vendor_optional(
db: Database session
Returns:
User: Authenticated vendor user if valid token exists
UserContext: Authenticated vendor user context if valid token exists
None: If no token, invalid token, or user is not vendor
"""
token, source = _get_token_from_request(
@@ -1298,7 +1357,7 @@ def get_current_vendor_optional(
# Verify user is vendor
if user.role == "vendor":
return user
return UserContext.from_user(user)
except Exception:
# Invalid token or other error
pass