Files
orion/tests/unit/middleware/test_auth.py
Samir Boulahtit d7a0ff8818 refactor: complete module-driven architecture migration
This commit completes the migration to a fully module-driven architecture:

## Models Migration
- Moved all domain models from models/database/ to their respective modules:
  - tenancy: User, Admin, Vendor, Company, Platform, VendorDomain, etc.
  - cms: MediaFile, VendorTheme
  - messaging: Email, VendorEmailSettings, VendorEmailTemplate
  - core: AdminMenuConfig
- models/database/ now only contains Base and TimestampMixin (infrastructure)

## Schemas Migration
- Moved all domain schemas from models/schema/ to their respective modules:
  - tenancy: company, vendor, admin, team, vendor_domain
  - cms: media, image, vendor_theme
  - messaging: email
- models/schema/ now only contains base.py and auth.py (infrastructure)

## Routes Migration
- Moved admin routes from app/api/v1/admin/ to modules:
  - menu_config.py -> core module
  - modules.py -> tenancy module
  - module_config.py -> tenancy module
- app/api/v1/admin/ now only aggregates auto-discovered module routes

## Menu System
- Implemented module-driven menu system with MenuDiscoveryService
- Extended FrontendType enum: PLATFORM, ADMIN, VENDOR, STOREFRONT
- Added MenuItemDefinition and MenuSectionDefinition dataclasses
- Each module now defines its own menu items in definition.py
- MenuService integrates with MenuDiscoveryService for template rendering

## Documentation
- Updated docs/architecture/models-structure.md
- Updated docs/architecture/menu-management.md
- Updated architecture validation rules for new exceptions

## Architecture Validation
- Updated MOD-019 rule to allow base.py in models/schema/
- Created core module exceptions.py and schemas/ directory
- All validation errors resolved (only warnings remain)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 21:02:56 +01:00

758 lines
25 KiB
Python

# tests/unit/middleware/test_auth.py
"""
Comprehensive unit tests for AuthManager.
Tests cover:
- Password hashing and verification
- JWT token creation and validation
- User authentication
- Token expiration handling
- Role-based access control
- Admin/vendor/customer permission checks
- Error handling and edge cases
"""
from datetime import UTC, datetime, timedelta
from unittest.mock import Mock, patch
import pytest
from fastapi import HTTPException
from jose import jwt
from app.modules.tenancy.exceptions import (
AdminRequiredException,
InsufficientPermissionsException,
InvalidCredentialsException,
InvalidTokenException,
TokenExpiredException,
UserNotActiveException,
)
from middleware.auth import AuthManager
from app.modules.tenancy.models import User
@pytest.mark.unit
@pytest.mark.auth
class TestPasswordHashing:
    """Round-trip checks for AuthManager.hash_password and verify_password."""

    def test_hash_password(self):
        """The same password hashed twice gives two different bcrypt digests."""
        manager = AuthManager()
        plaintext = "test_password_123"

        first, second = (manager.hash_password(plaintext) for _ in range(2))

        # Differing digests show that every call is salted independently.
        assert first != second
        # "$2b$" is the bcrypt prefix both digests are expected to carry.
        assert first.startswith("$2b$")
        assert second.startswith("$2b$")

    def test_verify_password_correct(self):
        """A password verifies against the digest produced from it."""
        manager = AuthManager()
        plaintext = "test_password_123"

        digest = manager.hash_password(plaintext)

        assert manager.verify_password(plaintext, digest) is True

    def test_verify_password_incorrect(self):
        """A different password does not verify against the digest."""
        manager = AuthManager()
        digest = manager.hash_password("test_password_123")

        outcome = manager.verify_password("wrong_password_456", digest)

        assert outcome is False

    def test_verify_password_empty(self):
        """The empty string does not verify against a real digest."""
        manager = AuthManager()
        digest = manager.hash_password("test_password_123")

        outcome = manager.verify_password("", digest)

        assert outcome is False

    def test_hash_password_special_characters(self):
        """Punctuation-heavy passwords survive the hash/verify round trip."""
        manager = AuthManager()
        plaintext = "P@ssw0rd!#$%^&*()_+-=[]{}|;:,.<>?"

        digest = manager.hash_password(plaintext)

        assert manager.verify_password(plaintext, digest) is True

    def test_hash_password_unicode(self):
        """Non-ASCII passwords survive the hash/verify round trip."""
        manager = AuthManager()
        plaintext = "パスワード123こんにちは"

        digest = manager.hash_password(plaintext)

        assert manager.verify_password(plaintext, digest) is True
