# tests/unit/middleware/test_auth.py
"""
Comprehensive unit tests for AuthManager.

Tests cover:
- Password hashing and verification
- JWT token creation and validation
- User authentication
- Token expiration handling
- Role-based access control
- Admin/vendor/customer permission checks
- Error handling and edge cases
"""

from datetime import datetime, timedelta, timezone
from unittest.mock import Mock, MagicMock, patch

import pytest
from jose import jwt

from middleware.auth import AuthManager
from app.exceptions import (
    InvalidTokenException,
    TokenExpiredException,
    UserNotActiveException,
    InvalidCredentialsException,
    AdminRequiredException,
    InsufficientPermissionsException,
)
from models.database.user import User


def _make_user(
    user_id=1,
    username="testuser",
    email="test@example.com",
    role="customer",
    **extra,
):
    """Build a ``Mock(spec=User)`` with the identity attributes these tests use.

    Args:
        user_id: Value assigned to the mock's ``id`` attribute.
        username: Value assigned to ``username``.
        email: Value assigned to ``email``.
        role: Value assigned to ``role``.
        **extra: Any further attributes to set (e.g. ``is_active``,
            ``hashed_password``).

    Returns:
        A ``Mock`` restricted to the ``User`` interface.
    """
    return Mock(
        spec=User,
        id=user_id,
        username=username,
        email=email,
        role=role,
        **extra,
    )


def _make_db(first_result):
    """Build a mock session whose ``query().filter().first()`` yields *first_result*."""
    mock_db = Mock()
    mock_db.query.return_value.filter.return_value.first.return_value = first_result
    return mock_db


@pytest.mark.unit
@pytest.mark.auth
class TestPasswordHashing:
    """Test suite for password hashing functionality."""

    def test_hash_password(self):
        """Test password hashing creates different hash for each call."""
        auth_manager = AuthManager()
        password = "test_password_123"

        hash1 = auth_manager.hash_password(password)
        hash2 = auth_manager.hash_password(password)

        # Hashes should be different due to salt
        assert hash1 != hash2

        # Both should be valid bcrypt hashes (start with $2b$)
        assert hash1.startswith("$2b$")
        assert hash2.startswith("$2b$")

    def test_verify_password_correct(self):
        """Test password verification with correct password."""
        auth_manager = AuthManager()
        password = "test_password_123"
        hashed = auth_manager.hash_password(password)

        assert auth_manager.verify_password(password, hashed) is True

    def test_verify_password_incorrect(self):
        """Test password verification with incorrect password."""
        auth_manager = AuthManager()
        password = "test_password_123"
        wrong_password = "wrong_password_456"
        hashed = auth_manager.hash_password(password)

        assert auth_manager.verify_password(wrong_password, hashed) is False

    def test_verify_password_empty(self):
        """Test password verification with empty password."""
        auth_manager = AuthManager()
        password = "test_password_123"
        hashed = auth_manager.hash_password(password)

        assert auth_manager.verify_password("", hashed) is False

    def test_hash_password_special_characters(self):
        """Test hashing password with special characters."""
        auth_manager = AuthManager()
        password = "P@ssw0rd!#$%^&*()_+-=[]{}|;:,.<>?"
        hashed = auth_manager.hash_password(password)

        assert auth_manager.verify_password(password, hashed) is True

    def test_hash_password_unicode(self):
        """Test hashing password with unicode characters."""
        auth_manager = AuthManager()
        password = "パスワード123こんにちは"
        hashed = auth_manager.hash_password(password)

        assert auth_manager.verify_password(password, hashed) is True


@pytest.mark.unit
@pytest.mark.auth
class TestUserAuthentication:
    """Test suite for user authentication."""

    def test_authenticate_user_success_with_username(self):
        """Test successful authentication with username."""
        auth_manager = AuthManager()
        mock_user = _make_user(
            hashed_password=auth_manager.hash_password("password123")
        )
        mock_db = _make_db(mock_user)

        result = auth_manager.authenticate_user(mock_db, "testuser", "password123")

        assert result is mock_user

    def test_authenticate_user_success_with_email(self):
        """Test successful authentication with email."""
        auth_manager = AuthManager()
        mock_user = _make_user(
            hashed_password=auth_manager.hash_password("password123")
        )
        mock_db = _make_db(mock_user)

        result = auth_manager.authenticate_user(
            mock_db, "test@example.com", "password123"
        )

        assert result is mock_user

    def test_authenticate_user_not_found(self):
        """Test authentication with non-existent user."""
        auth_manager = AuthManager()
        mock_db = _make_db(None)

        result = auth_manager.authenticate_user(mock_db, "nonexistent", "password123")

        assert result is None

    def test_authenticate_user_wrong_password(self):
        """Test authentication with wrong password."""
        auth_manager = AuthManager()
        mock_user = _make_user(
            hashed_password=auth_manager.hash_password("correctpassword")
        )
        mock_db = _make_db(mock_user)

        result = auth_manager.authenticate_user(mock_db, "testuser", "wrongpassword")

        assert result is None


