# tests/unit/middleware/test_auth.py
"""
Comprehensive unit tests for AuthManager.

Tests cover:
- Password hashing and verification
- JWT token creation and validation
- User authentication
- Token expiration handling
- Role-based access control
- Admin/vendor/customer permission checks
- Error handling and edge cases
"""

from datetime import UTC, datetime, timedelta
from unittest.mock import Mock, patch

import pytest
from fastapi import HTTPException
from jose import jwt

from app.exceptions import (
    AdminRequiredException,
    InsufficientPermissionsException,
    InvalidCredentialsException,
    InvalidTokenException,
    TokenExpiredException,
    UserNotActiveException,
)
from middleware.auth import AuthManager
from models.database.user import User


def _make_user(
    user_id=1,
    username="testuser",
    email="test@example.com",
    role="customer",
    **extra,
) -> Mock:
    """Return a ``Mock`` specced on ``User`` with the four identity fields set.

    Args:
        user_id: Value assigned to the mock's ``id`` attribute.
        username: Value assigned to ``username``.
        email: Value assigned to ``email``.
        role: Value assigned to ``role``.
        **extra: Any further attributes to set (e.g. ``is_active``).
    """
    return Mock(
        spec=User, id=user_id, username=username, email=email, role=role, **extra
    )


def _make_db(first_result) -> Mock:
    """Return a mock session whose ``query().filter().first()`` yields *first_result*."""
    mock_db = Mock()
    mock_db.query.return_value.filter.return_value.first.return_value = first_result
    return mock_db


@pytest.mark.unit
@pytest.mark.auth
class TestPasswordHashing:
    """Test suite for password hashing functionality."""

    def test_hash_password(self):
        """Test password hashing creates different hash for each call."""
        auth_manager = AuthManager()
        password = "test_password_123"

        hash1 = auth_manager.hash_password(password)
        hash2 = auth_manager.hash_password(password)

        # Hashes should be different due to salt
        assert hash1 != hash2
        # Both should be valid bcrypt hashes (start with $2b$)
        assert hash1.startswith("$2b$")
        assert hash2.startswith("$2b$")

    def test_verify_password_correct(self):
        """Test password verification with correct password."""
        auth_manager = AuthManager()
        password = "test_password_123"
        hashed = auth_manager.hash_password(password)

        assert auth_manager.verify_password(password, hashed) is True

    def test_verify_password_incorrect(self):
        """Test password verification with incorrect password."""
        auth_manager = AuthManager()
        password = "test_password_123"
        wrong_password = "wrong_password_456"
        hashed = auth_manager.hash_password(password)

        assert auth_manager.verify_password(wrong_password, hashed) is False

    def test_verify_password_empty(self):
        """Test password verification with empty password."""
        auth_manager = AuthManager()
        password = "test_password_123"
        hashed = auth_manager.hash_password(password)

        assert auth_manager.verify_password("", hashed) is False

    def test_hash_password_special_characters(self):
        """Test hashing password with special characters."""
        auth_manager = AuthManager()
        password = "P@ssw0rd!#$%^&*()_+-=[]{}|;:,.<>?"
        hashed = auth_manager.hash_password(password)

        assert auth_manager.verify_password(password, hashed) is True

    def test_hash_password_unicode(self):
        """Test hashing password with unicode characters."""
        auth_manager = AuthManager()
        password = "パスワード123こんにちは"
        hashed = auth_manager.hash_password(password)

        assert auth_manager.verify_password(password, hashed) is True


