Files
orion/tests/unit/middleware/test_auth.py

744 lines
25 KiB
Python

# tests/unit/middleware/test_auth.py
"""
Comprehensive unit tests for AuthManager.
Tests cover:
- Password hashing and verification
- JWT token creation and validation
- User authentication
- Token expiration handling
- Role-based access control
- Admin/vendor/customer permission checks
- Error handling and edge cases
"""
import pytest
from unittest.mock import Mock, MagicMock, patch
from datetime import datetime, timedelta, timezone
from jose import jwt
from fastapi import HTTPException
from middleware.auth import AuthManager
from app.exceptions import (
InvalidTokenException,
TokenExpiredException,
UserNotActiveException,
InvalidCredentialsException,
AdminRequiredException,
InsufficientPermissionsException,
)
from models.database.user import User
@pytest.mark.unit
@pytest.mark.auth
class TestPasswordHashing:
"""Test suite for password hashing functionality."""
def test_hash_password(self):
"""Test password hashing creates different hash for each call."""
auth_manager = AuthManager()
password = "test_password_123"
hash1 = auth_manager.hash_password(password)
hash2 = auth_manager.hash_password(password)
# Hashes should be different due to salt
assert hash1 != hash2
# Both should be valid bcrypt hashes (start with $2b$)
assert hash1.startswith("$2b$")
assert hash2.startswith("$2b$")
def test_verify_password_correct(self):
"""Test password verification with correct password."""
auth_manager = AuthManager()
password = "test_password_123"
hashed = auth_manager.hash_password(password)
assert auth_manager.verify_password(password, hashed) is True
def test_verify_password_incorrect(self):
"""Test password verification with incorrect password."""
auth_manager = AuthManager()
password = "test_password_123"
wrong_password = "wrong_password_456"
hashed = auth_manager.hash_password(password)
assert auth_manager.verify_password(wrong_password, hashed) is False
def test_verify_password_empty(self):
"""Test password verification with empty password."""
auth_manager = AuthManager()
password = "test_password_123"
hashed = auth_manager.hash_password(password)
assert auth_manager.verify_password("", hashed) is False
def test_hash_password_special_characters(self):
"""Test hashing password with special characters."""
auth_manager = AuthManager()
password = "P@ssw0rd!#$%^&*()_+-=[]{}|;:,.<>?"
hashed = auth_manager.hash_password(password)
assert auth_manager.verify_password(password, hashed) is True
def test_hash_password_unicode(self):
"""Test hashing password with unicode characters."""
auth_manager = AuthManager()
password = "パスワード123こんにちは"
hashed = auth_manager.hash_password(password)
assert auth_manager.verify_password(password, hashed) is True
@pytest.mark.unit
@pytest.mark.auth
class TestUserAuthentication:
"""Test suite for user authentication."""
def test_authenticate_user_success_with_username(self):
"""Test successful authentication with username."""
auth_manager = AuthManager()
mock_db = Mock()
mock_user = Mock(spec=User)
mock_user.username = "testuser"
mock_user.email = "test@example.com"
mock_user.hashed_password = auth_manager.hash_password("password123")
mock_db.query.return_value.filter.return_value.first.return_value = mock_user
result = auth_manager.authenticate_user(mock_db, "testuser", "password123")
assert result is mock_user
def test_authenticate_user_success_with_email(self):
"""Test successful authentication with email."""
auth_manager = AuthManager()
mock_db = Mock()
mock_user = Mock(spec=User)
mock_user.username = "testuser"
mock_user.email = "test@example.com"
mock_user.hashed_password = auth_manager.hash_password("password123")
mock_db.query.return_value.filter.return_value.first.return_value = mock_user
result = auth_manager.authenticate_user(mock_db, "test@example.com", "password123")
assert result is mock_user
def test_authenticate_user_not_found(self):
"""Test authentication with non-existent user."""
auth_manager = AuthManager()
mock_db = Mock()
mock_db.query.return_value.filter.return_value.first.return_value = None
result = auth_manager.authenticate_user(mock_db, "nonexistent", "password123")
assert result is None
def test_authenticate_user_wrong_password(self):
"""Test authentication with wrong password."""
auth_manager = AuthManager()
mock_db = Mock()
mock_user = Mock(spec=User)
mock_user.hashed_password = auth_manager.hash_password("correctpassword")
mock_db.query.return_value.filter.return_value.first.return_value = mock_user
result = auth_manager.authenticate_user(mock_db, "testuser", "wrongpassword")
assert result is None
@pytest.mark.unit
@pytest.mark.auth
class TestJWTTokenCreation:
"""Test suite for JWT token creation."""
def test_create_access_token_structure(self):
"""Test JWT token creation returns correct structure."""
auth_manager = AuthManager()
mock_user = Mock(spec=User)
mock_user.id = 1
mock_user.username = "testuser"
mock_user.email = "test@example.com"
mock_user.role = "customer"
token_data = auth_manager.create_access_token(mock_user)
assert "access_token" in token_data
assert "token_type" in token_data
assert "expires_in" in token_data
assert token_data["token_type"] == "bearer"
assert isinstance(token_data["expires_in"], int)
assert token_data["expires_in"] == auth_manager.token_expire_minutes * 60
def test_create_access_token_payload(self):
"""Test JWT token contains correct payload."""
auth_manager = AuthManager()
mock_user = Mock(spec=User)
mock_user.id = 42
mock_user.username = "testuser"
mock_user.email = "test@example.com"
mock_user.role = "vendor"
token_data = auth_manager.create_access_token(mock_user)
token = token_data["access_token"]
# Decode without verification to check payload
payload = jwt.decode(token, auth_manager.secret_key, algorithms=[auth_manager.algorithm])
assert payload["sub"] == "42"
assert payload["username"] == "testuser"
assert payload["email"] == "test@example.com"
assert payload["role"] == "vendor"
assert "exp" in payload
assert "iat" in payload
def test_create_access_token_different_users(self):
"""Test tokens are different for different users."""
auth_manager = AuthManager()
user1 = Mock(spec=User, id=1, username="user1", email="user1@test.com", role="customer")
user2 = Mock(spec=User, id=2, username="user2", email="user2@test.com", role="vendor")
token1 = auth_manager.create_access_token(user1)["access_token"]
token2 = auth_manager.create_access_token(user2)["access_token"]
assert token1 != token2
def test_create_access_token_admin_role(self):
"""Test token creation for admin user."""
auth_manager = AuthManager()
admin_user = Mock(spec=User)
admin_user.id = 1
admin_user.username = "admin"
admin_user.email = "admin@example.com"
admin_user.role = "admin"
token_data = auth_manager.create_access_token(admin_user)
payload = jwt.decode(
token_data["access_token"],
auth_manager.secret_key,
algorithms=[auth_manager.algorithm]
)
assert payload["role"] == "admin"
@pytest.mark.unit
@pytest.mark.auth
class TestJWTTokenVerification:
"""Test suite for JWT token verification."""
def test_verify_token_success(self):
"""Test successful token verification."""
auth_manager = AuthManager()
mock_user = Mock(spec=User)
mock_user.id = 1
mock_user.username = "testuser"
mock_user.email = "test@example.com"
mock_user.role = "customer"
token_data = auth_manager.create_access_token(mock_user)
token = token_data["access_token"]
result = auth_manager.verify_token(token)
assert result["user_id"] == 1
assert result["username"] == "testuser"
assert result["email"] == "test@example.com"
assert result["role"] == "customer"
def test_verify_token_expired(self):
"""Test token verification with expired token."""
auth_manager = AuthManager()
auth_manager.token_expire_minutes = -1 # Set to negative to force expiration
mock_user = Mock(spec=User)
mock_user.id = 1
mock_user.username = "testuser"
mock_user.email = "test@example.com"
mock_user.role = "customer"
token_data = auth_manager.create_access_token(mock_user)
token = token_data["access_token"]
# Reset to normal
auth_manager.token_expire_minutes = 30
with pytest.raises(TokenExpiredException):
auth_manager.verify_token(token)
def test_verify_token_invalid(self):
"""Test token verification with invalid token."""
auth_manager = AuthManager()
with pytest.raises(InvalidTokenException):
auth_manager.verify_token("invalid.token.here")
def test_verify_token_tampered(self):
"""Test token verification with tampered token."""
auth_manager = AuthManager()
mock_user = Mock(spec=User)
mock_user.id = 1
mock_user.username = "testuser"
mock_user.email = "test@example.com"
mock_user.role = "customer"
token = auth_manager.create_access_token(mock_user)["access_token"]
# Tamper with token
parts = token.split(".")
tampered_token = ".".join([parts[0], parts[1], "tampered"])
with pytest.raises(InvalidTokenException):
auth_manager.verify_token(tampered_token)
def test_verify_token_missing_user_id(self):
"""Test token verification with missing user ID."""
auth_manager = AuthManager()
# Create token without 'sub' field
payload = {
"username": "testuser",
"exp": datetime.now(timezone.utc) + timedelta(minutes=30)
}
token = jwt.encode(payload, auth_manager.secret_key, algorithm=auth_manager.algorithm)
with pytest.raises(InvalidTokenException) as exc_info:
auth_manager.verify_token(token)
assert "missing user identifier" in str(exc_info.value.message)
def test_verify_token_missing_expiration(self):
"""Test token verification with missing expiration."""
auth_manager = AuthManager()
# Create token without 'exp' field
payload = {
"sub": "1",
"username": "testuser"
}
token = jwt.encode(payload, auth_manager.secret_key, algorithm=auth_manager.algorithm)
with pytest.raises(InvalidTokenException) as exc_info:
auth_manager.verify_token(token)
assert "missing expiration" in str(exc_info.value.message)
def test_verify_token_wrong_algorithm(self):
"""Test token verification with different algorithm."""
auth_manager = AuthManager()
payload = {
"sub": "1",
"username": "testuser",
"exp": datetime.now(timezone.utc) + timedelta(minutes=30)
}
# Create token with different algorithm
token = jwt.encode(payload, auth_manager.secret_key, algorithm="HS512")
with pytest.raises(InvalidTokenException):
auth_manager.verify_token(token)
def test_verify_token_additional_expiration_check(self):
"""Test the additional expiration check after jwt.decode."""
auth_manager = AuthManager()
# Create a token with expiration in the past
past_time = datetime.now(timezone.utc) - timedelta(minutes=1)
payload = {
"sub": "1",
"username": "testuser",
"exp": past_time.timestamp()
}
token = jwt.encode(payload, auth_manager.secret_key, algorithm=auth_manager.algorithm)
# Mock jwt.decode to bypass its expiration check and test line 205
with patch('middleware.auth.jwt.decode') as mock_decode:
mock_decode.return_value = payload
with pytest.raises(TokenExpiredException):
auth_manager.verify_token(token)
@pytest.mark.unit
@pytest.mark.auth
class TestGetCurrentUser:
"""Test suite for get_current_user functionality."""
def test_get_current_user_success(self):
"""Test successfully getting current user."""
auth_manager = AuthManager()
mock_db = Mock()
# Create mock user
mock_user = Mock(spec=User)
mock_user.id = 1
mock_user.username = "testuser"
mock_user.email = "test@example.com"
mock_user.role = "customer"
mock_user.is_active = True
# Setup database mock
mock_db.query.return_value.filter.return_value.first.return_value = mock_user
# Create valid token
token_data = auth_manager.create_access_token(mock_user)
# Create mock credentials
mock_credentials = Mock()
mock_credentials.credentials = token_data["access_token"]
result = auth_manager.get_current_user(mock_db, mock_credentials)
assert result is mock_user
def test_get_current_user_not_found(self):
"""Test get_current_user when user doesn't exist in database."""
auth_manager = AuthManager()
mock_db = Mock()
# Setup database to return None
mock_db.query.return_value.filter.return_value.first.return_value = None
# Create mock user for token
mock_user = Mock(spec=User)
mock_user.id = 999
mock_user.username = "testuser"
mock_user.email = "test@example.com"
mock_user.role = "customer"
token_data = auth_manager.create_access_token(mock_user)
mock_credentials = Mock()
mock_credentials.credentials = token_data["access_token"]
with pytest.raises(InvalidCredentialsException):
auth_manager.get_current_user(mock_db, mock_credentials)
def test_get_current_user_inactive(self):
"""Test get_current_user with inactive user."""
auth_manager = AuthManager()
mock_db = Mock()
mock_user = Mock(spec=User)
mock_user.id = 1
mock_user.username = "testuser"
mock_user.email = "test@example.com"
mock_user.role = "customer"
mock_user.is_active = False # Inactive user
mock_db.query.return_value.filter.return_value.first.return_value = mock_user
token_data = auth_manager.create_access_token(mock_user)
mock_credentials = Mock()
mock_credentials.credentials = token_data["access_token"]
with pytest.raises(UserNotActiveException):
auth_manager.get_current_user(mock_db, mock_credentials)
@pytest.mark.unit
@pytest.mark.auth
class TestRoleRequirements:
"""Test suite for role-based access control."""
def test_require_admin_success(self):
"""Test require_admin with admin user."""
auth_manager = AuthManager()
admin_user = Mock(spec=User)
admin_user.role = "admin"
result = auth_manager.require_admin(admin_user)
assert result is admin_user
def test_require_admin_failure(self):
"""Test require_admin with non-admin user."""
auth_manager = AuthManager()
customer_user = Mock(spec=User)
customer_user.role = "customer"
with pytest.raises(AdminRequiredException):
auth_manager.require_admin(customer_user)
def test_require_vendor_with_vendor_role(self):
"""Test require_vendor with vendor user."""
auth_manager = AuthManager()
vendor_user = Mock(spec=User)
vendor_user.role = "vendor"
result = auth_manager.require_vendor(vendor_user)
assert result is vendor_user
def test_require_vendor_with_admin_role(self):
"""Test require_vendor with admin user (admin can access vendor areas)."""
auth_manager = AuthManager()
admin_user = Mock(spec=User)
admin_user.role = "admin"
result = auth_manager.require_vendor(admin_user)
assert result is admin_user
def test_require_vendor_failure(self):
"""Test require_vendor with customer user."""
auth_manager = AuthManager()
customer_user = Mock(spec=User)
customer_user.role = "customer"
with pytest.raises(InsufficientPermissionsException) as exc_info:
auth_manager.require_vendor(customer_user)
assert exc_info.value.details.get("required_permission") == "vendor"
def test_require_customer_with_customer_role(self):
"""Test require_customer with customer user."""
auth_manager = AuthManager()
customer_user = Mock(spec=User)
customer_user.role = "customer"
result = auth_manager.require_customer(customer_user)
assert result is customer_user
def test_require_customer_with_admin_role(self):
"""Test require_customer with admin user (admin can access customer areas)."""
auth_manager = AuthManager()
admin_user = Mock(spec=User)
admin_user.role = "admin"
result = auth_manager.require_customer(admin_user)
assert result is admin_user
def test_require_customer_failure(self):
"""Test require_customer with vendor user."""
auth_manager = AuthManager()
vendor_user = Mock(spec=User)
vendor_user.role = "vendor"
with pytest.raises(InsufficientPermissionsException) as exc_info:
auth_manager.require_customer(vendor_user)
assert exc_info.value.details.get("required_permission") == "customer"
@pytest.mark.unit
@pytest.mark.auth
class TestCreateDefaultAdminUser:
"""Test suite for default admin user creation."""
def test_create_default_admin_user_first_time(self):
"""Test creating default admin user when none exists."""
auth_manager = AuthManager()
mock_db = Mock()
# No existing admin user
mock_db.query.return_value.filter.return_value.first.return_value = None
result = auth_manager.create_default_admin_user(mock_db)
# Verify admin user was created
mock_db.add.assert_called_once()
mock_db.commit.assert_called_once()
mock_db.refresh.assert_called_once()
# Verify the created user
created_user = mock_db.add.call_args[0][0]
assert created_user.username == "admin"
assert created_user.email == "admin@example.com"
assert created_user.role == "admin"
assert created_user.is_active is True
assert auth_manager.verify_password("admin123", created_user.hashed_password)
def test_create_default_admin_user_already_exists(self):
"""Test creating default admin user when one already exists."""
auth_manager = AuthManager()
mock_db = Mock()
# Existing admin user
existing_admin = Mock(spec=User)
mock_db.query.return_value.filter.return_value.first.return_value = existing_admin
result = auth_manager.create_default_admin_user(mock_db)
# Should not create new user
mock_db.add.assert_not_called()
mock_db.commit.assert_not_called()
# Should return existing user
assert result is existing_admin
@pytest.mark.unit
@pytest.mark.auth
class TestAuthManagerConfiguration:
"""Test suite for AuthManager configuration."""
def test_default_configuration(self):
"""Test AuthManager uses default configuration."""
with patch.dict('os.environ', {}, clear=True):
auth_manager = AuthManager()
assert auth_manager.algorithm == "HS256"
assert auth_manager.token_expire_minutes == 30
assert auth_manager.secret_key == "your-secret-key-change-in-production-please"
def test_custom_configuration(self):
"""Test AuthManager uses environment variables."""
with patch.dict('os.environ', {
'JWT_SECRET_KEY': 'custom-secret-key',
'JWT_EXPIRE_MINUTES': '60'
}):
auth_manager = AuthManager()
assert auth_manager.secret_key == "custom-secret-key"
assert auth_manager.token_expire_minutes == 60
def test_partial_custom_configuration(self):
"""Test AuthManager with partial environment configuration."""
with patch.dict('os.environ', {
'JWT_EXPIRE_MINUTES': '120'
}, clear=False):
auth_manager = AuthManager()
assert auth_manager.token_expire_minutes == 120
# Secret key should use default or existing env var
assert auth_manager.secret_key is not None
@pytest.mark.unit
@pytest.mark.auth
class TestEdgeCases:
"""Test suite for edge cases and error scenarios."""
def test_verify_password_with_none(self):
"""Test password verification with None values returns False."""
auth_manager = AuthManager()
# None values should return False (safe behavior - None never authenticates)
assert auth_manager.verify_password(None, None) is False
# None password with valid hash
valid_hash = auth_manager.hash_password("test_password")
assert auth_manager.verify_password("password", None) is False
# Note: verify_password(None, valid_hash) raises TypeError from bcrypt
# This edge case is handled by the underlying library
def test_token_with_future_iat(self):
"""Test token with issued_at time in the future."""
auth_manager = AuthManager()
payload = {
"sub": "1",
"username": "testuser",
"iat": datetime.now(timezone.utc) + timedelta(hours=1), # Future time
"exp": datetime.now(timezone.utc) + timedelta(hours=2)
}
token = jwt.encode(payload, auth_manager.secret_key, algorithm=auth_manager.algorithm)
# Should still verify successfully (JWT doesn't validate iat by default)
result = auth_manager.verify_token(token)
assert result["user_id"] == 1
def test_authenticate_user_case_sensitivity(self):
"""Test that username/email authentication is case-sensitive."""
auth_manager = AuthManager()
mock_db = Mock()
mock_user = Mock(spec=User)
mock_user.username = "TestUser"
mock_user.email = "test@example.com"
mock_user.hashed_password = auth_manager.hash_password("password123")
# This will depend on database collation, but generally should be case-sensitive
mock_db.query.return_value.filter.return_value.first.return_value = None
result = auth_manager.authenticate_user(mock_db, "testuser", "password123")
# Result depends on how the filter is implemented
# This test documents the expected behavior
assert result is None or result is mock_user
def test_verify_token_unexpected_exception(self):
"""Test generic exception handler in verify_token."""
auth_manager = AuthManager()
# Create a valid token with a mock user
mock_user = Mock(spec=User)
mock_user.id = 1
mock_user.username = "test"
mock_user.email = "test@example.com"
mock_user.role = "user"
token_data = auth_manager.create_access_token(mock_user)
token = token_data["access_token"]
# Mock jose.jwt.decode to raise an unexpected exception
with patch('middleware.auth.jwt.decode', side_effect=RuntimeError("Unexpected error")):
with pytest.raises(InvalidTokenException) as exc_info:
auth_manager.verify_token(token)
# The message should be "Authentication failed" from the generic except handler
assert "Authentication failed" in str(exc_info.value.message)
def test_require_role_decorator_wrapper_functionality(self):
"""Test the require_role decorator wrapper execution."""
auth_manager = AuthManager()
# Create a test function decorated with require_role
@auth_manager.require_role("admin")
def test_function(current_user, additional_arg=None):
return {"user": current_user.username, "arg": additional_arg}
# Test successful case - user has required role
admin_user = Mock(spec=User)
admin_user.role = "admin"
admin_user.username = "admin_user"
result = test_function(admin_user, additional_arg="test_value")
assert result["user"] == "admin_user"
assert result["arg"] == "test_value"
def test_require_role_decorator_blocks_wrong_role(self):
"""Test that require_role decorator blocks users with wrong role."""
auth_manager = AuthManager()
@auth_manager.require_role("admin")
def admin_only_function(current_user):
return {"status": "success"}
# Test with user that has wrong role
regular_user = Mock(spec=User)
regular_user.role = "user"
with pytest.raises(HTTPException) as exc_info:
admin_only_function(regular_user)
assert exc_info.value.status_code == 403
assert "Required role 'admin' not found" in exc_info.value.detail