@pytest.mark.unit
@pytest.mark.auth
class TestUserAuthentication:
    """Checks for AuthManager.authenticate_user against a mocked session."""

    @staticmethod
    def _session_returning(row):
        """Return a mocked session whose query().filter().first() yields *row*."""
        session = Mock()
        session.query.return_value.filter.return_value.first.return_value = row
        return session

    def test_authenticate_user_success_with_username(self):
        """The matching user is returned when logging in by username."""
        manager = AuthManager()
        account = Mock(
            spec=User,
            username="testuser",
            email="test@example.com",
            hashed_password=manager.hash_password("password123"),
        )
        session = self._session_returning(account)

        outcome = manager.authenticate_user(session, "testuser", "password123")

        assert outcome is account

    def test_authenticate_user_success_with_email(self):
        """The matching user is returned when logging in by email address."""
        manager = AuthManager()
        account = Mock(
            spec=User,
            username="testuser",
            email="test@example.com",
            hashed_password=manager.hash_password("password123"),
        )
        session = self._session_returning(account)

        outcome = manager.authenticate_user(
            session, "test@example.com", "password123"
        )

        assert outcome is account

    def test_authenticate_user_not_found(self):
        """None is returned when the query finds no user."""
        manager = AuthManager()
        session = self._session_returning(None)

        outcome = manager.authenticate_user(session, "nonexistent", "password123")

        assert outcome is None

    def test_authenticate_user_wrong_password(self):
        """None is returned when the password does not match the stored hash."""
        manager = AuthManager()
        account = Mock(
            spec=User,
            hashed_password=manager.hash_password("correctpassword"),
        )
        session = self._session_returning(account)

        outcome = manager.authenticate_user(session, "testuser", "wrongpassword")

        assert outcome is None
@pytest.mark.unit
@pytest.mark.auth
class TestJWTTokenCreation:
    """Checks for the dict and JWT produced by AuthManager.create_access_token."""

    def test_create_access_token_structure(self):
        """The returned dict has the token, its type, and its lifetime in seconds."""
        manager = AuthManager()
        account = Mock(
            spec=User,
            id=1,
            username="testuser",
            email="test@example.com",
            role="customer",
        )

        issued = manager.create_access_token(account)

        for field in ("access_token", "token_type", "expires_in"):
            assert field in issued
        assert issued["token_type"] == "bearer"
        assert isinstance(issued["expires_in"], int)
        # The lifetime is reported in seconds, the setting is in minutes.
        assert issued["expires_in"] == manager.token_expire_minutes * 60

    def test_create_access_token_payload(self):
        """The JWT claims mirror the user's id, username, email and role."""
        manager = AuthManager()
        account = Mock(
            spec=User,
            id=42,
            username="testuser",
            email="test@example.com",
            role="vendor",
        )

        encoded = manager.create_access_token(account)["access_token"]
        # Decode with the manager's own key and algorithm to read the claims.
        claims = jwt.decode(
            encoded, manager.secret_key, algorithms=[manager.algorithm]
        )

        # The user id is carried as a string in the "sub" claim.
        assert claims["sub"] == "42"
        assert claims["username"] == "testuser"
        assert claims["email"] == "test@example.com"
        assert claims["role"] == "vendor"
        assert "exp" in claims
        assert "iat" in claims

    def test_create_access_token_different_users(self):
        """Two distinct users never receive the same token."""
        manager = AuthManager()
        accounts = [
            Mock(
                spec=User,
                id=1,
                username="user1",
                email="user1@test.com",
                role="customer",
            ),
            Mock(
                spec=User,
                id=2,
                username="user2",
                email="user2@test.com",
                role="vendor",
            ),
        ]

        first, second = (
            manager.create_access_token(account)["access_token"]
            for account in accounts
        )

        assert first != second

    def test_create_access_token_admin_role(self):
        """An admin user's token carries the "admin" role claim."""
        manager = AuthManager()
        administrator = Mock(
            spec=User,
            id=1,
            username="admin",
            email="admin@example.com",
            role="admin",
        )

        issued = manager.create_access_token(administrator)
        claims = jwt.decode(
            issued["access_token"],
            manager.secret_key,
            algorithms=[manager.algorithm],
        )

        assert claims["role"] == "admin"