@pytest.mark.unit
@pytest.mark.auth
class TestUserAuthentication:
    """Test suite for user authentication."""

    def test_authenticate_user_success_with_username(self):
        """Test successful authentication with username."""
        auth_manager = AuthManager()
        mock_user = Mock(
            spec=User,
            username="testuser",
            email="test@example.com",
            hashed_password=auth_manager.hash_password("password123"),
        )
        mock_db = _make_db(mock_user)

        result = auth_manager.authenticate_user(mock_db, "testuser", "password123")

        assert result is mock_user

    def test_authenticate_user_success_with_email(self):
        """Test successful authentication with email."""
        auth_manager = AuthManager()
        mock_user = Mock(
            spec=User,
            username="testuser",
            email="test@example.com",
            hashed_password=auth_manager.hash_password("password123"),
        )
        mock_db = _make_db(mock_user)

        result = auth_manager.authenticate_user(
            mock_db, "test@example.com", "password123"
        )

        assert result is mock_user

    def test_authenticate_user_not_found(self):
        """Test authentication with non-existent user."""
        auth_manager = AuthManager()
        mock_db = _make_db(None)

        result = auth_manager.authenticate_user(mock_db, "nonexistent", "password123")

        assert result is None

    def test_authenticate_user_wrong_password(self):
        """Test authentication with wrong password."""
        auth_manager = AuthManager()
        mock_user = Mock(
            spec=User,
            hashed_password=auth_manager.hash_password("correctpassword"),
        )
        mock_db = _make_db(mock_user)

        result = auth_manager.authenticate_user(mock_db, "testuser", "wrongpassword")

        assert result is None


@pytest.mark.unit
@pytest.mark.auth
class TestJWTTokenCreation:
    """Test suite for JWT token creation."""

    def test_create_access_token_structure(self):
        """Test JWT token creation returns correct structure."""
        auth_manager = AuthManager()

        token_data = auth_manager.create_access_token(_make_user())

        assert "access_token" in token_data
        assert "token_type" in token_data
        assert "expires_in" in token_data
        assert token_data["token_type"] == "bearer"
        assert isinstance(token_data["expires_in"], int)
        assert token_data["expires_in"] == auth_manager.token_expire_minutes * 60

    def test_create_access_token_payload(self):
        """Test JWT token contains correct payload."""
        auth_manager = AuthManager()
        mock_user = _make_user(user_id=42, role="vendor")

        token = auth_manager.create_access_token(mock_user)["access_token"]

        # Decode with the manager's own key/algorithm (signature is verified)
        # so the claims can be inspected.
        payload = jwt.decode(
            token, auth_manager.secret_key, algorithms=[auth_manager.algorithm]
        )

        assert payload["sub"] == "42"
        assert payload["username"] == "testuser"
        assert payload["email"] == "test@example.com"
        assert payload["role"] == "vendor"
        assert "exp" in payload
        assert "iat" in payload

    def test_create_access_token_different_users(self):
        """Test tokens are different for different users."""
        auth_manager = AuthManager()
        user1 = _make_user(
            user_id=1, username="user1", email="user1@test.com", role="customer"
        )
        user2 = _make_user(
            user_id=2, username="user2", email="user2@test.com", role="vendor"
        )

        token1 = auth_manager.create_access_token(user1)["access_token"]
        token2 = auth_manager.create_access_token(user2)["access_token"]

        assert token1 != token2

    def test_create_access_token_admin_role(self):
        """Test token creation for admin user."""
        auth_manager = AuthManager()
        admin_user = _make_user(
            user_id=1, username="admin", email="admin@example.com", role="admin"
        )

        token_data = auth_manager.create_access_token(admin_user)

        payload = jwt.decode(
            token_data["access_token"],
            auth_manager.secret_key,
            algorithms=[auth_manager.algorithm],
        )
        assert payload["role"] == "admin"


