# tests/integration/api/v1/test_auth_endpoints.py
"""Tests for the v1 authentication endpoints and the auth manager.

``TestAuthenticationAPI`` exercises the register, login and "me" routes
through the ``client`` fixture. ``TestAuthManager`` calls the
``auth_manager`` fixture directly, without going through HTTP.
"""

from datetime import UTC, datetime, timedelta

import pytest
from jose import jwt

# Routes under test, named once so every test targets the same paths.
_REGISTER_URL = "/api/v1/auth/register"
_LOGIN_URL = "/api/v1/auth/login"
_ME_URL = "/api/v1/auth/me"


def _bearer(token):
    """Return an ``Authorization`` header dict carrying ``token`` as a bearer."""
    return {"Authorization": f"Bearer {token}"}


@pytest.mark.integration
@pytest.mark.api
@pytest.mark.auth
class TestAuthenticationAPI:
    """HTTP-level tests for registration, login and current-user lookup."""

    def test_register_user_success(self, client, db):
        """A valid registration returns the new user without its password hash."""
        payload = {
            "email": "newuser@example.com",
            "username": "newuser",
            "password": "securepass123",
        }

        resp = client.post(_REGISTER_URL, json=payload)

        assert resp.status_code == 200
        created = resp.json()
        assert created["email"] == "newuser@example.com"
        assert created["username"] == "newuser"
        assert created["role"] == "user"
        assert created["is_active"] is True
        assert "hashed_password" not in created

    def test_register_user_duplicate_email(self, client, test_user):
        """Registering with an email that is already in use is rejected (409)."""
        payload = {
            "email": test_user.email,  # collides with the existing test_user
            "username": "newuser",
            "password": "securepass123",
        }

        resp = client.post(_REGISTER_URL, json=payload)

        assert resp.status_code == 409
        error = resp.json()
        assert error["error_code"] == "USER_ALREADY_EXISTS"
        assert "Email already registered" in error["message"]
        assert error["details"]["field"] == "email"

    def test_register_user_duplicate_username(self, client, test_user):
        """Registering with a username that is already in use is rejected (409)."""
        payload = {
            "email": "new@example.com",
            "username": test_user.username,  # collides with the existing test_user
            "password": "securepass123",
        }

        resp = client.post(_REGISTER_URL, json=payload)

        assert resp.status_code == 409
        error = resp.json()
        assert error["error_code"] == "USER_ALREADY_EXISTS"
        assert "Username already taken" in error["message"]
        assert error["details"]["field"] == "username"

    def test_register_user_validation_error(self, client):
        """Malformed registration data yields a 422 validation error."""
        payload = {
            "email": "invalid-email",  # not a valid email format
            "username": "",  # empty username
            "password": "123",  # too short
        }

        resp = client.post(_REGISTER_URL, json=payload)

        assert resp.status_code == 422
        error = resp.json()
        assert error["error_code"] == "VALIDATION_ERROR"
        assert "Request validation failed" in error["message"]
        assert "validation_errors" in error["details"]

    def test_login_success(self, client, test_user):
        """Correct credentials return a bearer token plus the user summary."""
        credentials = {"username": test_user.username, "password": "testpass123"}

        resp = client.post(_LOGIN_URL, json=credentials)

        assert resp.status_code == 200
        session = resp.json()
        assert "access_token" in session
        assert session["token_type"] == "bearer"
        assert "expires_in" in session
        assert session["user"]["username"] == test_user.username
        assert session["user"]["email"] == test_user.email

    def test_login_wrong_password(self, client, test_user):
        """A wrong password for an existing user is rejected with 401."""
        credentials = {"username": test_user.username, "password": "wrongpassword"}

        resp = client.post(_LOGIN_URL, json=credentials)

        assert resp.status_code == 401
        error = resp.json()
        assert error["error_code"] == "INVALID_CREDENTIALS"
        assert "Incorrect username or password" in error["message"]

    def test_login_nonexistent_user(self, client, db):
        """An unknown username gets the same 401 response as a wrong password."""
        credentials = {"username": "nonexistent", "password": "password123"}

        resp = client.post(_LOGIN_URL, json=credentials)

        assert resp.status_code == 401
        error = resp.json()
        assert error["error_code"] == "INVALID_CREDENTIALS"
        assert "Incorrect username or password" in error["message"]

    def test_login_inactive_user(self, client, db, test_user):
        """A deactivated account cannot log in, even with the right password (403)."""
        # Deactivate the user for the duration of this test only.
        was_active = test_user.is_active
        test_user.is_active = False
        db.commit()

        try:
            resp = client.post(
                _LOGIN_URL,
                json={"username": test_user.username, "password": "testpass123"},
            )

            assert resp.status_code == 403
            error = resp.json()
            assert error["error_code"] == "USER_NOT_ACTIVE"
            assert "User account is not active" in error["message"]
        finally:
            # Put the flag back whether or not the assertions passed.
            test_user.is_active = was_active
            db.commit()

    def test_login_validation_error(self, client):
        """A login body with an empty username and no password yields 422."""
        incomplete = {
            "username": "",  # empty username
            # the password field is deliberately left out
        }

        resp = client.post(_LOGIN_URL, json=incomplete)

        assert resp.status_code == 422
        error = resp.json()
        assert error["error_code"] == "VALIDATION_ERROR"
        assert "Request validation failed" in error["message"]

    def test_get_current_user_info(self, client, auth_headers, test_user):
        """An authenticated request to the "me" route returns the user's details."""
        resp = client.get(_ME_URL, headers=auth_headers)

        assert resp.status_code == 200
        profile = resp.json()
        assert profile["username"] == test_user.username
        assert profile["email"] == test_user.email
        assert profile["role"] == test_user.role
        assert profile["is_active"] is True

    def test_get_current_user_without_auth(self, client):
        """A request to the "me" route with no Authorization header is rejected (401)."""
        resp = client.get(_ME_URL)

        assert resp.status_code == 401
        error = resp.json()
        assert error["error_code"] == "INVALID_TOKEN"
        assert "Authorization header required" in error["message"]

    def test_get_current_user_invalid_token(self, client):
        """A bearer value that is not a valid token is rejected (401)."""
        resp = client.get(_ME_URL, headers=_bearer("invalid_token_here"))

        assert resp.status_code == 401
        error = resp.json()
        assert error["error_code"] == "INVALID_TOKEN"

    def test_get_current_user_expired_token(self, client, test_user, auth_manager):
        """A correctly signed token whose ``exp`` is in the past is rejected (401)."""
        # Hand-build the claims so the token expired one hour ago; it is
        # signed with the auth manager's own key and algorithm.
        stale_claims = {
            "sub": str(test_user.id),
            "username": test_user.username,
            "email": test_user.email,
            "role": test_user.role,
            "exp": datetime.now(UTC) - timedelta(hours=1),
            "iat": datetime.now(UTC) - timedelta(hours=2),
        }
        stale_token = jwt.encode(
            stale_claims, auth_manager.secret_key, algorithm=auth_manager.algorithm
        )

        resp = client.get(_ME_URL, headers=_bearer(stale_token))

        assert resp.status_code == 401
        error = resp.json()
        assert error["error_code"] == "TOKEN_EXPIRED"

    def test_user_registration_flow(self, client, db):
        """End to end: register, log in, then fetch the user with the issued token."""
        # Step 1: register a new user.
        signup = client.post(
            _REGISTER_URL,
            json={
                "email": "flowtest@example.com",
                "username": "flowtest",
                "password": "securepass123",
            },
        )
        assert signup.status_code == 200
        registered = signup.json()
        assert registered["email"] == "flowtest@example.com"
        assert registered["username"] == "flowtest"

        # Step 2: log in with the credentials just registered.
        login = client.post(
            _LOGIN_URL,
            json={"username": "flowtest", "password": "securepass123"},
        )
        assert login.status_code == 200
        session = login.json()
        assert "access_token" in session
        assert session["user"]["username"] == "flowtest"

        # Step 3: use the issued token to fetch the user's own details.
        me = client.get(_ME_URL, headers=_bearer(session["access_token"]))
        assert me.status_code == 200
        profile = me.json()
        assert profile["username"] == "flowtest"
        assert profile["email"] == "flowtest@example.com"


