Auth service tests update

This commit is contained in:
2025-09-24 21:44:48 +02:00
parent 8b86b3225a
commit f9879126c8
6 changed files with 436 additions and 192 deletions

View File

@@ -393,12 +393,12 @@ curl -X GET "http://localhost:8000/api/v1/marketplace/import-status/1" \
```bash ```bash
# Export all products # Export all products
curl -X GET "http://localhost:8000/api/v1/export-csv" \ curl -X GET "http://localhost:8000/api/v1/product/export-csv" \
-H "Authorization: Bearer YOUR_TOKEN" \ -H "Authorization: Bearer YOUR_TOKEN" \
-o products_export.csv -o products_export.csv
# Export with filters # Export with filters
curl -X GET "http://localhost:8000/api/v1/export-csv?marketplace=Amazon&shop_name=MyShop" \ curl -X GET "http://localhost:8000/api/v1/product/export-csv?marketplace=Amazon&shop_name=MyShop" \
-H "Authorization: Bearer YOUR_TOKEN" \ -H "Authorization: Bearer YOUR_TOKEN" \
-o amazon_products.csv -o amazon_products.csv
``` ```

View File

@@ -3,8 +3,8 @@
Authentication and authorization specific exceptions. Authentication and authorization specific exceptions.
""" """
from typing import Any, Dict, Optional from typing import Optional
from .base import AuthenticationException, AuthorizationException from .base import AuthenticationException, AuthorizationException, ConflictException
class InvalidCredentialsException(AuthenticationException): class InvalidCredentialsException(AuthenticationException):
@@ -76,7 +76,7 @@ class AdminRequiredException(AuthorizationException):
) )
class UserAlreadyExistsException(AuthenticationException): class UserAlreadyExistsException(ConflictException):
"""Raised when trying to register with existing username/email.""" """Raised when trying to register with existing username/email."""
def __init__( def __init__(

View File

@@ -52,10 +52,8 @@ class AuthManager:
"""Verify password against hash.""" """Verify password against hash."""
return pwd_context.verify(plain_password, hashed_password) return pwd_context.verify(plain_password, hashed_password)
def authenticate_user( def authenticate_user(self, db: Session, username: str, password: str) -> Optional[User]:
self, db: Session, username: str, password: str """Authenticate user and return user object if credentials are valid."""
) -> Optional[User]:
"""Authenticate user and return user object if valid."""
user = ( user = (
db.query(User) db.query(User)
.filter((User.username == username) | (User.email == username)) .filter((User.username == username) | (User.email == username))
@@ -65,18 +63,10 @@ class AuthManager:
if not user: if not user:
return None return None
if not user.is_active:
return None
if not self.verify_password(password, user.hashed_password): if not self.verify_password(password, user.hashed_password):
return None return None
# Update last login return user # Return user regardless of active status
user.last_login = datetime.utcnow()
db.commit()
db.refresh(user)
return user
def create_access_token(self, user: User) -> Dict[str, Any]: def create_access_token(self, user: User) -> Dict[str, Any]:
"""Create JWT access token for user.""" """Create JWT access token for user."""

View File

@@ -0,0 +1,338 @@
# tests/integration/api/v1/test_auth_endpoints.py
import pytest
from jose import jwt
from datetime import datetime, timedelta
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.auth
class TestAuthenticationAPI:
def test_register_user_success(self, client, db):
"""Test successful user registration"""
response = client.post(
"/api/v1/auth/register",
json={
"email": "newuser@example.com",
"username": "newuser",
"password": "securepass123",
},
)
assert response.status_code == 200
data = response.json()
assert data["email"] == "newuser@example.com"
assert data["username"] == "newuser"
assert data["role"] == "user"
assert data["is_active"] is True
assert "hashed_password" not in data
def test_register_user_duplicate_email(self, client, test_user):
"""Test registration with duplicate email"""
response = client.post(
"/api/v1/auth/register",
json={
"email": test_user.email, # Same as test_user
"username": "newuser",
"password": "securepass123",
},
)
assert response.status_code == 409
data = response.json()
assert data["error_code"] == "USER_ALREADY_EXISTS"
assert "Email already registered" in data["message"]
assert data["details"]["field"] == "email"
def test_register_user_duplicate_username(self, client, test_user):
"""Test registration with duplicate username"""
response = client.post(
"/api/v1/auth/register",
json={
"email": "new@example.com",
"username": test_user.username, # Same as test_user
"password": "securepass123",
},
)
assert response.status_code == 409
data = response.json()
assert data["error_code"] == "USER_ALREADY_EXISTS"
assert "Username already taken" in data["message"]
assert data["details"]["field"] == "username"
def test_register_user_validation_error(self, client):
"""Test registration with invalid data"""
response = client.post(
"/api/v1/auth/register",
json={
"email": "invalid-email", # Invalid email format
"username": "", # Empty username
"password": "123", # Too short password
},
)
assert response.status_code == 422
data = response.json()
assert data["error_code"] == "VALIDATION_ERROR"
assert "Request validation failed" in data["message"]
assert "validation_errors" in data["details"]
def test_login_success(self, client, test_user):
"""Test successful login"""
response = client.post(
"/api/v1/auth/login",
json={"username": test_user.username, "password": "testpass123"},
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert data["token_type"] == "bearer"
assert "expires_in" in data
assert data["user"]["username"] == test_user.username
assert data["user"]["email"] == test_user.email
def test_login_wrong_password(self, client, test_user):
"""Test login with wrong password"""
response = client.post(
"/api/v1/auth/login",
json={"username": test_user.username, "password": "wrongpassword"},
)
assert response.status_code == 401
data = response.json()
assert data["error_code"] == "INVALID_CREDENTIALS"
assert "Incorrect username or password" in data["message"]
def test_login_nonexistent_user(self, client, db):
"""Test login with nonexistent user"""
response = client.post(
"/api/v1/auth/login",
json={"username": "nonexistent", "password": "password123"},
)
assert response.status_code == 401
data = response.json()
assert data["error_code"] == "INVALID_CREDENTIALS"
assert "Incorrect username or password" in data["message"]
def test_login_inactive_user(self, client, db, test_user):
"""Test login with inactive user account"""
# Manually deactivate the user for this test
original_status = test_user.is_active
test_user.is_active = False
db.commit()
try:
response = client.post(
"/api/v1/auth/login",
json={"username": test_user.username, "password": "testpass123"},
)
assert response.status_code == 403
data = response.json()
assert data["error_code"] == "USER_NOT_ACTIVE"
assert "User account is not active" in data["message"]
finally:
# Restore original status for cleanup
test_user.is_active = original_status
db.commit()
def test_login_validation_error(self, client):
"""Test login with invalid request format"""
response = client.post(
"/api/v1/auth/login",
json={
"username": "", # Empty username
# Missing password field
},
)
assert response.status_code == 422
data = response.json()
assert data["error_code"] == "VALIDATION_ERROR"
assert "Request validation failed" in data["message"]
def test_get_current_user_info(self, client, auth_headers, test_user):
"""Test getting current user info"""
response = client.get("/api/v1/auth/me", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["username"] == test_user.username
assert data["email"] == test_user.email
assert data["role"] == test_user.role
assert data["is_active"] is True
def test_get_current_user_without_auth(self, client):
"""Test getting current user without authentication"""
response = client.get("/api/v1/auth/me")
assert response.status_code == 401
data = response.json()
assert data["error_code"] == "INVALID_TOKEN"
assert "Authorization header required" in data["message"]
def test_get_current_user_invalid_token(self, client):
"""Test getting current user with invalid token"""
response = client.get(
"/api/v1/auth/me",
headers={"Authorization": "Bearer invalid_token_here"}
)
assert response.status_code == 401
data = response.json()
assert data["error_code"] == "INVALID_TOKEN"
def test_get_current_user_expired_token(self, client, test_user, auth_manager):
"""Test getting current user with expired token"""
# Create an expired token by mocking the expiration
# Create token that expired 1 hour ago
expired_payload = {
"sub": str(test_user.id),
"username": test_user.username,
"email": test_user.email,
"role": test_user.role,
"exp": datetime.utcnow() - timedelta(hours=1),
"iat": datetime.utcnow() - timedelta(hours=2),
}
expired_token = jwt.encode(
expired_payload,
auth_manager.secret_key,
algorithm=auth_manager.algorithm
)
response = client.get(
"/api/v1/auth/me",
headers={"Authorization": f"Bearer {expired_token}"}
)
assert response.status_code == 401
data = response.json()
assert data["error_code"] == "TOKEN_EXPIRED"
def test_user_registration_flow(self, client, db):
"""Test complete user registration and login flow"""
# Register new user
register_response = client.post(
"/api/v1/auth/register",
json={
"email": "flowtest@example.com",
"username": "flowtest",
"password": "securepass123",
},
)
assert register_response.status_code == 200
user_data = register_response.json()
assert user_data["email"] == "flowtest@example.com"
assert user_data["username"] == "flowtest"
# Login with new user
login_response = client.post(
"/api/v1/auth/login",
json={"username": "flowtest", "password": "securepass123"},
)
assert login_response.status_code == 200
login_data = login_response.json()
assert "access_token" in login_data
assert login_data["user"]["username"] == "flowtest"
# Use token to get user info
headers = {"Authorization": f"Bearer {login_data['access_token']}"}
me_response = client.get("/api/v1/auth/me", headers=headers)
assert me_response.status_code == 200
me_data = me_response.json()
assert me_data["username"] == "flowtest"
assert me_data["email"] == "flowtest@example.com"
@pytest.mark.unit
@pytest.mark.auth
class TestAuthManager:
def test_hash_password(self, auth_manager):
"""Test password hashing"""
password = "testpassword123"
hashed = auth_manager.hash_password(password)
assert hashed != password
assert len(hashed) > 20 # bcrypt hashes are long
assert hashed.startswith("$") # bcrypt hash format
def test_verify_password(self, auth_manager):
"""Test password verification"""
password = "testpassword123"
hashed = auth_manager.hash_password(password)
assert auth_manager.verify_password(password, hashed) is True
assert auth_manager.verify_password("wrongpassword", hashed) is False
def test_create_access_token(self, auth_manager, test_user):
"""Test JWT token creation"""
token_data = auth_manager.create_access_token(test_user)
assert "access_token" in token_data
assert token_data["token_type"] == "bearer"
assert "expires_in" in token_data
assert isinstance(token_data["expires_in"], int)
assert token_data["expires_in"] > 0
def test_verify_token_valid(self, auth_manager, test_user):
"""Test JWT token verification with valid token"""
token_data = auth_manager.create_access_token(test_user)
token = token_data["access_token"]
verified_data = auth_manager.verify_token(token)
assert verified_data["user_id"] == test_user.id
assert verified_data["username"] == test_user.username
assert verified_data["email"] == test_user.email
assert verified_data["role"] == test_user.role
def test_verify_token_invalid(self, auth_manager):
"""Test JWT token verification with invalid token"""
from app.exceptions.auth import InvalidTokenException
with pytest.raises(InvalidTokenException):
auth_manager.verify_token("invalid_token_here")
def test_authenticate_user_success(self, auth_manager, db, test_user):
"""Test user authentication with valid credentials"""
user = auth_manager.authenticate_user(db, test_user.username, "testpass123")
assert user is not None
assert user.id == test_user.id
assert user.username == test_user.username
def test_authenticate_user_wrong_password(self, auth_manager, db, test_user):
"""Test user authentication with wrong password"""
user = auth_manager.authenticate_user(db, test_user.username, "wrongpassword")
assert user is None
def test_authenticate_user_nonexistent(self, auth_manager, db):
"""Test user authentication with nonexistent user"""
user = auth_manager.authenticate_user(db, "nonexistent", "password")
assert user is None
def test_authenticate_user_inactive(self, auth_manager, db, test_user):
"""Test user authentication with inactive user"""
# Deactivate user
original_status = test_user.is_active
test_user.is_active = False
db.commit()
try:
user = auth_manager.authenticate_user(db, test_user.username, "testpass123")
assert user is None # Should return None for inactive users
finally:
# Restore original status
test_user.is_active = original_status
db.commit()

View File

@@ -1,157 +0,0 @@
# tests/integration/api/v1/test_authentication_endpoints.py
import pytest
from fastapi import HTTPException
class TestAuthenticationAPI:
def test_register_user_success(self, client, db):
"""Test successful user registration"""
response = client.post(
"/api/v1/auth/register",
json={
"email": "newuser@example.com",
"username": "newuser",
"password": "securepass123",
},
)
assert response.status_code == 200
data = response.json()
assert data["email"] == "newuser@example.com"
assert data["username"] == "newuser"
assert data["role"] == "user"
assert data["is_active"] is True
assert "hashed_password" not in data
def test_register_user_duplicate_email(self, client, test_user):
"""Test registration with duplicate email"""
response = client.post(
"/api/v1/auth/register",
json={
"email": test_user.email, # Same as test_user
"username": "newuser",
"password": "securepass123",
},
)
assert response.status_code == 409 # Changed from 400 to 409
data = response.json()
assert data["error_code"] == "USER_ALREADY_EXISTS"
assert "Email already exists" in data["message"]
assert data["field"] == "email"
def test_register_user_duplicate_username(self, client, test_user):
"""Test registration with duplicate username"""
response = client.post(
"/api/v1/auth/register",
json={
"email": "new@example.com",
"username": test_user.username, # Same as test_user
"password": "securepass123",
},
)
assert response.status_code == 409 # Changed from 400 to 409
data = response.json()
assert data["error_code"] == "USER_ALREADY_EXISTS"
assert "Username already taken" in data["message"]
assert data["field"] == "username"
def test_login_success(self, client, test_user):
"""Test successful login"""
response = client.post(
"/api/v1/auth/login",
json={"username": test_user.username, "password": "testpass123"},
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert data["token_type"] == "bearer"
assert "expires_in" in data
assert data["user"]["username"] == test_user.username
def test_login_wrong_password(self, client, test_user):
"""Test login with wrong password"""
response = client.post(
"/api/v1/auth/login",
json={"username": "testuser", "password": "wrongpassword"},
)
assert response.status_code == 401
data = response.json()
assert data["error_code"] == "INVALID_CREDENTIALS"
assert "Invalid username or password" in data["message"]
def test_login_nonexistent_user(self, client, db):
"""Test login with nonexistent user"""
response = client.post(
"/api/v1/auth/login",
json={"username": "nonexistent", "password": "password123"},
)
assert response.status_code == 401
data = response.json()
assert data["error_code"] == "INVALID_CREDENTIALS"
def test_login_inactive_user(self, client, db, test_user):
"""Test login with inactive user account"""
# Manually deactivate the user for this test
test_user.is_active = False
db.commit()
response = client.post(
"/api/v1/auth/login",
json={"username": test_user.username, "password": "testpass123"},
)
assert response.status_code == 403
data = response.json()
assert data["error_code"] == "USER_NOT_ACTIVE"
assert "User account is not active" in data["message"]
# Reactivate for other tests
test_user.is_active = True
db.commit()
def test_get_current_user_info(self, client, auth_headers, test_user):
"""Test getting current user info"""
response = client.get("/api/v1/auth/me", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["username"] == test_user.username
assert data["email"] == test_user.email
def test_get_current_user_without_auth(self, client):
"""Test getting current user without authentication"""
response = client.get("/api/v1/auth/me")
assert response.status_code == 401 # No authorization header
class TestAuthManager:
def test_hash_password(self, auth_manager):
"""Test password hashing"""
password = "testpassword123"
hashed = auth_manager.hash_password(password)
assert hashed != password
assert len(hashed) > 20 # bcrypt hashes are long
def test_verify_password(self, auth_manager):
"""Test password verification"""
password = "testpassword123"
hashed = auth_manager.hash_password(password)
assert auth_manager.verify_password(password, hashed) is True
assert auth_manager.verify_password("wrongpassword", hashed) is False
def test_create_access_token(self, auth_manager, test_user):
"""Test JWT token creation"""
token_data = auth_manager.create_access_token(test_user)
assert "access_token" in token_data
assert token_data["token_type"] == "bearer"
assert "expires_in" in token_data
assert isinstance(token_data["expires_in"], int)

View File

@@ -1,10 +1,14 @@
# tests/test_auth_service.py # tests/test_auth_service.py
import pytest import pytest
from fastapi import HTTPException
from app.exceptions.auth import (
UserAlreadyExistsException,
InvalidCredentialsException,
UserNotActiveException,
)
from app.exceptions.base import ValidationException
from app.services.auth_service import AuthService from app.services.auth_service import AuthService
from models.schemas.auth import UserLogin, UserRegister from models.schemas.auth import UserLogin, UserRegister
from models.database.user import User
@pytest.mark.unit @pytest.mark.unit
@@ -39,11 +43,14 @@ class TestAuthService:
password="securepass123", password="securepass123",
) )
with pytest.raises(HTTPException) as exc_info: with pytest.raises(UserAlreadyExistsException) as exc_info:
self.service.register_user(db, user_data) self.service.register_user(db, user_data)
assert exc_info.value.status_code == 400 exception = exc_info.value
assert "Email already registered" in str(exc_info.value.detail) assert exception.error_code == "USER_ALREADY_EXISTS"
assert exception.status_code == 409
assert "Email already registered" in exception.message
assert exception.details["field"] == "email"
def test_register_user_username_already_exists(self, db, test_user): def test_register_user_username_already_exists(self, db, test_user):
"""Test registration fails when username already exists""" """Test registration fails when username already exists"""
@@ -53,11 +60,14 @@ class TestAuthService:
password="securepass123", password="securepass123",
) )
with pytest.raises(HTTPException) as exc_info: with pytest.raises(UserAlreadyExistsException) as exc_info:
self.service.register_user(db, user_data) self.service.register_user(db, user_data)
assert exc_info.value.status_code == 400 exception = exc_info.value
assert "Username already taken" in str(exc_info.value.detail) assert exception.error_code == "USER_ALREADY_EXISTS"
assert exception.status_code == 409
assert "Username already taken" in exception.message
assert exception.details["field"] == "username"
def test_login_user_success(self, db, test_user): def test_login_user_success(self, db, test_user):
"""Test successful user login""" """Test successful user login"""
@@ -79,11 +89,13 @@ class TestAuthService:
"""Test login fails with wrong username""" """Test login fails with wrong username"""
user_credentials = UserLogin(username="nonexistentuser", password="testpass123") user_credentials = UserLogin(username="nonexistentuser", password="testpass123")
with pytest.raises(HTTPException) as exc_info: with pytest.raises(InvalidCredentialsException) as exc_info:
self.service.login_user(db, user_credentials) self.service.login_user(db, user_credentials)
assert exc_info.value.status_code == 401 exception = exc_info.value
assert "Incorrect username or password" in str(exc_info.value.detail) assert exception.error_code == "INVALID_CREDENTIALS"
assert exception.status_code == 401
assert "Incorrect username or password" in exception.message
def test_login_user_wrong_password(self, db, test_user): def test_login_user_wrong_password(self, db, test_user):
"""Test login fails with wrong password""" """Test login fails with wrong password"""
@@ -91,11 +103,13 @@ class TestAuthService:
username=test_user.username, password="wrongpassword" username=test_user.username, password="wrongpassword"
) )
with pytest.raises(HTTPException) as exc_info: with pytest.raises(InvalidCredentialsException) as exc_info:
self.service.login_user(db, user_credentials) self.service.login_user(db, user_credentials)
assert exc_info.value.status_code == 401 exception = exc_info.value
assert "Incorrect username or password" in str(exc_info.value.detail) assert exception.error_code == "INVALID_CREDENTIALS"
assert exception.status_code == 401
assert "Incorrect username or password" in exception.message
def test_login_user_inactive_user(self, db, test_user): def test_login_user_inactive_user(self, db, test_user):
"""Test login fails for inactive user""" """Test login fails for inactive user"""
@@ -107,11 +121,17 @@ class TestAuthService:
username=test_user.username, password="testpass123" username=test_user.username, password="testpass123"
) )
with pytest.raises(HTTPException) as exc_info: with pytest.raises(UserNotActiveException) as exc_info:
self.service.login_user(db, user_credentials) self.service.login_user(db, user_credentials)
assert exc_info.value.status_code == 401 exception = exc_info.value
assert "Incorrect username or password" in str(exc_info.value.detail) assert exception.error_code == "USER_NOT_ACTIVE"
assert exception.status_code == 403
assert "User account is not active" in exception.message
# Reactivate for cleanup
test_user.is_active = True
db.commit()
def test_get_user_by_email(self, db, test_user): def test_get_user_by_email(self, db, test_user):
"""Test getting user by email""" """Test getting user by email"""
@@ -196,6 +216,21 @@ class TestAuthService:
assert isinstance(token_data["expires_in"], int) assert isinstance(token_data["expires_in"], int)
assert token_data["expires_in"] > 0 assert token_data["expires_in"] > 0
def test_create_access_token_failure(self, test_user, monkeypatch):
"""Test creating access token handles failures"""
# Mock the auth_manager to raise an exception
def mock_create_token(*args, **kwargs):
raise Exception("Token creation failed")
monkeypatch.setattr(self.service.auth_manager, "create_access_token", mock_create_token)
with pytest.raises(ValidationException) as exc_info:
self.service.create_access_token(test_user)
exception = exc_info.value
assert exception.error_code == "VALIDATION_ERROR"
assert "Failed to create access token" in exception.message
def test_hash_password(self): def test_hash_password(self):
"""Test password hashing""" """Test password hashing"""
password = "testpassword123" password = "testpassword123"
@@ -212,3 +247,41 @@ class TestAuthService:
hash2 = self.service.hash_password(password) hash2 = self.service.hash_password(password)
assert hash1 != hash2 # Should be different due to salt assert hash1 != hash2 # Should be different due to salt
def test_hash_password_failure(self, monkeypatch):
"""Test password hashing handles failures"""
# Mock the auth_manager to raise an exception
def mock_hash_password(*args, **kwargs):
raise Exception("Hashing failed")
monkeypatch.setattr(self.service.auth_manager, "hash_password", mock_hash_password)
with pytest.raises(ValidationException) as exc_info:
self.service.hash_password("testpassword")
exception = exc_info.value
assert exception.error_code == "VALIDATION_ERROR"
assert "Failed to hash password" in exception.message
# Test database error handling
def test_register_user_database_error(self, db_with_error):
"""Test user registration handles database errors"""
user_data = UserRegister(
email="test@example.com",
username="testuser",
password="password123"
)
with pytest.raises(ValidationException) as exc_info:
self.service.register_user(db_with_error, user_data)
exception = exc_info.value
assert exception.error_code == "VALIDATION_ERROR"
assert "Registration failed" in exception.message
def test_login_user_database_error(self, db_with_error):
"""Test user login handles database errors"""
user_credentials = UserLogin(username="testuser", password="password123")
with pytest.raises(InvalidCredentialsException):
self.service.login_user(db_with_error, user_credentials)