@pytest.mark.unit
@pytest.mark.auth
class TestJWTTokenVerification:
    """Test suite for JWT token verification."""

    def test_verify_token_success(self):
        """Test successful token verification."""
        auth_manager = AuthManager()
        token = auth_manager.create_access_token(_make_user())["access_token"]

        result = auth_manager.verify_token(token)

        assert result["user_id"] == 1
        assert result["username"] == "testuser"
        assert result["email"] == "test@example.com"
        assert result["role"] == "customer"

    def test_verify_token_expired(self):
        """Test token verification with expired token."""
        auth_manager = AuthManager()
        # Remember the configured lifetime so it can be restored exactly,
        # whatever the environment set it to.
        normal_expiry = auth_manager.token_expire_minutes
        auth_manager.token_expire_minutes = -1  # Negative forces expiration

        token = auth_manager.create_access_token(_make_user())["access_token"]

        auth_manager.token_expire_minutes = normal_expiry

        with pytest.raises(TokenExpiredException):
            auth_manager.verify_token(token)

    def test_verify_token_invalid(self):
        """Test token verification with invalid token."""
        auth_manager = AuthManager()

        with pytest.raises(InvalidTokenException):
            auth_manager.verify_token("invalid.token.here")

    def test_verify_token_tampered(self):
        """Test token verification with tampered token."""
        auth_manager = AuthManager()
        token = auth_manager.create_access_token(_make_user())["access_token"]

        # Keep header and payload, replace the signature segment
        header, body, _signature = token.split(".")
        tampered_token = ".".join([header, body, "tampered"])

        with pytest.raises(InvalidTokenException):
            auth_manager.verify_token(tampered_token)

    def test_verify_token_missing_user_id(self):
        """Test token verification with missing user ID."""
        auth_manager = AuthManager()
        # Create token without 'sub' field
        payload = {
            "username": "testuser",
            "exp": datetime.now(UTC) + timedelta(minutes=30),
        }
        token = jwt.encode(
            payload, auth_manager.secret_key, algorithm=auth_manager.algorithm
        )

        with pytest.raises(InvalidTokenException) as exc_info:
            auth_manager.verify_token(token)

        assert "missing user identifier" in str(exc_info.value.message)

    def test_verify_token_missing_expiration(self):
        """Test token verification with missing expiration."""
        auth_manager = AuthManager()
        # Create token without 'exp' field
        payload = {"sub": "1", "username": "testuser"}
        token = jwt.encode(
            payload, auth_manager.secret_key, algorithm=auth_manager.algorithm
        )

        with pytest.raises(InvalidTokenException) as exc_info:
            auth_manager.verify_token(token)

        assert "missing expiration" in str(exc_info.value.message)

    def test_verify_token_wrong_algorithm(self):
        """Test token verification with different algorithm."""
        auth_manager = AuthManager()
        payload = {
            "sub": "1",
            "username": "testuser",
            "exp": datetime.now(UTC) + timedelta(minutes=30),
        }
        # Create token with different algorithm
        token = jwt.encode(payload, auth_manager.secret_key, algorithm="HS512")

        with pytest.raises(InvalidTokenException):
            auth_manager.verify_token(token)

    def test_verify_token_additional_expiration_check(self):
        """Test the additional expiration check after jwt.decode."""
        auth_manager = AuthManager()
        # Create a token with expiration in the past
        past_time = datetime.now(UTC) - timedelta(minutes=1)
        payload = {"sub": "1", "username": "testuser", "exp": past_time.timestamp()}
        token = jwt.encode(
            payload, auth_manager.secret_key, algorithm=auth_manager.algorithm
        )

        # Stub jwt.decode so the library's own expiry validation is bypassed
        # and the stale payload reaches verify_token's follow-up check.
        with (
            patch("middleware.auth.jwt.decode", return_value=payload),
            pytest.raises(TokenExpiredException),
        ):
            auth_manager.verify_token(token)


@pytest.mark.unit
@pytest.mark.auth
class TestGetCurrentUser:
    """Test suite for get_current_user functionality."""

    def test_get_current_user_success(self):
        """Test successfully getting current user."""
        auth_manager = AuthManager()
        mock_user = _make_user(is_active=True)
        mock_db = _make_db(mock_user)

        # Bearer credentials carrying a valid token for that user
        token_data = auth_manager.create_access_token(mock_user)
        mock_credentials = Mock(credentials=token_data["access_token"])

        result = auth_manager.get_current_user(mock_db, mock_credentials)

        assert result is mock_user

    def test_get_current_user_not_found(self):
        """Test get_current_user when user doesn't exist in database."""
        auth_manager = AuthManager()
        # Database lookup comes back empty
        mock_db = _make_db(None)

        # The token itself is valid; only the user row is missing
        token_data = auth_manager.create_access_token(_make_user(user_id=999))
        mock_credentials = Mock(credentials=token_data["access_token"])

        with pytest.raises(InvalidCredentialsException):
            auth_manager.get_current_user(mock_db, mock_credentials)

    def test_get_current_user_inactive(self):
        """Test get_current_user with inactive user."""
        auth_manager = AuthManager()
        mock_user = _make_user(is_active=False)  # Inactive user
        mock_db = _make_db(mock_user)

        token_data = auth_manager.create_access_token(mock_user)
        mock_credentials = Mock(credentials=token_data["access_token"])

        with pytest.raises(UserNotActiveException):
            auth_manager.get_current_user(mock_db, mock_credentials)