@pytest.mark.unit
@pytest.mark.auth
class TestAuthManager:
    """Direct tests of the ``auth_manager`` fixture's hashing, token and auth methods."""

    def test_hash_password(self, auth_manager):
        """Hashing returns a long, ``$``-prefixed string that differs from the input."""
        plain = "testpassword123"

        digest = auth_manager.hash_password(plain)

        assert digest != plain
        assert len(digest) > 20  # bcrypt hashes are long
        assert digest.startswith("$")  # bcrypt hash format

    def test_verify_password(self, auth_manager):
        """Verification accepts the original password and rejects a different one."""
        plain = "testpassword123"
        digest = auth_manager.hash_password(plain)

        assert auth_manager.verify_password(plain, digest) is True
        assert auth_manager.verify_password("wrongpassword", digest) is False

    def test_create_access_token(self, auth_manager, test_user):
        """Token creation returns the token, its type and a positive integer lifetime."""
        issued = auth_manager.create_access_token(test_user)

        assert "access_token" in issued
        assert issued["token_type"] == "bearer"
        assert "expires_in" in issued
        assert isinstance(issued["expires_in"], int)
        assert issued["expires_in"] > 0

    def test_verify_token_valid(self, auth_manager, test_user):
        """Verifying a freshly issued token returns the user's identity fields."""
        issued = auth_manager.create_access_token(test_user)

        claims = auth_manager.verify_token(issued["access_token"])

        assert claims["user_id"] == test_user.id
        assert claims["username"] == test_user.username
        assert claims["email"] == test_user.email
        assert claims["role"] == test_user.role

    def test_verify_token_invalid(self, auth_manager):
        """Verifying a string that is not a valid token raises InvalidTokenException."""
        from app.exceptions.auth import InvalidTokenException

        with pytest.raises(InvalidTokenException):
            auth_manager.verify_token("invalid_token_here")

    def test_authenticate_user_success(self, auth_manager, db, test_user):
        """Valid credentials return the matching user."""
        found = auth_manager.authenticate_user(db, test_user.username, "testpass123")

        assert found is not None
        assert found.id == test_user.id
        assert found.username == test_user.username

    def test_authenticate_user_wrong_password(self, auth_manager, db, test_user):
        """A wrong password returns ``None``."""
        found = auth_manager.authenticate_user(db, test_user.username, "wrongpassword")

        assert found is None

    def test_authenticate_user_nonexistent(self, auth_manager, db):
        """An unknown username returns ``None``."""
        found = auth_manager.authenticate_user(db, "nonexistent", "password")

        assert found is None