@pytest.mark.unit
@pytest.mark.auth
class TestJWTTokenCreation:
    """Test suite for JWT token creation."""

    def test_create_access_token_structure(self):
        """Test JWT token creation returns correct structure."""
        auth_manager = AuthManager()
        mock_user = _make_user()

        token_data = auth_manager.create_access_token(mock_user)

        assert "access_token" in token_data
        assert "token_type" in token_data
        assert "expires_in" in token_data
        assert token_data["token_type"] == "bearer"
        assert isinstance(token_data["expires_in"], int)
        assert token_data["expires_in"] == auth_manager.token_expire_minutes * 60

    def test_create_access_token_payload(self):
        """Test JWT token contains correct payload."""
        auth_manager = AuthManager()
        mock_user = _make_user(user_id=42, role="vendor")

        token_data = auth_manager.create_access_token(mock_user)
        token = token_data["access_token"]

        # Decode with the manager's own key and algorithm so the signature is
        # checked while the payload is inspected
        payload = jwt.decode(
            token, auth_manager.secret_key, algorithms=[auth_manager.algorithm]
        )

        assert payload["sub"] == "42"
        assert payload["username"] == "testuser"
        assert payload["email"] == "test@example.com"
        assert payload["role"] == "vendor"
        assert "exp" in payload
        assert "iat" in payload

    def test_create_access_token_different_users(self):
        """Test tokens are different for different users."""
        auth_manager = AuthManager()
        user1 = _make_user(
            user_id=1, username="user1", email="user1@test.com", role="customer"
        )
        user2 = _make_user(
            user_id=2, username="user2", email="user2@test.com", role="vendor"
        )

        token1 = auth_manager.create_access_token(user1)["access_token"]
        token2 = auth_manager.create_access_token(user2)["access_token"]

        assert token1 != token2

    def test_create_access_token_admin_role(self):
        """Test token creation for admin user."""
        auth_manager = AuthManager()
        admin_user = _make_user(
            username="admin", email="admin@example.com", role="admin"
        )

        token_data = auth_manager.create_access_token(admin_user)
        payload = jwt.decode(
            token_data["access_token"],
            auth_manager.secret_key,
            algorithms=[auth_manager.algorithm],
        )

        assert payload["role"] == "admin"


@pytest.mark.unit
@pytest.mark.auth
class TestJWTTokenVerification:
    """Test suite for JWT token verification."""

    def test_verify_token_success(self):
        """Test successful token verification."""
        auth_manager = AuthManager()
        mock_user = _make_user()

        token_data = auth_manager.create_access_token(mock_user)
        token = token_data["access_token"]

        result = auth_manager.verify_token(token)

        assert result["user_id"] == 1
        assert result["username"] == "testuser"
        assert result["email"] == "test@example.com"
        assert result["role"] == "customer"

    def test_verify_token_expired(self):
        """Test token verification with expired token."""
        auth_manager = AuthManager()
        configured_minutes = auth_manager.token_expire_minutes
        auth_manager.token_expire_minutes = -1  # Negative lifetime forces expiration
        mock_user = _make_user()

        token_data = auth_manager.create_access_token(mock_user)
        token = token_data["access_token"]

        # Restore the configured lifetime before verifying
        auth_manager.token_expire_minutes = configured_minutes

        with pytest.raises(TokenExpiredException):
            auth_manager.verify_token(token)

    def test_verify_token_invalid(self):
        """Test token verification with invalid token."""
        auth_manager = AuthManager()

        with pytest.raises(InvalidTokenException):
            auth_manager.verify_token("invalid.token.here")

    def test_verify_token_tampered(self):
        """Test token verification with tampered token."""
        auth_manager = AuthManager()
        mock_user = _make_user()

        token = auth_manager.create_access_token(mock_user)["access_token"]

        # Keep header and payload, replace the signature segment
        parts = token.split(".")
        tampered_token = ".".join([parts[0], parts[1], "tampered"])

        with pytest.raises(InvalidTokenException):
            auth_manager.verify_token(tampered_token)

    def test_verify_token_missing_user_id(self):
        """Test token verification with missing user ID."""
        auth_manager = AuthManager()

        # Create token without 'sub' field
        payload = {
            "username": "testuser",
            "exp": datetime.now(timezone.utc) + timedelta(minutes=30),
        }
        token = jwt.encode(
            payload, auth_manager.secret_key, algorithm=auth_manager.algorithm
        )

        with pytest.raises(InvalidTokenException) as exc_info:
            auth_manager.verify_token(token)

        # Checked after the ``with`` block: statements following the raising
        # call inside the block would never run
        assert "missing user identifier" in str(exc_info.value.message)

    def test_verify_token_missing_expiration(self):
        """Test token verification with missing expiration."""
        auth_manager = AuthManager()

        # Create token without 'exp' field
        payload = {
            "sub": "1",
            "username": "testuser",
        }
        token = jwt.encode(
            payload, auth_manager.secret_key, algorithm=auth_manager.algorithm
        )

        with pytest.raises(InvalidTokenException) as exc_info:
            auth_manager.verify_token(token)

        assert "missing expiration" in str(exc_info.value.message)

    def test_verify_token_wrong_algorithm(self):
        """Test token verification with different algorithm."""
        auth_manager = AuthManager()

        payload = {
            "sub": "1",
            "username": "testuser",
            "exp": datetime.now(timezone.utc) + timedelta(minutes=30),
        }
        # Create token with different algorithm
        token = jwt.encode(payload, auth_manager.secret_key, algorithm="HS512")

        with pytest.raises(InvalidTokenException):
            auth_manager.verify_token(token)