@pytest.mark.unit
@pytest.mark.auth
class TestRoleRequirements:
    """Test suite for role-based access control."""

    def test_require_admin_success(self):
        """Test require_admin with admin user."""
        auth_manager = AuthManager()
        admin_user = Mock(spec=User, role="admin")

        result = auth_manager.require_admin(admin_user)

        assert result is admin_user

    def test_require_admin_failure(self):
        """Test require_admin with non-admin user."""
        auth_manager = AuthManager()
        customer_user = Mock(spec=User, role="customer")

        with pytest.raises(AdminRequiredException):
            auth_manager.require_admin(customer_user)

    def test_require_vendor_with_vendor_role(self):
        """Test require_vendor with vendor user."""
        auth_manager = AuthManager()
        vendor_user = Mock(spec=User, role="vendor")

        result = auth_manager.require_vendor(vendor_user)

        assert result is vendor_user

    def test_require_vendor_with_admin_role(self):
        """Test require_vendor with admin user (admin can access vendor areas)."""
        auth_manager = AuthManager()
        admin_user = Mock(spec=User, role="admin")

        result = auth_manager.require_vendor(admin_user)

        assert result is admin_user

    def test_require_vendor_failure(self):
        """Test require_vendor with customer user."""
        auth_manager = AuthManager()
        customer_user = Mock(spec=User, role="customer")

        with pytest.raises(InsufficientPermissionsException) as exc_info:
            auth_manager.require_vendor(customer_user)

        assert exc_info.value.details.get("required_permission") == "vendor"

    def test_require_customer_with_customer_role(self):
        """Test require_customer with customer user."""
        auth_manager = AuthManager()
        customer_user = Mock(spec=User, role="customer")

        result = auth_manager.require_customer(customer_user)

        assert result is customer_user

    def test_require_customer_with_admin_role(self):
        """Test require_customer with admin user (admin can access customer areas)."""
        auth_manager = AuthManager()
        admin_user = Mock(spec=User, role="admin")

        result = auth_manager.require_customer(admin_user)

        assert result is admin_user

    def test_require_customer_failure(self):
        """Test require_customer with vendor user."""
        auth_manager = AuthManager()
        vendor_user = Mock(spec=User, role="vendor")

        with pytest.raises(InsufficientPermissionsException) as exc_info:
            auth_manager.require_customer(vendor_user)

        assert exc_info.value.details.get("required_permission") == "customer"


@pytest.mark.unit
@pytest.mark.auth
class TestCreateDefaultAdminUser:
    """Test suite for default admin user creation."""

    def test_create_default_admin_user_first_time(self):
        """Test creating default admin user when none exists."""
        auth_manager = AuthManager()
        # No existing admin user
        mock_db = _make_db(None)

        auth_manager.create_default_admin_user(mock_db)

        # Verify admin user was created
        mock_db.add.assert_called_once()
        mock_db.commit.assert_called_once()
        mock_db.refresh.assert_called_once()

        # Verify the created user (first positional argument passed to add())
        created_user = mock_db.add.call_args[0][0]
        assert created_user.username == "admin"
        assert created_user.email == "admin@example.com"
        assert created_user.role == "admin"
        assert created_user.is_active is True
        assert auth_manager.verify_password("admin123", created_user.hashed_password)

    def test_create_default_admin_user_already_exists(self):
        """Test creating default admin user when one already exists."""
        auth_manager = AuthManager()
        # Existing admin user
        existing_admin = Mock(spec=User)
        mock_db = _make_db(existing_admin)

        result = auth_manager.create_default_admin_user(mock_db)

        # Should not create new user
        mock_db.add.assert_not_called()
        mock_db.commit.assert_not_called()
        # Should return existing user
        assert result is existing_admin


