refactor: remove legacy user registration from auth_service

- Remove unused register_user() method and helper methods
- Remove legacy UserRegister schema (customer registration uses CustomerService)
- Remove wrapper methods that just delegated to auth_manager
- Simplify auth_service to focus on login and vendor access control
- Clean up tests to match simplified service

The only registration path is now /api/v1/shop/auth/register for customers,
which uses CustomerService.register_customer().

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-06 19:55:23 +01:00
parent b31fd41423
commit 0bfdf331d6
5 changed files with 77 additions and 611 deletions

View File

@@ -1,90 +1,40 @@
# app/services/auth_service.py # app/services/auth_service.py
""" """
Authentication service for user registration and login. Authentication service for user login and vendor access control.
This module provides classes and functions for: This module provides:
- User registration with validation
- User authentication and JWT token generation - User authentication and JWT token generation
- Password management and security - Vendor access verification
- Password hashing utilities
Note: Customer registration is handled by CustomerService.
User (admin/vendor team) creation is handled by their respective services.
""" """
import logging import logging
from datetime import UTC
from typing import Any from typing import Any
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.exceptions import ( from app.exceptions import (
InvalidCredentialsException, InvalidCredentialsException,
UserAlreadyExistsException,
UserNotActiveException, UserNotActiveException,
ValidationException,
) )
from middleware.auth import AuthManager from middleware.auth import AuthManager
from models.database.user import User from models.database.user import User
from models.database.vendor import Vendor, VendorUser from models.database.vendor import Vendor, VendorUser
from models.schema.auth import UserLogin, UserRegister from models.schema.auth import UserLogin
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class AuthService: class AuthService:
"""Service class for authentication operations following the application's service pattern.""" """Service class for authentication operations."""
def __init__(self): def __init__(self):
"""Class constructor.""" """Initialize with AuthManager instance."""
self.auth_manager = AuthManager() self.auth_manager = AuthManager()
def register_user(self, db: Session, user_data: UserRegister) -> User:
"""
Register a new user.
Args:
db: Database session
user_data: User registration data
Returns:
Created user object
Raises:
UserAlreadyExistsException: If email or username already exists
ValidationException: If user data is invalid
"""
try:
# Check if email already exists
if self._email_exists(db, user_data.email):
raise UserAlreadyExistsException(
"Email already registered", field="email"
)
# Check if username already exists
if self._username_exists(db, user_data.username):
raise UserAlreadyExistsException(
"Username already taken", field="username"
)
# Hash password and create user
hashed_password = self.auth_manager.hash_password(user_data.password)
new_user = User(
email=user_data.email,
username=user_data.username,
hashed_password=hashed_password,
role="user",
is_active=True,
)
db.add(new_user)
db.flush()
logger.info(f"New user registered: {new_user.username}")
return new_user
except UserAlreadyExistsException:
raise # Re-raise custom exceptions
except Exception as e:
logger.error(f"Error registering user: {str(e)}")
raise ValidationException("Registration failed")
def login_user(self, db: Session, user_credentials: UserLogin) -> dict[str, Any]: def login_user(self, db: Session, user_credentials: UserLogin) -> dict[str, Any]:
""" """
Login user and return JWT token with user data. Login user and return JWT token with user data.
@@ -100,118 +50,31 @@ class AuthService:
InvalidCredentialsException: If authentication fails InvalidCredentialsException: If authentication fails
UserNotActiveException: If user account is not active UserNotActiveException: If user account is not active
""" """
try: user = self.auth_manager.authenticate_user(
user = self.auth_manager.authenticate_user( db, user_credentials.email_or_username, user_credentials.password
db, user_credentials.email_or_username, user_credentials.password )
) if not user:
if not user: raise InvalidCredentialsException("Incorrect username or password")
raise InvalidCredentialsException("Incorrect username or password")
# Check if user is active if not user.is_active:
if not user.is_active: raise UserNotActiveException("User account is not active")
raise UserNotActiveException("User account is not active")
# Create access token token_data = self.auth_manager.create_access_token(user)
token_data = self.auth_manager.create_access_token(user)
logger.info(f"User logged in: {user.username}") logger.info(f"User logged in: {user.username}")
return {"token_data": token_data, "user": user} return {"token_data": token_data, "user": user}
except (InvalidCredentialsException, UserNotActiveException):
raise # Re-raise custom exceptions
except Exception as e:
logger.error(f"Error during login: {str(e)}")
raise InvalidCredentialsException()
def get_user_by_email(self, db: Session, email: str) -> User | None:
"""Get user by email."""
try:
return db.query(User).filter(User.email == email).first()
except Exception as e:
logger.error(f"Error getting user by email: {str(e)}")
return None
def get_user_by_username(self, db: Session, username: str) -> User | None:
"""Get user by username."""
try:
return db.query(User).filter(User.username == username).first()
except Exception as e:
logger.error(f"Error getting user by username: {str(e)}")
return None
def authenticate_user(
self, db: Session, username: str, password: str
) -> User | None:
"""Authenticate user with username/password."""
try:
return self.auth_manager.authenticate_user(db, username, password)
except Exception as e:
logger.error(f"Error authenticating user: {str(e)}")
return None
def create_access_token(self, user: User) -> dict[str, Any]:
"""Create access token for user."""
try:
return self.auth_manager.create_access_token(user)
except Exception as e:
logger.error(f"Error creating access token: {str(e)}")
raise ValidationException("Failed to create access token")
def hash_password(self, password: str) -> str: def hash_password(self, password: str) -> str:
"""Hash password."""
try:
return self.auth_manager.hash_password(password)
except Exception as e:
logger.error(f"Error hashing password: {str(e)}")
raise ValidationException("Failed to hash password")
def verify_password(self, plain_password: str, hashed_password: str) -> bool:
"""Verify password against hash."""
try:
return self.auth_manager.verify_password(plain_password, hashed_password)
except Exception as e:
logger.error(f"Error verifying password: {str(e)}")
return False
def create_access_token_with_data(self, data: dict) -> dict:
""" """
Create JWT token with custom data payload. Hash a password.
Useful for non-User entities like customers that need tokens.
Args: Args:
data: Dictionary containing token payload data (must include 'sub') password: Plain text password
Returns: Returns:
Dictionary with access_token, token_type, and expires_in Hashed password string
""" """
from datetime import datetime, timedelta return self.auth_manager.hash_password(password)
from jose import jwt
from app.core.config import settings
try:
expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
expire = datetime.now(UTC) + expires_delta
# Build payload with provided data
payload = {
**data,
"exp": expire,
"iat": datetime.now(UTC),
}
token = jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")
return {
"access_token": token,
"token_type": "bearer",
"expires_in": settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
}
except Exception as e:
logger.error(f"Error creating access token with data: {str(e)}")
raise ValidationException("Failed to create access token")
def get_vendor_by_code(self, db: Session, vendor_code: str) -> Vendor | None: def get_vendor_by_code(self, db: Session, vendor_code: str) -> Vendor | None:
""" """
@@ -291,15 +154,6 @@ class AuthService:
return None, None return None, None
# Private helper methods
def _email_exists(self, db: Session, email: str) -> bool:
"""Check if email already exists."""
return db.query(User).filter(User.email == email).first() is not None
def _username_exists(self, db: Session, username: str) -> bool: # Create service instance
"""Check if username already exists."""
return db.query(User).filter(User.username == username).first() is not None
# Create service instance following the same pattern as other services
auth_service = AuthService() auth_service = AuthService()