@pytest.mark.unit
@pytest.mark.auth
class TestGetCurrentUser:
    """Test suite for get_current_user functionality."""

    def test_get_current_user_success(self):
        """Test successfully getting current user."""
        auth_manager = AuthManager()
        mock_user = _make_user(is_active=True)
        mock_db = _make_db(mock_user)

        # Create valid token and wrap it the way the credentials object exposes it
        token_data = auth_manager.create_access_token(mock_user)
        mock_credentials = Mock()
        mock_credentials.credentials = token_data["access_token"]

        result = auth_manager.get_current_user(mock_db, mock_credentials)

        assert result is mock_user

    def test_get_current_user_not_found(self):
        """Test get_current_user when user doesn't exist in database."""
        auth_manager = AuthManager()

        # Database lookup finds nothing
        mock_db = _make_db(None)

        # The user only exists to mint a token
        mock_user = _make_user(user_id=999)

        token_data = auth_manager.create_access_token(mock_user)
        mock_credentials = Mock()
        mock_credentials.credentials = token_data["access_token"]

        with pytest.raises(InvalidCredentialsException):
            auth_manager.get_current_user(mock_db, mock_credentials)

    def test_get_current_user_inactive(self):
        """Test get_current_user with inactive user."""
        auth_manager = AuthManager()
        mock_user = _make_user(is_active=False)  # Inactive user
        mock_db = _make_db(mock_user)

        token_data = auth_manager.create_access_token(mock_user)
        mock_credentials = Mock()
        mock_credentials.credentials = token_data["access_token"]

        with pytest.raises(UserNotActiveException):
            auth_manager.get_current_user(mock_db, mock_credentials)


@pytest.mark.unit
@pytest.mark.auth
class TestRoleRequirements:
    """Test suite for role-based access control."""

    def test_require_admin_success(self):
        """Test require_admin with admin user."""
        auth_manager = AuthManager()
        admin_user = _make_user(role="admin")

        result = auth_manager.require_admin(admin_user)

        assert result is admin_user

    def test_require_admin_failure(self):
        """Test require_admin with non-admin user."""
        auth_manager = AuthManager()
        customer_user = _make_user(role="customer")

        with pytest.raises(AdminRequiredException):
            auth_manager.require_admin(customer_user)

    def test_require_vendor_with_vendor_role(self):
        """Test require_vendor with vendor user."""
        auth_manager = AuthManager()
        vendor_user = _make_user(role="vendor")

        result = auth_manager.require_vendor(vendor_user)

        assert result is vendor_user

    def test_require_vendor_with_admin_role(self):
        """Test require_vendor with admin user (admin can access vendor areas)."""
        auth_manager = AuthManager()
        admin_user = _make_user(role="admin")

        result = auth_manager.require_vendor(admin_user)

        assert result is admin_user

    def test_require_vendor_failure(self):
        """Test require_vendor with customer user."""
        auth_manager = AuthManager()
        customer_user = _make_user(role="customer")

        with pytest.raises(InsufficientPermissionsException) as exc_info:
            auth_manager.require_vendor(customer_user)

        assert exc_info.value.details.get("required_permission") == "vendor"

    def test_require_customer_with_customer_role(self):
        """Test require_customer with customer user."""
        auth_manager = AuthManager()
        customer_user = _make_user(role="customer")

        result = auth_manager.require_customer(customer_user)

        assert result is customer_user

    def test_require_customer_with_admin_role(self):
        """Test require_customer with admin user (admin can access customer areas)."""
        auth_manager = AuthManager()
        admin_user = _make_user(role="admin")

        result = auth_manager.require_customer(admin_user)

        assert result is admin_user

    def test_require_customer_failure(self):
        """Test require_customer with vendor user."""
        auth_manager = AuthManager()
        vendor_user = _make_user(role="vendor")

        with pytest.raises(InsufficientPermissionsException) as exc_info:
            auth_manager.require_customer(vendor_user)

        assert exc_info.value.details.get("required_permission") == "customer"