@pytest.mark.unit
@pytest.mark.auth
class TestJWTTokenVerification:
    """Checks for AuthManager.verify_token on valid and malformed tokens."""

    @staticmethod
    def _customer():
        """Return the mocked customer account most tests issue tokens for."""
        return Mock(
            spec=User,
            id=1,
            username="testuser",
            email="test@example.com",
            role="customer",
        )

    def test_verify_token_success(self):
        """A freshly issued token verifies and exposes the user's fields."""
        manager = AuthManager()
        encoded = manager.create_access_token(self._customer())["access_token"]

        verified = manager.verify_token(encoded)

        assert verified["user_id"] == 1
        assert verified["username"] == "testuser"
        assert verified["email"] == "test@example.com"
        assert verified["role"] == "customer"

    def test_verify_token_expired(self):
        """A token issued with a negative lifetime is reported as expired."""
        manager = AuthManager()
        # A negative lifetime makes the token expired from the moment it is issued.
        manager.token_expire_minutes = -1
        encoded = manager.create_access_token(self._customer())["access_token"]
        # Put the lifetime back before verifying.
        manager.token_expire_minutes = 30

        with pytest.raises(TokenExpiredException):
            manager.verify_token(encoded)

    def test_verify_token_invalid(self):
        """A string that is not a JWT is rejected as an invalid token."""
        manager = AuthManager()

        with pytest.raises(InvalidTokenException):
            manager.verify_token("invalid.token.here")

    def test_verify_token_tampered(self):
        """A token whose signature segment was replaced is rejected."""
        manager = AuthManager()
        encoded = manager.create_access_token(self._customer())["access_token"]
        # Keep header and claims, swap the signature for garbage.
        header, claims, _signature = encoded.split(".")
        forged = f"{header}.{claims}.tampered"

        with pytest.raises(InvalidTokenException):
            manager.verify_token(forged)

    def test_verify_token_missing_user_id(self):
        """A token without a "sub" claim is rejected with a specific message."""
        manager = AuthManager()
        claims_without_sub = {
            "username": "testuser",
            "exp": datetime.now(UTC) + timedelta(minutes=30),
        }
        encoded = jwt.encode(
            claims_without_sub, manager.secret_key, algorithm=manager.algorithm
        )

        with pytest.raises(InvalidTokenException) as caught:
            manager.verify_token(encoded)

        assert "missing user identifier" in str(caught.value.message)

    def test_verify_token_missing_expiration(self):
        """A token without an "exp" claim is rejected with a specific message."""
        manager = AuthManager()
        claims_without_exp = {"sub": "1", "username": "testuser"}
        encoded = jwt.encode(
            claims_without_exp, manager.secret_key, algorithm=manager.algorithm
        )

        with pytest.raises(InvalidTokenException) as caught:
            manager.verify_token(encoded)

        assert "missing expiration" in str(caught.value.message)

    def test_verify_token_wrong_algorithm(self):
        """A token signed with HS512 instead of the manager's algorithm is rejected."""
        manager = AuthManager()
        claims = {
            "sub": "1",
            "username": "testuser",
            "exp": datetime.now(UTC) + timedelta(minutes=30),
        }
        encoded = jwt.encode(claims, manager.secret_key, algorithm="HS512")

        with pytest.raises(InvalidTokenException):
            manager.verify_token(encoded)

    def test_verify_token_additional_expiration_check(self):
        """verify_token re-checks "exp" itself after jwt.decode has returned."""
        manager = AuthManager()
        one_minute_ago = datetime.now(UTC) - timedelta(minutes=1)
        stale_claims = {
            "sub": "1",
            "username": "testuser",
            "exp": one_minute_ago.timestamp(),
        }
        encoded = jwt.encode(
            stale_claims, manager.secret_key, algorithm=manager.algorithm
        )

        # Patching jwt.decode skips the library's own expiry check, so only the
        # manager's follow-up check can raise here.
        with patch(
            "middleware.auth.jwt.decode", return_value=stale_claims
        ), pytest.raises(TokenExpiredException):
            manager.verify_token(encoded)
