- Replace black, isort, and flake8 with Ruff (all-in-one linter and formatter) - Add comprehensive pyproject.toml configuration - Simplify Makefile code quality targets - Configure exclusions for venv/.venv in pyproject.toml - Auto-fix 1,359 linting issues across codebase Benefits: - Much faster builds (Ruff is written in Rust) - Single tool replaces multiple tools - More comprehensive rule set (UP, B, C4, SIM, PIE, RET, Q) - All configuration centralized in pyproject.toml - Better import sorting and formatting consistency 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
758 lines
25 KiB
Python
758 lines
25 KiB
Python
# tests/unit/middleware/test_auth.py
|
|
"""
|
|
Comprehensive unit tests for AuthManager.
|
|
|
|
Tests cover:
|
|
- Password hashing and verification
|
|
- JWT token creation and validation
|
|
- User authentication
|
|
- Token expiration handling
|
|
- Role-based access control
|
|
- Admin/vendor/customer permission checks
|
|
- Error handling and edge cases
|
|
"""
|
|
|
|
from datetime import UTC, datetime, timedelta
|
|
from unittest.mock import Mock, patch
|
|
|
|
import pytest
|
|
from fastapi import HTTPException
|
|
from jose import jwt
|
|
|
|
from app.exceptions import (
|
|
AdminRequiredException,
|
|
InsufficientPermissionsException,
|
|
InvalidCredentialsException,
|
|
InvalidTokenException,
|
|
TokenExpiredException,
|
|
UserNotActiveException,
|
|
)
|
|
from middleware.auth import AuthManager
|
|
from models.database.user import User
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestPasswordHashing:
    """Checks for AuthManager.hash_password and AuthManager.verify_password."""

    def test_hash_password(self):
        """Two hashes of one password differ and both carry the $2b$ prefix."""
        manager = AuthManager()
        plain = "test_password_123"

        first = manager.hash_password(plain)
        second = manager.hash_password(plain)

        # Salting must make repeated hashes of the same input distinct.
        assert first != second
        # Each digest is expected to begin with the bcrypt "$2b$" marker.
        assert first.startswith("$2b$")
        assert second.startswith("$2b$")

    def test_verify_password_correct(self):
        """The password that produced a hash verifies against that hash."""
        manager = AuthManager()
        plain = "test_password_123"

        digest = manager.hash_password(plain)
        outcome = manager.verify_password(plain, digest)

        assert outcome is True

    def test_verify_password_incorrect(self):
        """A different password does not verify against the stored hash."""
        manager = AuthManager()
        digest = manager.hash_password("test_password_123")

        outcome = manager.verify_password("wrong_password_456", digest)

        assert outcome is False

    def test_verify_password_empty(self):
        """An empty string does not verify against a real password's hash."""
        manager = AuthManager()
        digest = manager.hash_password("test_password_123")

        outcome = manager.verify_password("", digest)

        assert outcome is False

    def test_hash_password_special_characters(self):
        """A password made of punctuation and symbols round-trips."""
        manager = AuthManager()
        plain = "P@ssw0rd!#$%^&*()_+-=[]{}|;:,.<>?"

        digest = manager.hash_password(plain)

        assert manager.verify_password(plain, digest) is True

    def test_hash_password_unicode(self):
        """A password containing non-ASCII characters round-trips."""
        manager = AuthManager()
        plain = "パスワード123こんにちは"

        digest = manager.hash_password(plain)

        assert manager.verify_password(plain, digest) is True
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestUserAuthentication:
    """Checks for AuthManager.authenticate_user against a mocked session."""

    @staticmethod
    def _session_returning(record):
        """Build a mock session whose query().filter().first() yields record."""
        session = Mock()
        session.query.return_value.filter.return_value.first.return_value = record
        return session

    def test_authenticate_user_success_with_username(self):
        """Logging in by username with the right password returns the user."""
        manager = AuthManager()
        stored = Mock(
            spec=User,
            username="testuser",
            email="test@example.com",
            hashed_password=manager.hash_password("password123"),
        )
        session = self._session_returning(stored)

        outcome = manager.authenticate_user(session, "testuser", "password123")

        assert outcome is stored

    def test_authenticate_user_success_with_email(self):
        """Logging in by email with the right password returns the user."""
        manager = AuthManager()
        stored = Mock(
            spec=User,
            username="testuser",
            email="test@example.com",
            hashed_password=manager.hash_password("password123"),
        )
        session = self._session_returning(stored)

        outcome = manager.authenticate_user(
            session, "test@example.com", "password123"
        )

        assert outcome is stored

    def test_authenticate_user_not_found(self):
        """When the lookup yields nothing, authentication returns None."""
        manager = AuthManager()
        session = self._session_returning(None)

        outcome = manager.authenticate_user(session, "nonexistent", "password123")

        assert outcome is None

    def test_authenticate_user_wrong_password(self):
        """A found user with a mismatching password yields None."""
        manager = AuthManager()
        stored = Mock(
            spec=User,
            hashed_password=manager.hash_password("correctpassword"),
        )
        session = self._session_returning(stored)

        outcome = manager.authenticate_user(session, "testuser", "wrongpassword")

        assert outcome is None
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestJWTTokenCreation:
    """Checks for the dict and JWT produced by AuthManager.create_access_token."""

    def test_create_access_token_structure(self):
        """The returned dict has the token, its type and its lifetime in seconds."""
        manager = AuthManager()
        customer = Mock(
            spec=User,
            id=1,
            username="testuser",
            email="test@example.com",
            role="customer",
        )

        issued = manager.create_access_token(customer)

        for field in ("access_token", "token_type", "expires_in"):
            assert field in issued
        assert issued["token_type"] == "bearer"
        lifetime = issued["expires_in"]
        assert isinstance(lifetime, int)
        # The lifetime is reported in seconds, derived from the minute setting.
        assert lifetime == manager.token_expire_minutes * 60

    def test_create_access_token_payload(self):
        """The decoded claims mirror the user's id, username, email and role."""
        manager = AuthManager()
        vendor = Mock(
            spec=User,
            id=42,
            username="testuser",
            email="test@example.com",
            role="vendor",
        )

        encoded = manager.create_access_token(vendor)["access_token"]

        # Decode with the manager's own key and algorithm to read the claims.
        claims = jwt.decode(
            encoded, manager.secret_key, algorithms=[manager.algorithm]
        )

        # The subject claim holds the user id rendered as a string.
        assert claims["sub"] == "42"
        assert claims["username"] == "testuser"
        assert claims["email"] == "test@example.com"
        assert claims["role"] == "vendor"
        for time_claim in ("exp", "iat"):
            assert time_claim in claims

    def test_create_access_token_different_users(self):
        """Tokens issued for two distinct users are not equal."""
        manager = AuthManager()
        first_user = Mock(
            spec=User, id=1, username="user1", email="user1@test.com", role="customer"
        )
        second_user = Mock(
            spec=User, id=2, username="user2", email="user2@test.com", role="vendor"
        )

        first_token = manager.create_access_token(first_user)["access_token"]
        second_token = manager.create_access_token(second_user)["access_token"]

        assert first_token != second_token

    def test_create_access_token_admin_role(self):
        """An admin user's token carries the "admin" role claim."""
        manager = AuthManager()
        admin = Mock(
            spec=User,
            id=1,
            username="admin",
            email="admin@example.com",
            role="admin",
        )

        issued = manager.create_access_token(admin)
        claims = jwt.decode(
            issued["access_token"],
            manager.secret_key,
            algorithms=[manager.algorithm],
        )

        assert claims["role"] == "admin"
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestJWTTokenVerification:
    """Checks for AuthManager.verify_token on valid and malformed tokens."""

    @staticmethod
    def _customer():
        """Return a User-spec'd mock for customer "testuser" with id 1."""
        return Mock(
            spec=User,
            id=1,
            username="testuser",
            email="test@example.com",
            role="customer",
        )

    @staticmethod
    def _sign(manager, claims):
        """Encode claims with the manager's own secret key and algorithm."""
        return jwt.encode(claims, manager.secret_key, algorithm=manager.algorithm)

    def test_verify_token_success(self):
        """A freshly issued token verifies and exposes the user's details."""
        manager = AuthManager()
        encoded = manager.create_access_token(self._customer())["access_token"]

        verified = manager.verify_token(encoded)

        assert verified["user_id"] == 1
        assert verified["username"] == "testuser"
        assert verified["email"] == "test@example.com"
        assert verified["role"] == "customer"

    def test_verify_token_expired(self):
        """A token issued with a negative lifetime is rejected as expired."""
        manager = AuthManager()
        # A negative lifetime makes the issued token expire in the past.
        manager.token_expire_minutes = -1
        stale = manager.create_access_token(self._customer())["access_token"]
        # Restore the usual lifetime before verifying.
        manager.token_expire_minutes = 30

        with pytest.raises(TokenExpiredException):
            manager.verify_token(stale)

    def test_verify_token_invalid(self):
        """A string that is not a real JWT is rejected as invalid."""
        manager = AuthManager()

        with pytest.raises(InvalidTokenException):
            manager.verify_token("invalid.token.here")

    def test_verify_token_tampered(self):
        """A token whose signature segment was replaced is rejected."""
        manager = AuthManager()
        genuine = manager.create_access_token(self._customer())["access_token"]

        # Keep the header and payload segments but swap in a bogus signature.
        pieces = genuine.split(".")
        forged = f"{pieces[0]}.{pieces[1]}.tampered"

        with pytest.raises(InvalidTokenException):
            manager.verify_token(forged)

    def test_verify_token_missing_user_id(self):
        """A token with no 'sub' claim is rejected with a specific message."""
        manager = AuthManager()
        claims = {
            "username": "testuser",
            "exp": datetime.now(UTC) + timedelta(minutes=30),
        }
        encoded = self._sign(manager, claims)

        with pytest.raises(InvalidTokenException) as caught:
            manager.verify_token(encoded)

        assert "missing user identifier" in str(caught.value.message)

    def test_verify_token_missing_expiration(self):
        """A token with no 'exp' claim is rejected with a specific message."""
        manager = AuthManager()
        claims = {"sub": "1", "username": "testuser"}
        encoded = self._sign(manager, claims)

        with pytest.raises(InvalidTokenException) as caught:
            manager.verify_token(encoded)

        assert "missing expiration" in str(caught.value.message)

    def test_verify_token_wrong_algorithm(self):
        """A token signed with HS512 instead of the manager's algorithm fails."""
        manager = AuthManager()
        claims = {
            "sub": "1",
            "username": "testuser",
            "exp": datetime.now(UTC) + timedelta(minutes=30),
        }
        # Sign with the right key but an algorithm other than the manager's.
        encoded = jwt.encode(claims, manager.secret_key, algorithm="HS512")

        with pytest.raises(InvalidTokenException):
            manager.verify_token(encoded)

    def test_verify_token_additional_expiration_check(self):
        """verify_token itself rejects an expired 'exp' returned by jwt.decode."""
        manager = AuthManager()
        one_minute_ago = datetime.now(UTC) - timedelta(minutes=1)
        claims = {
            "sub": "1",
            "username": "testuser",
            "exp": one_minute_ago.timestamp(),
        }
        encoded = self._sign(manager, claims)

        # Stub out jwt.decode so the library's own expiry validation is
        # skipped and the expired claims reach verify_token's follow-up check.
        with patch("middleware.auth.jwt.decode") as fake_decode:
            fake_decode.return_value = claims

            with pytest.raises(TokenExpiredException):
                manager.verify_token(encoded)
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestGetCurrentUser:
    """Checks for AuthManager.get_current_user with mocked session and credentials."""

    @staticmethod
    def _session_returning(record):
        """Build a mock session whose query().filter().first() yields record."""
        session = Mock()
        session.query.return_value.filter.return_value.first.return_value = record
        return session

    @staticmethod
    def _bearer_for(manager, user):
        """Issue a token for user and wrap it in a mock credentials object."""
        bearer = Mock()
        bearer.credentials = manager.create_access_token(user)["access_token"]
        return bearer

    def test_get_current_user_success(self):
        """A valid token for an active, existing user returns that user."""
        manager = AuthManager()
        active_user = Mock(
            spec=User,
            id=1,
            username="testuser",
            email="test@example.com",
            role="customer",
            is_active=True,
        )
        session = self._session_returning(active_user)
        bearer = self._bearer_for(manager, active_user)

        outcome = manager.get_current_user(session, bearer)

        assert outcome is active_user

    def test_get_current_user_not_found(self):
        """A valid token whose user is absent from the session lookup is rejected."""
        manager = AuthManager()
        # The lookup finds nobody, even though the token itself is valid.
        session = self._session_returning(None)
        ghost = Mock(
            spec=User,
            id=999,
            username="testuser",
            email="test@example.com",
            role="customer",
        )
        bearer = self._bearer_for(manager, ghost)

        with pytest.raises(InvalidCredentialsException):
            manager.get_current_user(session, bearer)

    def test_get_current_user_inactive(self):
        """A valid token for a user flagged inactive is rejected."""
        manager = AuthManager()
        dormant = Mock(
            spec=User,
            id=1,
            username="testuser",
            email="test@example.com",
            role="customer",
            is_active=False,
        )
        session = self._session_returning(dormant)
        bearer = self._bearer_for(manager, dormant)

        with pytest.raises(UserNotActiveException):
            manager.get_current_user(session, bearer)
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestRoleRequirements:
    """Checks for require_admin, require_vendor and require_customer."""

    @staticmethod
    def _user_with_role(role):
        """Return a User-spec'd mock whose role attribute is set to role."""
        return Mock(spec=User, role=role)

    def test_require_admin_success(self):
        """require_admin hands back an admin user unchanged."""
        manager = AuthManager()
        admin = self._user_with_role("admin")

        assert manager.require_admin(admin) is admin

    def test_require_admin_failure(self):
        """require_admin raises AdminRequiredException for a customer."""
        manager = AuthManager()
        customer = self._user_with_role("customer")

        with pytest.raises(AdminRequiredException):
            manager.require_admin(customer)

    def test_require_vendor_with_vendor_role(self):
        """require_vendor hands back a vendor user unchanged."""
        manager = AuthManager()
        vendor = self._user_with_role("vendor")

        assert manager.require_vendor(vendor) is vendor

    def test_require_vendor_with_admin_role(self):
        """require_vendor also accepts an admin user."""
        manager = AuthManager()
        admin = self._user_with_role("admin")

        assert manager.require_vendor(admin) is admin

    def test_require_vendor_failure(self):
        """require_vendor rejects a customer and names the needed permission."""
        manager = AuthManager()
        customer = self._user_with_role("customer")

        with pytest.raises(InsufficientPermissionsException) as caught:
            manager.require_vendor(customer)

        needed = caught.value.details.get("required_permission")
        assert needed == "vendor"

    def test_require_customer_with_customer_role(self):
        """require_customer hands back a customer user unchanged."""
        manager = AuthManager()
        customer = self._user_with_role("customer")

        assert manager.require_customer(customer) is customer

    def test_require_customer_with_admin_role(self):
        """require_customer also accepts an admin user."""
        manager = AuthManager()
        admin = self._user_with_role("admin")

        assert manager.require_customer(admin) is admin

    def test_require_customer_failure(self):
        """require_customer rejects a vendor and names the needed permission."""
        manager = AuthManager()
        vendor = self._user_with_role("vendor")

        with pytest.raises(InsufficientPermissionsException) as caught:
            manager.require_customer(vendor)

        needed = caught.value.details.get("required_permission")
        assert needed == "customer"
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestCreateDefaultAdminUser:
    """Test suite for AuthManager.create_default_admin_user."""

    def test_create_default_admin_user_first_time(self):
        """Test creating default admin user when none exists.

        The mocked session reports no existing admin, so the manager is
        expected to add, commit and refresh exactly one new user.
        """
        auth_manager = AuthManager()
        mock_db = Mock()

        # No existing admin user
        mock_db.query.return_value.filter.return_value.first.return_value = None

        # The return value is intentionally not bound: this test inspects the
        # object handed to the session rather than what the call returns.
        auth_manager.create_default_admin_user(mock_db)

        # Verify admin user was persisted through the session
        mock_db.add.assert_called_once()
        mock_db.commit.assert_called_once()
        mock_db.refresh.assert_called_once()

        # Verify the created user (first positional argument passed to add())
        created_user = mock_db.add.call_args[0][0]
        assert created_user.username == "admin"
        assert created_user.email == "admin@example.com"
        assert created_user.role == "admin"
        assert created_user.is_active is True
        # The stored hash must correspond to the default password.
        assert auth_manager.verify_password("admin123", created_user.hashed_password)

    def test_create_default_admin_user_already_exists(self):
        """Test creating default admin user when one already exists.

        The existing user must be returned as-is and nothing may be added
        to or committed on the session.
        """
        auth_manager = AuthManager()
        mock_db = Mock()

        # Existing admin user
        existing_admin = Mock(spec=User)
        mock_db.query.return_value.filter.return_value.first.return_value = (
            existing_admin
        )

        result = auth_manager.create_default_admin_user(mock_db)

        # Should not create new user
        mock_db.add.assert_not_called()
        mock_db.commit.assert_not_called()

        # Should return existing user
        assert result is existing_admin
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestAuthManagerConfiguration:
    """Checks for how AuthManager picks up its settings at construction."""

    def test_default_configuration(self):
        """With an emptied environment the built-in defaults are used."""
        empty_env = {}
        with patch.dict("os.environ", empty_env, clear=True):
            manager = AuthManager()

        expected_secret = "your-secret-key-change-in-production-please"
        assert manager.algorithm == "HS256"
        assert manager.token_expire_minutes == 30
        assert manager.secret_key == expected_secret

    def test_custom_configuration(self):
        """JWT_SECRET_KEY and JWT_EXPIRE_MINUTES override the defaults."""
        overrides = {
            "JWT_SECRET_KEY": "custom-secret-key",
            "JWT_EXPIRE_MINUTES": "60",
        }
        with patch.dict("os.environ", overrides):
            manager = AuthManager()

        assert manager.secret_key == "custom-secret-key"
        # The minutes value arrives as a string and is exposed as an int.
        assert manager.token_expire_minutes == 60

    def test_partial_custom_configuration(self):
        """Overriding only JWT_EXPIRE_MINUTES still leaves a secret key set."""
        overrides = {"JWT_EXPIRE_MINUTES": "120"}
        with patch.dict("os.environ", overrides, clear=False):
            manager = AuthManager()

        assert manager.token_expire_minutes == 120
        # The key comes from the default or from whatever the environment
        # already held; either way it must not be None.
        assert manager.secret_key is not None
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestEdgeCases:
    """Test suite for edge cases and error scenarios."""

    def test_verify_password_with_none(self):
        """Test password verification with None values returns False."""
        auth_manager = AuthManager()

        # None for both arguments must be rejected (None never authenticates).
        assert auth_manager.verify_password(None, None) is False

        # A real password checked against a missing (None) hash is rejected too.
        assert auth_manager.verify_password("password", None) is False

        # Note: verify_password(None, <valid hash>) is deliberately not asserted
        # here; per the original author's note it raises TypeError from the
        # underlying hashing library rather than returning False.

    def test_token_with_future_iat(self):
        """Test token with issued_at time in the future."""
        auth_manager = AuthManager()

        payload = {
            "sub": "1",
            "username": "testuser",
            "iat": datetime.now(UTC) + timedelta(hours=1),  # Future time
            "exp": datetime.now(UTC) + timedelta(hours=2),
        }
        token = jwt.encode(
            payload, auth_manager.secret_key, algorithm=auth_manager.algorithm
        )

        # Should still verify successfully: a future iat is expected not to
        # invalidate the token.
        result = auth_manager.verify_token(token)
        assert result["user_id"] == 1

    def test_authenticate_user_case_sensitivity(self):
        """Test authentication when a differently-cased username finds no user.

        The session is mocked, so real case-sensitivity is decided by the
        database and cannot be exercised here. This test pins only that a
        lookup returning no row makes authenticate_user return None.
        """
        auth_manager = AuthManager()
        mock_db = Mock()

        # Simulate the store holding "TestUser" while the caller supplies
        # "testuser": the lookup comes back empty.
        mock_db.query.return_value.filter.return_value.first.return_value = None

        result = auth_manager.authenticate_user(mock_db, "testuser", "password123")

        # With no row found, None is the only reachable outcome.
        assert result is None

    def test_verify_token_unexpected_exception(self):
        """Test generic exception handler in verify_token."""
        auth_manager = AuthManager()

        # Create a valid token with a mock user
        mock_user = Mock(spec=User)
        mock_user.id = 1
        mock_user.username = "test"
        mock_user.email = "test@example.com"
        mock_user.role = "user"

        token_data = auth_manager.create_access_token(mock_user)
        token = token_data["access_token"]

        # Make jwt.decode (as seen from middleware.auth) raise an error type
        # that verify_token has no dedicated handling for.
        with (
            patch(
                "middleware.auth.jwt.decode",
                side_effect=RuntimeError("Unexpected error"),
            ),
            pytest.raises(InvalidTokenException) as exc_info,
        ):
            auth_manager.verify_token(token)

        # The message should be "Authentication failed" from the generic handler
        assert "Authentication failed" in str(exc_info.value.message)

    def test_require_role_decorator_wrapper_functionality(self):
        """Test the require_role decorator wrapper execution."""
        auth_manager = AuthManager()

        # Create a test function decorated with require_role
        @auth_manager.require_role("admin")
        def test_function(current_user, additional_arg=None):
            return {"user": current_user.username, "arg": additional_arg}

        # Test successful case - user has required role
        admin_user = Mock(spec=User)
        admin_user.role = "admin"
        admin_user.username = "admin_user"

        result = test_function(admin_user, additional_arg="test_value")

        # Both the positional user and the keyword argument must reach the
        # wrapped function.
        assert result["user"] == "admin_user"
        assert result["arg"] == "test_value"

    def test_require_role_decorator_blocks_wrong_role(self):
        """Test that require_role decorator blocks users with wrong role."""
        auth_manager = AuthManager()

        @auth_manager.require_role("admin")
        def admin_only_function(current_user):
            return {"status": "success"}

        # Test with user that has wrong role
        regular_user = Mock(spec=User)
        regular_user.role = "user"

        with pytest.raises(HTTPException) as exc_info:
            admin_only_function(regular_user)

        assert exc_info.value.status_code == 403
        assert "Required role 'admin' not found" in exc_info.value.detail
|