View File

@@ -5,30 +5,6 @@ from datetime import datetime
from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator
class UserRegister(BaseModel):
email: EmailStr = Field(..., description="Valid email address")
username: str = Field(..., description="Username")
password: str = Field(..., description="Password")
# Keep security validation in Pydantic for auth
@field_validator("username")
@classmethod
def validate_username(cls, v):
if not re.match(r"^[a-zA-Z0-9_]+$", v):
raise ValueError(
"Username must contain only letters, numbers, or underscores"
)
return v.lower().strip()
@field_validator("password")
@classmethod
def validate_password(cls, v):
if len(v) < 6:
raise ValueError("Password must be at least 6 characters long")
return v
class UserLogin(BaseModel): class UserLogin(BaseModel):
email_or_username: str = Field(..., description="Username or email address") email_or_username: str = Field(..., description="Username or email address")
password: str = Field(..., description="Password") password: str = Field(..., description="Password")

View File

@@ -1,4 +1,10 @@
# tests/integration/api/v1/test_auth_endpoints.py # tests/integration/api/v1/test_auth_endpoints.py
"""Integration tests for authentication endpoints.
Note: User registration is handled per-context:
- Customers: /api/v1/shop/auth/register (CustomerService)
- Admin/Vendor users: Created by admin or via team invites
"""
from datetime import UTC, datetime, timedelta from datetime import UTC, datetime, timedelta
import pytest import pytest
@@ -9,78 +15,10 @@ from jose import jwt
@pytest.mark.api @pytest.mark.api
@pytest.mark.auth @pytest.mark.auth
class TestAuthenticationAPI: class TestAuthenticationAPI:
def test_register_user_success(self, client, db): """Test authentication endpoints for admin/vendor login."""
"""Test successful user registration"""
response = client.post(
"/api/v1/auth/register",
json={
"email": "newuser@example.com",
"username": "newuser",
"password": "securepass123",
},
)
assert response.status_code == 200
data = response.json()
assert data["email"] == "newuser@example.com"
assert data["username"] == "newuser"
assert data["role"] == "user"
assert data["is_active"] is True
assert "hashed_password" not in data
def test_register_user_duplicate_email(self, client, test_user):
"""Test registration with duplicate email"""
response = client.post(
"/api/v1/auth/register",
json={
"email": test_user.email, # Same as test_user
"username": "newuser",
"password": "securepass123",
},
)
assert response.status_code == 409
data = response.json()
assert data["error_code"] == "USER_ALREADY_EXISTS"
assert "Email already registered" in data["message"]
assert data["details"]["field"] == "email"
def test_register_user_duplicate_username(self, client, test_user):
"""Test registration with duplicate username"""
response = client.post(
"/api/v1/auth/register",
json={
"email": "new@example.com",
"username": test_user.username, # Same as test_user
"password": "securepass123",
},
)
assert response.status_code == 409
data = response.json()
assert data["error_code"] == "USER_ALREADY_EXISTS"
assert "Username already taken" in data["message"]
assert data["details"]["field"] == "username"
def test_register_user_validation_error(self, client):
"""Test registration with invalid data"""
response = client.post(
"/api/v1/auth/register",
json={
"email": "invalid-email", # Invalid email format
"username": "", # Empty username
"password": "123", # Too short password
},
)
assert response.status_code == 422
data = response.json()
assert data["error_code"] == "VALIDATION_ERROR"
assert "Request validation failed" in data["message"]
assert "validation_errors" in data["details"]
def test_login_success(self, client, test_user): def test_login_success(self, client, test_user):
"""Test successful login""" """Test successful login."""
response = client.post( response = client.post(
"/api/v1/auth/login", "/api/v1/auth/login",
json={"username": test_user.username, "password": "testpass123"}, json={"username": test_user.username, "password": "testpass123"},
@@ -95,7 +33,7 @@ class TestAuthenticationAPI:
assert data["user"]["email"] == test_user.email assert data["user"]["email"] == test_user.email
def test_login_wrong_password(self, client, test_user): def test_login_wrong_password(self, client, test_user):
"""Test login with wrong password""" """Test login with wrong password."""
response = client.post( response = client.post(
"/api/v1/auth/login", "/api/v1/auth/login",
json={"username": test_user.username, "password": "wrongpassword"}, json={"username": test_user.username, "password": "wrongpassword"},
@@ -107,7 +45,7 @@ class TestAuthenticationAPI:
assert "Incorrect username or password" in data["message"] assert "Incorrect username or password" in data["message"]
def test_login_nonexistent_user(self, client, db): def test_login_nonexistent_user(self, client, db):
"""Test login with nonexistent user""" """Test login with nonexistent user."""
response = client.post( response = client.post(
"/api/v1/auth/login", "/api/v1/auth/login",
json={"username": "nonexistent", "password": "password123"}, json={"username": "nonexistent", "password": "password123"},
@@ -119,7 +57,7 @@ class TestAuthenticationAPI:
assert "Incorrect username or password" in data["message"] assert "Incorrect username or password" in data["message"]
def test_login_inactive_user(self, client, db, test_user): def test_login_inactive_user(self, client, db, test_user):
"""Test login with inactive user account""" """Test login with inactive user account."""
# Manually deactivate the user for this test # Manually deactivate the user for this test
original_status = test_user.is_active original_status = test_user.is_active
test_user.is_active = False test_user.is_active = False
@@ -142,7 +80,7 @@ class TestAuthenticationAPI:
db.commit() db.commit()
def test_login_validation_error(self, client): def test_login_validation_error(self, client):
"""Test login with invalid request format""" """Test login with invalid request format."""
response = client.post( response = client.post(
"/api/v1/auth/login", "/api/v1/auth/login",
json={ json={
@@ -157,7 +95,7 @@ class TestAuthenticationAPI:
assert "Request validation failed" in data["message"] assert "Request validation failed" in data["message"]
def test_get_current_user_info(self, client, auth_headers, test_user): def test_get_current_user_info(self, client, auth_headers, test_user):
"""Test getting current user info""" """Test getting current user info."""
response = client.get("/api/v1/auth/me", headers=auth_headers) response = client.get("/api/v1/auth/me", headers=auth_headers)
assert response.status_code == 200 assert response.status_code == 200
@@ -168,7 +106,7 @@ class TestAuthenticationAPI:
assert data["is_active"] is True assert data["is_active"] is True
def test_get_current_user_without_auth(self, client): def test_get_current_user_without_auth(self, client):
"""Test getting current user without authentication""" """Test getting current user without authentication."""
response = client.get("/api/v1/auth/me") response = client.get("/api/v1/auth/me")
assert response.status_code == 401 assert response.status_code == 401
@@ -177,7 +115,7 @@ class TestAuthenticationAPI:
assert "Authorization header required" in data["message"] assert "Authorization header required" in data["message"]
def test_get_current_user_invalid_token(self, client): def test_get_current_user_invalid_token(self, client):
"""Test getting current user with invalid token""" """Test getting current user with invalid token."""
response = client.get( response = client.get(
"/api/v1/auth/me", headers={"Authorization": "Bearer invalid_token_here"} "/api/v1/auth/me", headers={"Authorization": "Bearer invalid_token_here"}
) )
@@ -187,9 +125,7 @@ class TestAuthenticationAPI:
assert data["error_code"] == "INVALID_TOKEN" assert data["error_code"] == "INVALID_TOKEN"
def test_get_current_user_expired_token(self, client, test_user, auth_manager): def test_get_current_user_expired_token(self, client, test_user, auth_manager):
"""Test getting current user with expired token""" """Test getting current user with expired token."""
# Create an expired token by mocking the expiration
# Create token that expired 1 hour ago # Create token that expired 1 hour ago
expired_payload = { expired_payload = {
"sub": str(test_user.id), "sub": str(test_user.id),
@@ -212,49 +148,14 @@ class TestAuthenticationAPI:
data = response.json() data = response.json()
assert data["error_code"] == "TOKEN_EXPIRED" assert data["error_code"] == "TOKEN_EXPIRED"
def test_user_registration_flow(self, client, db):
"""Test complete user registration and login flow"""
# Register new user
register_response = client.post(
"/api/v1/auth/register",
json={
"email": "flowtest@example.com",
"username": "flowtest",
"password": "securepass123",
},
)
assert register_response.status_code == 200
user_data = register_response.json()
assert user_data["email"] == "flowtest@example.com"
assert user_data["username"] == "flowtest"
# Login with new user
login_response = client.post(
"/api/v1/auth/login",
json={"username": "flowtest", "password": "securepass123"},
)
assert login_response.status_code == 200
login_data = login_response.json()
assert "access_token" in login_data
assert login_data["user"]["username"] == "flowtest"
# Use token to get user info
headers = {"Authorization": f"Bearer {login_data['access_token']}"}
me_response = client.get("/api/v1/auth/me", headers=headers)
assert me_response.status_code == 200
me_data = me_response.json()
assert me_data["username"] == "flowtest"
assert me_data["email"] == "flowtest@example.com"
@pytest.mark.unit @pytest.mark.unit
@pytest.mark.auth @pytest.mark.auth
class TestAuthManager: class TestAuthManager:
"""Unit tests for AuthManager."""
def test_hash_password(self, auth_manager): def test_hash_password(self, auth_manager):
"""Test password hashing""" """Test password hashing."""
password = "testpassword123" password = "testpassword123"
hashed = auth_manager.hash_password(password) hashed = auth_manager.hash_password(password)
@@ -263,7 +164,7 @@ class TestAuthManager:
assert hashed.startswith("$") # bcrypt hash format assert hashed.startswith("$") # bcrypt hash format
def test_verify_password(self, auth_manager): def test_verify_password(self, auth_manager):
"""Test password verification""" """Test password verification."""
password = "testpassword123" password = "testpassword123"
hashed = auth_manager.hash_password(password) hashed = auth_manager.hash_password(password)
@@ -271,7 +172,7 @@ class TestAuthManager:
assert auth_manager.verify_password("wrongpassword", hashed) is False assert auth_manager.verify_password("wrongpassword", hashed) is False
def test_create_access_token(self, auth_manager, test_user): def test_create_access_token(self, auth_manager, test_user):
"""Test JWT token creation""" """Test JWT token creation."""
token_data = auth_manager.create_access_token(test_user) token_data = auth_manager.create_access_token(test_user)
assert "access_token" in token_data assert "access_token" in token_data
@@ -281,7 +182,7 @@ class TestAuthManager:
assert token_data["expires_in"] > 0 assert token_data["expires_in"] > 0
def test_verify_token_valid(self, auth_manager, test_user): def test_verify_token_valid(self, auth_manager, test_user):
"""Test JWT token verification with valid token""" """Test JWT token verification with valid token."""
token_data = auth_manager.create_access_token(test_user) token_data = auth_manager.create_access_token(test_user)
token = token_data["access_token"] token = token_data["access_token"]
@@ -293,14 +194,14 @@ class TestAuthManager:
assert verified_data["role"] == test_user.role assert verified_data["role"] == test_user.role
def test_verify_token_invalid(self, auth_manager): def test_verify_token_invalid(self, auth_manager):
"""Test JWT token verification with invalid token""" """Test JWT token verification with invalid token."""
from app.exceptions.auth import InvalidTokenException from app.exceptions.auth import InvalidTokenException
with pytest.raises(InvalidTokenException): with pytest.raises(InvalidTokenException):
auth_manager.verify_token("invalid_token_here") auth_manager.verify_token("invalid_token_here")
def test_authenticate_user_success(self, auth_manager, db, test_user): def test_authenticate_user_success(self, auth_manager, db, test_user):
"""Test user authentication with valid credentials""" """Test user authentication with valid credentials."""
user = auth_manager.authenticate_user(db, test_user.username, "testpass123") user = auth_manager.authenticate_user(db, test_user.username, "testpass123")
assert user is not None assert user is not None
@@ -308,13 +209,13 @@ class TestAuthManager:
assert user.username == test_user.username assert user.username == test_user.username
def test_authenticate_user_wrong_password(self, auth_manager, db, test_user): def test_authenticate_user_wrong_password(self, auth_manager, db, test_user):
"""Test user authentication with wrong password""" """Test user authentication with wrong password."""
user = auth_manager.authenticate_user(db, test_user.username, "wrongpassword") user = auth_manager.authenticate_user(db, test_user.username, "wrongpassword")
assert user is None assert user is None
def test_authenticate_user_nonexistent(self, auth_manager, db): def test_authenticate_user_nonexistent(self, auth_manager, db):
"""Test user authentication with nonexistent user""" """Test user authentication with nonexistent user."""
user = auth_manager.authenticate_user(db, "nonexistent", "password") user = auth_manager.authenticate_user(db, "nonexistent", "password")
assert user is None assert user is None

View File

@@ -6,101 +6,11 @@ from pydantic import ValidationError
from models.schema.auth import ( from models.schema.auth import (
UserCreate, UserCreate,
UserLogin, UserLogin,
UserRegister,
UserResponse, UserResponse,
UserUpdate, UserUpdate,
) )
@pytest.mark.unit
@pytest.mark.schema
class TestUserRegisterSchema:
"""Test UserRegister schema validation."""
def test_valid_registration(self):
"""Test valid registration data."""
user = UserRegister(
email="test@example.com",
username="testuser",
password="password123",
)
assert user.email == "test@example.com"
assert user.username == "testuser"
assert user.password == "password123"
def test_username_normalized_to_lowercase(self):
"""Test username is normalized to lowercase."""
user = UserRegister(
email="test@example.com",
username="TestUser",
password="password123",
)
assert user.username == "testuser"
def test_username_with_whitespace_invalid(self):
"""Test username with whitespace is invalid (validation before strip)."""
with pytest.raises(ValidationError) as exc_info:
UserRegister(
email="test@example.com",
username=" testuser ",
password="password123",
)
assert "username" in str(exc_info.value).lower()
def test_invalid_email(self):
"""Test invalid email raises ValidationError."""
with pytest.raises(ValidationError) as exc_info:
UserRegister(
email="not-an-email",
username="testuser",
password="password123",
)
assert "email" in str(exc_info.value).lower()
def test_invalid_username_special_chars(self):
"""Test username with special characters raises ValidationError."""
with pytest.raises(ValidationError) as exc_info:
UserRegister(
email="test@example.com",
username="test@user!",
password="password123",
)
assert "username" in str(exc_info.value).lower()
def test_valid_username_with_underscore(self):
"""Test username with underscore is valid."""
user = UserRegister(
email="test@example.com",
username="test_user_123",
password="password123",
)
assert user.username == "test_user_123"
def test_password_too_short(self):
"""Test password shorter than 6 characters raises ValidationError."""
with pytest.raises(ValidationError) as exc_info:
UserRegister(
email="test@example.com",
username="testuser",
password="12345",
)
assert "password" in str(exc_info.value).lower()
def test_password_exactly_6_chars(self):
"""Test password with exactly 6 characters is valid."""
user = UserRegister(
email="test@example.com",
username="testuser",
password="123456",
)
assert user.password == "123456"
def test_missing_required_fields(self):
"""Test missing required fields raises ValidationError."""
with pytest.raises(ValidationError):
UserRegister(email="test@example.com")
@pytest.mark.unit @pytest.mark.unit
@pytest.mark.schema @pytest.mark.schema
class TestUserLoginSchema: class TestUserLoginSchema:

View File

@@ -1,76 +1,26 @@
# tests/test_auth_service.py # tests/unit/services/test_auth_service.py
"""Unit tests for AuthService - login and password hashing."""
import pytest import pytest
from app.exceptions.auth import ( from app.exceptions.auth import (
InvalidCredentialsException, InvalidCredentialsException,
UserAlreadyExistsException,
UserNotActiveException, UserNotActiveException,
) )
from app.exceptions.base import ValidationException
from app.services.auth_service import AuthService from app.services.auth_service import AuthService
from models.schema.auth import UserLogin, UserRegister from models.schema.auth import UserLogin
@pytest.mark.unit @pytest.mark.unit
@pytest.mark.auth @pytest.mark.auth
class TestAuthService: class TestAuthService:
"""Test suite for AuthService following the application's testing patterns""" """Test suite for AuthService."""
def setup_method(self): def setup_method(self):
"""Setup method following the same pattern as admin service tests""" """Setup method."""
self.service = AuthService() self.service = AuthService()
def test_register_user_success(self, db):
"""Test successful user registration"""
user_data = UserRegister(
email="newuser@example.com", username="newuser123", password="securepass123"
)
user = self.service.register_user(db, user_data)
assert user is not None
assert user.email == "newuser@example.com"
assert user.username == "newuser123"
assert user.role == "user"
assert user.is_active is True
assert user.hashed_password != "securepass123" # Should be hashed
def test_register_user_email_already_exists(self, db, test_user):
"""Test registration fails when email already exists"""
user_data = UserRegister(
email=test_user.email, # Use existing email
username="differentuser",
password="securepass123",
)
with pytest.raises(UserAlreadyExistsException) as exc_info:
self.service.register_user(db, user_data)
exception = exc_info.value
assert exception.error_code == "USER_ALREADY_EXISTS"
assert exception.status_code == 409
assert "Email already registered" in exception.message
assert exception.details["field"] == "email"
def test_register_user_username_already_exists(self, db, test_user):
"""Test registration fails when username already exists"""
user_data = UserRegister(
email="different@example.com",
username=test_user.username, # Use existing username
password="securepass123",
)
with pytest.raises(UserAlreadyExistsException) as exc_info:
self.service.register_user(db, user_data)
exception = exc_info.value
assert exception.error_code == "USER_ALREADY_EXISTS"
assert exception.status_code == 409
assert "Username already taken" in exception.message
assert exception.details["field"] == "username"
def test_login_user_success(self, db, test_user): def test_login_user_success(self, db, test_user):
"""Test successful user login""" """Test successful user login."""
user_credentials = UserLogin( user_credentials = UserLogin(
email_or_username=test_user.username, password="testpass123" email_or_username=test_user.username, password="testpass123"
) )
@@ -85,8 +35,19 @@ class TestAuthService:
assert "token_type" in result["token_data"] assert "token_type" in result["token_data"]
assert "expires_in" in result["token_data"] assert "expires_in" in result["token_data"]
def test_login_user_with_email(self, db, test_user):
"""Test login with email instead of username."""
user_credentials = UserLogin(
email_or_username=test_user.email, password="testpass123"
)
result = self.service.login_user(db, user_credentials)
assert result["user"].id == test_user.id
assert "access_token" in result["token_data"]
def test_login_user_wrong_username(self, db): def test_login_user_wrong_username(self, db):
"""Test login fails with wrong username""" """Test login fails with wrong username."""
user_credentials = UserLogin( user_credentials = UserLogin(
email_or_username="nonexistentuser", password="testpass123" email_or_username="nonexistentuser", password="testpass123"
) )
@@ -100,7 +61,7 @@ class TestAuthService:
assert "Incorrect username or password" in exception.message assert "Incorrect username or password" in exception.message
def test_login_user_wrong_password(self, db, test_user): def test_login_user_wrong_password(self, db, test_user):
"""Test login fails with wrong password""" """Test login fails with wrong password."""
user_credentials = UserLogin( user_credentials = UserLogin(
email_or_username=test_user.username, password="wrongpassword" email_or_username=test_user.username, password="wrongpassword"
) )
@@ -114,7 +75,7 @@ class TestAuthService:
assert "Incorrect username or password" in exception.message assert "Incorrect username or password" in exception.message
def test_login_user_inactive_user(self, db, test_user): def test_login_user_inactive_user(self, db, test_user):
"""Test login fails for inactive user""" """Test login fails for inactive user."""
from models.database.user import User from models.database.user import User
# Re-query user and deactivate # Re-query user and deactivate
@@ -138,109 +99,8 @@ class TestAuthService:
user.is_active = True user.is_active = True
db.commit() db.commit()
def test_get_user_by_email(self, db, test_user):
"""Test getting user by email"""
user = self.service.get_user_by_email(db, test_user.email)
assert user is not None
assert user.id == test_user.id
assert user.email == test_user.email
def test_get_user_by_email_not_found(self, db):
"""Test getting user by email when user doesn't exist"""
user = self.service.get_user_by_email(db, "nonexistent@example.com")
assert user is None
def test_get_user_by_username(self, db, test_user):
"""Test getting user by username"""
user = self.service.get_user_by_username(db, test_user.username)
assert user is not None
assert user.id == test_user.id
assert user.username == test_user.username
def test_get_user_by_username_not_found(self, db):
"""Test getting user by username when user doesn't exist"""
user = self.service.get_user_by_username(db, "nonexistentuser")
assert user is None
def test_email_exists_true(self, db, test_user):
"""Test email_exists returns True when email exists"""
exists = self.service._email_exists(db, test_user.email)
assert exists is True
def test_email_exists_false(self, db):
"""Test email_exists returns False when email doesn't exist"""
exists = self.service._email_exists(db, "nonexistent@example.com")
assert exists is False
def test_username_exists_true(self, db, test_user):
"""Test username_exists returns True when username exists"""
exists = self.service._username_exists(db, test_user.username)
assert exists is True
def test_username_exists_false(self, db):
"""Test username_exists returns False when username doesn't exist"""
exists = self.service._username_exists(db, "nonexistentuser")
assert exists is False
def test_authenticate_user_success(self, db, test_user):
"""Test successful user authentication"""
user = self.service.authenticate_user(db, test_user.username, "testpass123")
assert user is not None
assert user.id == test_user.id
assert user.username == test_user.username
def test_authenticate_user_wrong_password(self, db, test_user):
"""Test authentication fails with wrong password"""
user = self.service.authenticate_user(db, test_user.username, "wrongpassword")
assert user is None
def test_authenticate_user_nonexistent(self, db):
"""Test authentication fails with nonexistent user"""
user = self.service.authenticate_user(db, "nonexistentuser", "password")
assert user is None
def test_create_access_token(self, test_user):
"""Test creating access token for user"""
token_data = self.service.create_access_token(test_user)
assert "access_token" in token_data
assert "token_type" in token_data
assert "expires_in" in token_data
assert token_data["token_type"] == "bearer"
assert isinstance(token_data["expires_in"], int)
assert token_data["expires_in"] > 0
def test_create_access_token_failure(self, test_user, monkeypatch):
"""Test creating access token handles failures"""
# Mock the auth_manager to raise an exception
def mock_create_token(*args, **kwargs):
raise Exception("Token creation failed")
monkeypatch.setattr(
self.service.auth_manager, "create_access_token", mock_create_token
)
with pytest.raises(ValidationException) as exc_info:
self.service.create_access_token(test_user)
exception = exc_info.value
assert exception.error_code == "VALIDATION_ERROR"
assert "Failed to create access token" in exception.message
def test_hash_password(self): def test_hash_password(self):
"""Test password hashing""" """Test password hashing."""
password = "testpassword123" password = "testpassword123"
hashed = self.service.hash_password(password) hashed = self.service.hash_password(password)
@@ -249,50 +109,15 @@ class TestAuthService:
assert hashed.startswith("$") # bcrypt hash format assert hashed.startswith("$") # bcrypt hash format
def test_hash_password_different_results(self): def test_hash_password_different_results(self):
"""Test that hashing same password produces different hashes (salt)""" """Test that hashing same password produces different hashes (salt)."""
password = "testpassword123" password = "testpassword123"
hash1 = self.service.hash_password(password) hash1 = self.service.hash_password(password)
hash2 = self.service.hash_password(password) hash2 = self.service.hash_password(password)
assert hash1 != hash2 # Should be different due to salt assert hash1 != hash2 # Should be different due to salt
def test_hash_password_failure(self, monkeypatch): def test_get_vendor_by_code_not_found(self, db):
"""Test password hashing handles failures""" """Test getting vendor by non-existent code returns None."""
vendor = self.service.get_vendor_by_code(db, "NONEXISTENT")
# Mock the auth_manager to raise an exception assert vendor is None
def mock_hash_password(*args, **kwargs):
raise Exception("Hashing failed")
monkeypatch.setattr(
self.service.auth_manager, "hash_password", mock_hash_password
)
with pytest.raises(ValidationException) as exc_info:
self.service.hash_password("testpassword")
exception = exc_info.value
assert exception.error_code == "VALIDATION_ERROR"
assert "Failed to hash password" in exception.message
# Test database error handling
def test_register_user_database_error(self, db_with_error):
"""Test user registration handles database errors"""
user_data = UserRegister(
email="test@example.com", username="testuser", password="password123"
)
with pytest.raises(ValidationException) as exc_info:
self.service.register_user(db_with_error, user_data)
exception = exc_info.value
assert exception.error_code == "VALIDATION_ERROR"
assert "Registration failed" in exception.message
def test_login_user_database_error(self, db_with_error):
"""Test user login handles database errors"""
user_credentials = UserLogin(
email_or_username="testuser", password="password123"
)
with pytest.raises(InvalidCredentialsException):
self.service.login_user(db_with_error, user_credentials)