@pytest.mark.unit
@pytest.mark.auth
class TestAuthManagerConfiguration:
    """Test suite for AuthManager configuration."""

    def test_default_configuration(self):
        """Test AuthManager uses default configuration."""
        with patch.dict("os.environ", {}, clear=True):
            auth_manager = AuthManager()

            assert auth_manager.algorithm == "HS256"
            assert auth_manager.token_expire_minutes == 30
            assert (
                auth_manager.secret_key == "your-secret-key-change-in-production-please"
            )

    def test_custom_configuration(self):
        """Test AuthManager uses environment variables."""
        with patch.dict(
            "os.environ",
            {"JWT_SECRET_KEY": "custom-secret-key", "JWT_EXPIRE_MINUTES": "60"},
        ):
            auth_manager = AuthManager()

            assert auth_manager.secret_key == "custom-secret-key"
            assert auth_manager.token_expire_minutes == 60

    def test_partial_custom_configuration(self):
        """Test AuthManager with partial environment configuration."""
        with patch.dict("os.environ", {"JWT_EXPIRE_MINUTES": "120"}, clear=False):
            auth_manager = AuthManager()

            assert auth_manager.token_expire_minutes == 120
            # Secret key should use default or existing env var
            assert auth_manager.secret_key is not None


@pytest.mark.unit
@pytest.mark.auth
class TestEdgeCases:
    """Test suite for edge cases and error scenarios."""

    def test_verify_password_with_none(self):
        """Test password verification with None values returns False."""
        auth_manager = AuthManager()

        # None values should return False (safe behavior - None never authenticates)
        assert auth_manager.verify_password(None, None) is False

        # Real password checked against a missing (None) hash
        assert auth_manager.verify_password("password", None) is False

        # Note: verify_password(None, valid_hash) raises TypeError from bcrypt
        # This edge case is handled by the underlying library

    def test_token_with_future_iat(self):
        """Test token with issued_at time in the future."""
        auth_manager = AuthManager()
        payload = {
            "sub": "1",
            "username": "testuser",
            "iat": datetime.now(UTC) + timedelta(hours=1),  # Future time
            "exp": datetime.now(UTC) + timedelta(hours=2),
        }
        token = jwt.encode(
            payload, auth_manager.secret_key, algorithm=auth_manager.algorithm
        )

        # Should still verify successfully (JWT doesn't validate iat by default)
        result = auth_manager.verify_token(token)

        assert result["user_id"] == 1

    def test_authenticate_user_case_sensitivity(self):
        """Test that a lookup miss for a differently-cased username yields None.

        Whether "testuser" matches a stored "TestUser" is decided by the
        database collation, which a mocked session cannot reproduce; this test
        pins what AuthManager does once the query reports no match.
        """
        auth_manager = AuthManager()
        mock_db = _make_db(None)

        result = auth_manager.authenticate_user(mock_db, "testuser", "password123")

        assert result is None

    def test_verify_token_unexpected_exception(self):
        """Test generic exception handler in verify_token."""
        auth_manager = AuthManager()
        # Create a valid token with a mock user
        mock_user = _make_user(username="test", role="user")
        token = auth_manager.create_access_token(mock_user)["access_token"]

        # Make jwt.decode (as imported by middleware.auth) raise an unexpected error
        with (
            patch(
                "middleware.auth.jwt.decode",
                side_effect=RuntimeError("Unexpected error"),
            ),
            pytest.raises(InvalidTokenException) as exc_info,
        ):
            auth_manager.verify_token(token)

        # The message should be "Authentication failed" from the generic except handler
        assert "Authentication failed" in str(exc_info.value.message)

    def test_require_role_decorator_wrapper_functionality(self):
        """Test the require_role decorator wrapper execution."""
        auth_manager = AuthManager()

        # Create a test function decorated with require_role
        @auth_manager.require_role("admin")
        def test_function(current_user, additional_arg=None):
            return {"user": current_user.username, "arg": additional_arg}

        # Test successful case - user has required role
        admin_user = Mock(spec=User, role="admin", username="admin_user")

        result = test_function(admin_user, additional_arg="test_value")

        assert result["user"] == "admin_user"
        assert result["arg"] == "test_value"

    def test_require_role_decorator_blocks_wrong_role(self):
        """Test that require_role decorator blocks users with wrong role."""
        auth_manager = AuthManager()

        @auth_manager.require_role("admin")
        def admin_only_function(current_user):
            return {"status": "success"}

        # Test with user that has wrong role
        regular_user = Mock(spec=User, role="user")

        with pytest.raises(HTTPException) as exc_info:
            admin_only_function(regular_user)

        assert exc_info.value.status_code == 403
        assert "Required role 'admin' not found" in exc_info.value.detail