@pytest.mark.unit
@pytest.mark.auth
class TestGetCurrentUser:
    """Checks for AuthManager.get_current_user with mocked session and credentials."""

    @staticmethod
    def _session_returning(row):
        """Return a mocked session whose query().filter().first() yields *row*."""
        session = Mock()
        session.query.return_value.filter.return_value.first.return_value = row
        return session

    @staticmethod
    def _credentials_for(manager, account):
        """Return mocked credentials carrying a token issued for *account*."""
        issued = manager.create_access_token(account)
        return Mock(credentials=issued["access_token"])

    def test_get_current_user_success(self):
        """An active user found in the database is returned as-is."""
        manager = AuthManager()
        account = Mock(
            spec=User,
            id=1,
            username="testuser",
            email="test@example.com",
            role="customer",
            is_active=True,
        )
        session = self._session_returning(account)
        bearer = self._credentials_for(manager, account)

        outcome = manager.get_current_user(session, bearer)

        assert outcome is account

    def test_get_current_user_not_found(self):
        """A valid token for a user missing from the database is rejected."""
        manager = AuthManager()
        # The session finds nobody, although the token below is valid.
        session = self._session_returning(None)
        token_owner = Mock(
            spec=User,
            id=999,
            username="testuser",
            email="test@example.com",
            role="customer",
        )
        bearer = self._credentials_for(manager, token_owner)

        with pytest.raises(InvalidCredentialsException):
            manager.get_current_user(session, bearer)

    def test_get_current_user_inactive(self):
        """A user whose is_active flag is False is rejected."""
        manager = AuthManager()
        account = Mock(
            spec=User,
            id=1,
            username="testuser",
            email="test@example.com",
            role="customer",
            is_active=False,
        )
        session = self._session_returning(account)
        bearer = self._credentials_for(manager, account)

        with pytest.raises(UserNotActiveException):
            manager.get_current_user(session, bearer)
@pytest.mark.unit
@pytest.mark.auth
class TestRoleRequirements:
    """Checks for the require_admin / require_vendor / require_customer guards."""

    @staticmethod
    def _account(role):
        """Return a mocked User whose only configured attribute is *role*."""
        return Mock(spec=User, role=role)

    def test_require_admin_success(self):
        """require_admin hands an admin user straight back."""
        manager = AuthManager()
        administrator = self._account("admin")

        assert manager.require_admin(administrator) is administrator

    def test_require_admin_failure(self):
        """require_admin rejects a customer."""
        manager = AuthManager()
        shopper = self._account("customer")

        with pytest.raises(AdminRequiredException):
            manager.require_admin(shopper)

    def test_require_vendor_with_vendor_role(self):
        """require_vendor hands a vendor user straight back."""
        manager = AuthManager()
        seller = self._account("vendor")

        assert manager.require_vendor(seller) is seller

    def test_require_vendor_with_admin_role(self):
        """require_vendor also lets an admin through."""
        manager = AuthManager()
        administrator = self._account("admin")

        assert manager.require_vendor(administrator) is administrator

    def test_require_vendor_failure(self):
        """require_vendor rejects a customer and names the missing permission."""
        manager = AuthManager()
        shopper = self._account("customer")

        with pytest.raises(InsufficientPermissionsException) as caught:
            manager.require_vendor(shopper)

        assert caught.value.details.get("required_permission") == "vendor"

    def test_require_customer_with_customer_role(self):
        """require_customer hands a customer user straight back."""
        manager = AuthManager()
        shopper = self._account("customer")

        assert manager.require_customer(shopper) is shopper

    def test_require_customer_with_admin_role(self):
        """require_customer also lets an admin through."""
        manager = AuthManager()
        administrator = self._account("admin")

        assert manager.require_customer(administrator) is administrator

    def test_require_customer_failure(self):
        """require_customer rejects a vendor and names the missing permission."""
        manager = AuthManager()
        seller = self._account("vendor")

        with pytest.raises(InsufficientPermissionsException) as caught:
            manager.require_customer(seller)

        assert caught.value.details.get("required_permission") == "customer"