@pytest.mark.unit
@pytest.mark.auth
class TestCreateDefaultAdminUser:
    """Test suite for default admin user creation."""

    def test_create_default_admin_user_first_time(self):
        """Test creating default admin user when none exists."""
        auth_manager = AuthManager()

        # No existing admin user
        mock_db = _make_db(None)

        auth_manager.create_default_admin_user(mock_db)

        # Verify admin user was created
        mock_db.add.assert_called_once()
        mock_db.commit.assert_called_once()
        mock_db.refresh.assert_called_once()

        # Verify the created user (first positional argument passed to add)
        created_user = mock_db.add.call_args[0][0]
        assert created_user.username == "admin"
        assert created_user.email == "admin@example.com"
        assert created_user.role == "admin"
        assert created_user.is_active is True
        assert auth_manager.verify_password("admin123", created_user.hashed_password)

    def test_create_default_admin_user_already_exists(self):
        """Test creating default admin user when one already exists."""
        auth_manager = AuthManager()

        # Existing admin user
        existing_admin = Mock(spec=User)
        mock_db = _make_db(existing_admin)

        result = auth_manager.create_default_admin_user(mock_db)

        # Should not create new user
        mock_db.add.assert_not_called()
        mock_db.commit.assert_not_called()

        # Should return existing user
        assert result is existing_admin


@pytest.mark.unit
@pytest.mark.auth
class TestAuthManagerConfiguration:
    """Test suite for AuthManager configuration."""

    def test_default_configuration(self):
        """Test AuthManager uses default configuration."""
        # clear=True empties the environment so only defaults can apply
        with patch.dict('os.environ', {}, clear=True):
            auth_manager = AuthManager()

            assert auth_manager.algorithm == "HS256"
            assert auth_manager.token_expire_minutes == 30
            assert auth_manager.secret_key == "your-secret-key-change-in-production-please"

    def test_custom_configuration(self):
        """Test AuthManager uses environment variables."""
        with patch.dict('os.environ', {
            'JWT_SECRET_KEY': 'custom-secret-key',
            'JWT_EXPIRE_MINUTES': '60'
        }):
            auth_manager = AuthManager()

            assert auth_manager.secret_key == "custom-secret-key"
            assert auth_manager.token_expire_minutes == 60

    def test_partial_custom_configuration(self):
        """Test AuthManager with partial environment configuration."""
        with patch.dict('os.environ', {
            'JWT_EXPIRE_MINUTES': '120'
        }, clear=False):
            auth_manager = AuthManager()

            assert auth_manager.token_expire_minutes == 120
            # Secret key should use default or existing env var
            assert auth_manager.secret_key is not None


@pytest.mark.unit
@pytest.mark.auth
class TestEdgeCases:
    """Test suite for edge cases and error scenarios."""

    def test_verify_password_with_none(self):
        """Test password verification with None values returns False."""
        auth_manager = AuthManager()

        # None values should return False (safe behavior - None never authenticates)
        assert auth_manager.verify_password(None, None) is False

        # A real password checked against a None hash must also be rejected
        assert auth_manager.verify_password("password", None) is False

        # Note: verify_password(None, <valid hash>) is reported to raise
        # TypeError from bcrypt; that case is left to the underlying library
        # and is deliberately not asserted here.

    def test_token_with_future_iat(self):
        """Test token with issued_at time in the future."""
        auth_manager = AuthManager()

        payload = {
            "sub": "1",
            "username": "testuser",
            "iat": datetime.now(timezone.utc) + timedelta(hours=1),  # Future time
            "exp": datetime.now(timezone.utc) + timedelta(hours=2),
        }
        token = jwt.encode(
            payload, auth_manager.secret_key, algorithm=auth_manager.algorithm
        )

        # Should still verify successfully (JWT doesn't validate iat by default)
        result = auth_manager.verify_token(token)
        assert result["user_id"] == 1

    def test_authenticate_user_case_sensitivity(self):
        """Test that a lookup miss on differently-cased input yields None."""
        auth_manager = AuthManager()

        # Whether "testuser" matches a stored "TestUser" depends on database
        # collation; here the mocked lookup reports no match, which is the
        # case-sensitive outcome.
        mock_db = _make_db(None)

        result = auth_manager.authenticate_user(mock_db, "testuser", "password123")

        # With no row returned, authentication must fail outright
        assert result is None