Files
orion/middleware/auth.py
Samir Boulahtit 1b24269ef1
Some checks failed
CI / ruff (push) Failing after 7s
CI / pytest (push) Failing after 0s
CI / architecture (push) Failing after 8s
CI / dependency-scanning (push) Successful in 27s
CI / audit (push) Successful in 8s
CI / docs (push) Has been skipped
fix(lint): convert custom noqa directives to regular comments
Ruff only accepts standard rule codes (e.g., E712, F401) in noqa
directives. Custom architecture validator codes (SEC-034, SVC-006,
MOD-004, API-007) are now regular comments instead.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 23:19:34 +01:00

508 lines
19 KiB
Python

# middleware/auth.py
"""Authentication and Authorization Module.
This module provides JWT-based authentication and role-based access control (RBAC)
for the application. It handles:
- Password hashing and verification using bcrypt
- JWT token creation and validation
- User authentication against the database
- Role-based access control (admin, store, customer)
- Current user extraction from request credentials
The module uses the following technologies:
- Jose for JWT token handling
- Passlib with bcrypt for secure password hashing
- SQLAlchemy for database operations
"""
import logging
import os
from collections.abc import Callable
from datetime import UTC, datetime, timedelta
from typing import Any
from fastapi import HTTPException
from fastapi.security import HTTPAuthorizationCredentials
from jose import jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from app.modules.tenancy.exceptions import (
AdminRequiredException,
InsufficientPermissionsException,
InvalidCredentialsException,
InvalidTokenException,
TokenExpiredException,
UserNotActiveException,
)
from app.modules.tenancy.models import User
logger = logging.getLogger(__name__)
# Password context for bcrypt hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class AuthManager:
"""JWT-based authentication manager with bcrypt password hashing.
This class provides a complete authentication system including:
- User credential verification
- JWT token generation and validation
- Role-based access control
- Password hashing using bcrypt
Attributes:
secret_key (str): Secret key used for JWT encoding/decoding
algorithm (str): Algorithm used for JWT signing (HS256)
token_expire_minutes (int): Token expiration time in minutes
"""
def __init__(self):
"""Initialize the AuthManager with configuration from environment variables.
Reads the following environment variables:
- JWT_SECRET_KEY: Secret key for JWT signing (defaults to development key)
- JWT_EXPIRE_MINUTES: Token expiration time in minutes (defaults to 30)
"""
# Load JWT secret key from environment, with fallback for development
self.secret_key = os.getenv(
"JWT_SECRET_KEY", "your-secret-key-change-in-production-please"
)
# Use HS256 (HMAC with SHA-256) for token signing
self.algorithm = "HS256"
# Configure token expiration time from environment
self.token_expire_minutes = int(os.getenv("JWT_EXPIRE_MINUTES", "30"))
def hash_password(self, password: str) -> str:
"""Hash a plain text password using bcrypt.
Uses bcrypt hashing algorithm with automatic salt generation.
The resulting hash is safe to store in the database.
Args:
password (str): Plain text password to hash
Returns:
str: Bcrypt hashed password string
"""
return pwd_context.hash(password)
def verify_password(self, plain_password: str, hashed_password: str) -> bool:
"""Verify a plain text password against a bcrypt hash.
Args:
plain_password (str): Plain text password to verify
hashed_password (str): Bcrypt hash to verify against
Returns:
bool: True if password matches hash, False otherwise
"""
return pwd_context.verify(plain_password, hashed_password)
def authenticate_user(
self, db: Session, username: str, password: str
) -> User | None:
"""Authenticate user credentials against the database.
Supports authentication using either username or email address.
Verifies the provided password against the stored bcrypt hash.
Args:
db (Session): SQLAlchemy database session
username (str): Username or email address to authenticate
password (str): Plain text password to verify
Returns:
Optional[User]: User object if authentication succeeds, None otherwise
"""
# Query user by username or email (both are unique identifiers)
user = (
db.query(User)
.filter((User.username == username) | (User.email == username))
.first()
)
# User not found in database
if not user:
return None
# Password verification failed
if not self.verify_password(password, user.hashed_password):
return None
# Authentication successful, return user object
return user
def create_access_token(
self,
user: User,
store_id: int | None = None,
store_code: str | None = None,
store_role: str | None = None,
platform_id: int | None = None,
platform_code: str | None = None,
) -> dict[str, Any]:
"""Create a JWT access token for an authenticated user.
The token includes user identity and role information in the payload.
Token expiration is set based on the configured token_expire_minutes.
Args:
user (User): Authenticated user object
store_id (int, optional): Store ID if logging into store context
store_code (str, optional): Store code if logging into store context
store_role (str, optional): User's role in this store (owner, manager, etc.)
platform_id (int, optional): Platform ID for platform admin context
platform_code (str, optional): Platform code for platform admin context
Returns:
Dict[str, Any]: Dictionary containing:
- access_token (str): The JWT token string
- token_type (str): Token type ("bearer")
- expires_in (int): Token lifetime in seconds
"""
# Calculate token expiration time
expires_delta = timedelta(minutes=self.token_expire_minutes)
expire = datetime.now(UTC) + expires_delta
# Build JWT payload with user information
payload = {
"sub": str(user.id), # Subject: user ID (JWT standard claim)
"username": user.username, # Username for display/logging
"email": user.email, # User email address
"role": user.role, # User role for authorization
"exp": expire, # Expiration time (JWT standard claim)
"iat": datetime.now(UTC), # Issued at time (JWT standard claim)
}
# Include admin-specific information for admin users
if user.is_admin:
payload["is_super_admin"] = user.is_super_admin
# For platform admins, include their accessible platform IDs
if not user.is_super_admin:
accessible = user.get_accessible_platform_ids()
if accessible is not None:
payload["accessible_platforms"] = accessible
# Include platform context for platform admins
if platform_id is not None:
payload["platform_id"] = platform_id
if platform_code is not None:
payload["platform_code"] = platform_code
# Include store information in token if provided (store-specific login)
if store_id is not None:
payload["store_id"] = store_id
if store_code is not None:
payload["store_code"] = store_code
if store_role is not None:
payload["store_role"] = store_role
# Encode the payload into a JWT token
token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
# Return token with metadata
return {
"access_token": token,
"token_type": "bearer",
"expires_in": self.token_expire_minutes * 60, # Convert minutes to seconds
}
def verify_token(self, token: str) -> dict[str, Any]:
"""Verify and decode a JWT token, returning the user data.
Validates the token signature, expiration, and required claims.
Ensures the token contains all necessary user identification data.
Args:
token (str): JWT token string to verify
Returns:
Dict[str, Any]: Dictionary containing:
- user_id (int): User's database ID
- username (str): User's username
- email (str): User's email address
- role (str): User's role (defaults to "user" if not present)
- store_id (int, optional): Store ID if token is store-scoped
- store_code (str, optional): Store code if token is store-scoped
- store_role (str, optional): User's role in store if store-scoped
Raises:
TokenExpiredException: If token has expired
InvalidTokenException: If token is malformed, missing required claims,
or signature verification fails
"""
try:
# Decode and verify the JWT token signature
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
# Validate token expiration claim exists
exp = payload.get("exp")
if exp is None:
raise InvalidTokenException("Token missing expiration")
# Check if token has expired (additional check beyond jwt.decode)
if datetime.now(UTC) > datetime.fromtimestamp(exp, tz=UTC):
raise TokenExpiredException()
# Validate user identifier claim exists
user_id = payload.get("sub")
if user_id is None:
raise InvalidTokenException("Token missing user identifier")
# Extract and return user data from token payload
user_data = {
"user_id": int(user_id),
"username": payload.get("username"),
"email": payload.get("email"),
"role": payload.get(
"role", "user"
), # Default to "user" role if not specified
}
# Include admin-specific information if present
if "is_super_admin" in payload:
user_data["is_super_admin"] = payload["is_super_admin"]
if "accessible_platforms" in payload:
user_data["accessible_platforms"] = payload["accessible_platforms"]
# Include platform context for platform admins
if "platform_id" in payload:
user_data["platform_id"] = payload["platform_id"]
if "platform_code" in payload:
user_data["platform_code"] = payload["platform_code"]
# Include store information if present in token
if "store_id" in payload:
user_data["store_id"] = payload["store_id"]
if "store_code" in payload:
user_data["store_code"] = payload["store_code"]
if "store_role" in payload:
user_data["store_role"] = payload["store_role"]
return user_data
except jwt.ExpiredSignatureError:
# Token has expired (caught by jwt.decode)
raise TokenExpiredException()
except jwt.JWTError as e:
# Token signature verification failed or token is malformed
logger.error(f"JWT decode error: {e}")
raise InvalidTokenException("Could not validate credentials")
except (InvalidTokenException, TokenExpiredException):
# Re-raise our custom exceptions with their original messages
raise
except Exception as e:
# Catch any other unexpected errors during token verification
logger.error(f"Token verification error: {e}")
raise InvalidTokenException("Authentication failed")
def get_current_user(
self, db: Session, credentials: HTTPAuthorizationCredentials
) -> User:
"""Extract and validate the current authenticated user from request credentials.
Verifies the JWT token from the Authorization header, looks up the user
in the database, and ensures the user account is active.
If the token contains store information, attaches it to the user object
as dynamic attributes (store_id, store_code, store_role).
Args:
db (Session): SQLAlchemy database session
credentials (HTTPAuthorizationCredentials): Bearer token credentials from request
Returns:
User: The authenticated and active user object (with store attrs if in token)
Raises:
InvalidTokenException: If token verification fails
InvalidCredentialsException: If user is not found in database
UserNotActiveException: If user account is inactive
"""
# Verify JWT token and extract user data
user_data = self.verify_token(credentials.credentials)
# Look up user in database by ID from token
user = db.query(User).filter(User.id == user_data["user_id"]).first()
if not user:
raise InvalidCredentialsException("User not found")
# Ensure user account is active
if not user.is_active:
raise UserNotActiveException()
# Attach admin-specific information to user object if present in token
# These become dynamic attributes on the user object for this request
if "is_super_admin" in user_data:
user.token_is_super_admin = user_data["is_super_admin"]
if "accessible_platforms" in user_data:
user.token_accessible_platforms = user_data["accessible_platforms"]
# Attach platform context to user object if present in token
if "platform_id" in user_data:
user.token_platform_id = user_data["platform_id"]
if "platform_code" in user_data:
user.token_platform_code = user_data["platform_code"]
# Attach store information to user object if present in token
if "store_id" in user_data:
user.token_store_id = user_data["store_id"]
if "store_code" in user_data:
user.token_store_code = user_data["store_code"]
if "store_role" in user_data:
user.token_store_role = user_data["store_role"]
return user
def require_role(self, required_role: str) -> Callable:
"""Create a decorator that enforces a specific role requirement.
This method returns a decorator that can be used to protect functions
requiring a specific user role. The decorator validates that the current
user has the exact required role.
Args:
required_role (str): The role name required (e.g., "admin", "store")
Returns:
Callable: Decorator function that enforces role requirement
Raises:
HTTPException: If user doesn't have the required role (403 Forbidden)
Example:
@auth_manager.require_role("admin")
def admin_only_function(current_user: User):
# This will only execute if user has "admin" role
pass
"""
def decorator(func):
"""Decorator that wraps the function with role checking."""
def wrapper(current_user: User, *args, **kwargs):
# Check if current user has the required role
if current_user.role != required_role:
raise HTTPException(
status_code=403,
detail=f"Required role '{required_role}' not found. Current role: '{current_user.role}'",
)
# User has required role, proceed with function execution
return func(current_user, *args, **kwargs)
return wrapper
return decorator
def require_admin(self, current_user: User) -> User:
"""Enforce that the current user has admin role.
Use this as a dependency in FastAPI routes to restrict access to admins only.
Args:
current_user (User): The authenticated user from get_current_user
Returns:
User: The user object if they have admin role
Raises:
AdminRequiredException: If user does not have admin role
"""
# Verify user has admin role
if current_user.role != "admin":
raise AdminRequiredException()
return current_user
def require_store(self, current_user: User) -> User:
"""
Require store role (store or admin).
Stores and admins can access store areas.
Args:
current_user: Current authenticated user
Returns:
User: The user if they have store or admin role
Raises:
InsufficientPermissionsException: If user is not store or admin
"""
# Check if user has store or admin role (admins have full access)
if current_user.role not in ["store", "admin"]:
raise InsufficientPermissionsException(
message="Store access required", required_permission="store"
)
return current_user
def require_customer(self, current_user: User) -> User:
"""
Require customer role (customer or admin).
Customers and admins can access customer account areas.
Args:
current_user: Current authenticated user
Returns:
User: The user if they have customer or admin role
Raises:
InsufficientPermissionsException: If user is not customer or admin
"""
# Check if user has customer or admin role (admins have full access)
if current_user.role not in ["customer", "admin"]:
raise InsufficientPermissionsException(
message="Customer account access required",
required_permission="customer",
)
return current_user
def create_default_admin_user(self, db: Session) -> User:
"""Create a default admin user account if one doesn't already exist.
This is typically used during initial application setup or database seeding.
The default admin account should have its password changed immediately in production.
Default credentials:
- Username: admin
- Password: admin123 (CHANGE IN PRODUCTION!)
- Email: admin@example.com
Args:
db (Session): SQLAlchemy database session
Returns:
User: The admin user object (either existing or newly created)
"""
# Check if admin user already exists
admin_user = db.query(User).filter(User.username == "admin").first()
# Create admin user if it doesn't exist
if not admin_user:
# Hash the default password securely
hashed_password = self.hash_password("admin123")
# Create new admin user with default credentials
# Default admin is a super admin with access to all platforms
admin_user = User(
email="admin@example.com",
username="admin",
hashed_password=hashed_password,
role="admin",
is_active=True,
is_super_admin=True,
)
# Save to database
db.add(admin_user)
db.commit()
db.refresh(admin_user)
# Log creation for audit trail (credentials redacted for security)
logger.info("Default admin user created") # sec-001 sec-021
return admin_user