@pytest.mark.unit
@pytest.mark.auth
class TestCreateDefaultAdminUser:
    """Test suite for default admin user creation."""

    def test_create_default_admin_user_first_time(self):
        """Test creating default admin user when none exists.

        The user is inspected through the argument passed to ``db.add``, so
        the method's return value is deliberately not bound here.
        """
        auth_manager = AuthManager()
        mock_db = Mock()
        # No existing admin user: the lookup comes back empty.
        mock_db.query.return_value.filter.return_value.first.return_value = None

        auth_manager.create_default_admin_user(mock_db)

        # The new user must be added, committed and refreshed exactly once.
        mock_db.add.assert_called_once()
        mock_db.commit.assert_called_once()
        mock_db.refresh.assert_called_once()

        # First positional argument of the single db.add(...) call.
        created_user = mock_db.add.call_args[0][0]
        assert created_user.username == "admin"
        assert created_user.email == "admin@example.com"
        assert created_user.role == "admin"
        assert created_user.is_active is True
        assert auth_manager.verify_password("admin123", created_user.hashed_password)

    def test_create_default_admin_user_already_exists(self):
        """Test creating default admin user when one already exists."""
        auth_manager = AuthManager()
        mock_db = Mock()
        # Existing admin user: the lookup finds one.
        existing_admin = Mock(spec=User)
        mock_db.query.return_value.filter.return_value.first.return_value = (
            existing_admin
        )

        result = auth_manager.create_default_admin_user(mock_db)

        # Nothing new may be written to the session.
        mock_db.add.assert_not_called()
        mock_db.commit.assert_not_called()
        # The existing user is handed back unchanged.
        assert result is existing_admin
@pytest.mark.unit
@pytest.mark.auth
class TestAuthManagerConfiguration:
    """Checks for how AuthManager picks up its settings from the environment."""

    def test_default_configuration(self):
        """With an empty environment the built-in defaults are used."""
        with patch.dict("os.environ", {}, clear=True):
            manager = AuthManager()

            assert manager.algorithm == "HS256"
            assert manager.token_expire_minutes == 30
            expected_secret = "your-secret-key-change-in-production-please"
            assert manager.secret_key == expected_secret

    def test_custom_configuration(self):
        """JWT_SECRET_KEY and JWT_EXPIRE_MINUTES override the defaults."""
        overrides = {
            "JWT_SECRET_KEY": "custom-secret-key",
            "JWT_EXPIRE_MINUTES": "60",
        }
        with patch.dict("os.environ", overrides):
            manager = AuthManager()

            assert manager.secret_key == "custom-secret-key"
            assert manager.token_expire_minutes == 60

    def test_partial_custom_configuration(self):
        """Overriding only JWT_EXPIRE_MINUTES still leaves a secret key set."""
        overrides = {"JWT_EXPIRE_MINUTES": "120"}
        # clear=False keeps whatever else is already in the environment.
        with patch.dict("os.environ", overrides, clear=False):
            manager = AuthManager()

            assert manager.token_expire_minutes == 120
            # The key comes from the default or from an existing variable.
            assert manager.secret_key is not None
@pytest.mark.unit
@pytest.mark.auth
class TestEdgeCases:
    """Test suite for edge cases and error scenarios."""

    def test_verify_password_with_none(self):
        """Test password verification returns False when the hash is None."""
        auth_manager = AuthManager()

        # Neither a password nor a hash: None must never authenticate.
        assert auth_manager.verify_password(None, None) is False
        # A real password checked against a missing hash is rejected as well.
        assert auth_manager.verify_password("password", None) is False
        # Note: verify_password(None, valid_hash) raises TypeError from bcrypt.
        # That case is left to the underlying library and is not asserted here.

    def test_token_with_future_iat(self):
        """Test token with issued_at time in the future."""
        auth_manager = AuthManager()
        now = datetime.now(UTC)
        payload = {
            "sub": "1",
            "username": "testuser",
            "iat": now + timedelta(hours=1),  # Issued "in the future"
            "exp": now + timedelta(hours=2),
        }
        token = jwt.encode(
            payload, auth_manager.secret_key, algorithm=auth_manager.algorithm
        )

        # Should still verify successfully (JWT doesn't validate iat by default)
        result = auth_manager.verify_token(token)

        assert result["user_id"] == 1

    def test_authenticate_user_case_sensitivity(self):
        """Test that a lookup which finds no row does not authenticate.

        Whether "testuser" matches a stored "TestUser" depends on the database
        collation, which a mocked session cannot reproduce. The miss is
        therefore simulated directly by making the query return no row.
        """
        auth_manager = AuthManager()
        mock_db = Mock()
        mock_db.query.return_value.filter.return_value.first.return_value = None

        result = auth_manager.authenticate_user(mock_db, "testuser", "password123")

        # With no row found there is exactly one acceptable outcome.
        assert result is None

    def test_verify_token_unexpected_exception(self):
        """Test generic exception handler in verify_token."""
        auth_manager = AuthManager()
        # Create a valid token with a mock user
        mock_user = Mock(spec=User)
        mock_user.id = 1
        mock_user.username = "test"
        mock_user.email = "test@example.com"
        mock_user.role = "user"
        token = auth_manager.create_access_token(mock_user)["access_token"]

        # Make jwt.decode fail with an error type verify_token does not expect.
        with patch(
            "middleware.auth.jwt.decode", side_effect=RuntimeError("Unexpected error")
        ), pytest.raises(InvalidTokenException) as exc_info:
            auth_manager.verify_token(token)

        # The message should be "Authentication failed" from the generic
        # except handler.
        assert "Authentication failed" in str(exc_info.value.message)

    def test_require_role_decorator_wrapper_functionality(self):
        """Test the require_role decorator wrapper execution."""
        auth_manager = AuthManager()

        # A throwaway function guarded by the decorator under test.
        @auth_manager.require_role("admin")
        def test_function(current_user, additional_arg=None):
            return {"user": current_user.username, "arg": additional_arg}

        # Successful case: the user has the required role.
        admin_user = Mock(spec=User)
        admin_user.role = "admin"
        admin_user.username = "admin_user"

        result = test_function(admin_user, additional_arg="test_value")

        # Positional and keyword arguments both reach the wrapped function.
        assert result["user"] == "admin_user"
        assert result["arg"] == "test_value"

    def test_require_role_decorator_blocks_wrong_role(self):
        """Test that require_role decorator blocks users with wrong role."""
        auth_manager = AuthManager()

        @auth_manager.require_role("admin")
        def admin_only_function(current_user):
            return {"status": "success"}

        # A user whose role differs from the required one.
        regular_user = Mock(spec=User)
        regular_user.role = "user"

        with pytest.raises(HTTPException) as exc_info:
            admin_only_function(regular_user)

        assert exc_info.value.status_code == 403
        assert "Required role 'admin' not found" in exc_info.value.detail