# tests/integration/api/v1/test_auth_endpoints.py """Integration tests for authentication endpoints. Note: User registration is handled per-context: - Customers: /api/v1/shop/auth/register (CustomerService) - Admin/Vendor users: Created by admin or via team invites """ from datetime import UTC, datetime, timedelta import pytest from jose import jwt @pytest.mark.integration @pytest.mark.api @pytest.mark.auth class TestAuthenticationAPI: """Test authentication endpoints for admin/vendor login.""" def test_login_success(self, client, test_user): """Test successful login.""" response = client.post( "/api/v1/auth/login", json={"username": test_user.username, "password": "testpass123"}, ) assert response.status_code == 200 data = response.json() assert "access_token" in data assert data["token_type"] == "bearer" assert "expires_in" in data assert data["user"]["username"] == test_user.username assert data["user"]["email"] == test_user.email def test_login_wrong_password(self, client, test_user): """Test login with wrong password.""" response = client.post( "/api/v1/auth/login", json={"username": test_user.username, "password": "wrongpassword"}, ) assert response.status_code == 401 data = response.json() assert data["error_code"] == "INVALID_CREDENTIALS" assert "Incorrect username or password" in data["message"] def test_login_nonexistent_user(self, client, db): """Test login with nonexistent user.""" response = client.post( "/api/v1/auth/login", json={"username": "nonexistent", "password": "password123"}, ) assert response.status_code == 401 data = response.json() assert data["error_code"] == "INVALID_CREDENTIALS" assert "Incorrect username or password" in data["message"] def test_login_inactive_user(self, client, db, test_user): """Test login with inactive user account.""" # Manually deactivate the user for this test original_status = test_user.is_active test_user.is_active = False db.commit() try: response = client.post( "/api/v1/auth/login", json={"username": test_user.username, "password": "testpass123"}, ) assert response.status_code == 403 data = response.json() assert data["error_code"] == "USER_NOT_ACTIVE" assert "User account is not active" in data["message"] finally: # Restore original status for cleanup test_user.is_active = original_status db.commit() def test_login_validation_error(self, client): """Test login with invalid request format.""" response = client.post( "/api/v1/auth/login", json={ "username": "", # Empty username # Missing password field }, ) assert response.status_code == 422 data = response.json() assert data["error_code"] == "VALIDATION_ERROR" assert "Request validation failed" in data["message"] def test_get_current_user_info(self, client, auth_headers, test_user): """Test getting current user info.""" response = client.get("/api/v1/auth/me", headers=auth_headers) assert response.status_code == 200 data = response.json() assert data["username"] == test_user.username assert data["email"] == test_user.email assert data["role"] == test_user.role assert data["is_active"] is True def test_get_current_user_without_auth(self, client): """Test getting current user without authentication.""" response = client.get("/api/v1/auth/me") assert response.status_code == 401 data = response.json() assert data["error_code"] == "INVALID_TOKEN" assert "Authorization header required" in data["message"] def test_get_current_user_invalid_token(self, client): """Test getting current user with invalid token.""" response = client.get( "/api/v1/auth/me", headers={"Authorization": "Bearer invalid_token_here"} ) assert response.status_code == 401 data = response.json() assert data["error_code"] == "INVALID_TOKEN" def test_get_current_user_expired_token(self, client, test_user, auth_manager): """Test getting current user with expired token.""" # Create token that expired 1 hour ago expired_payload = { "sub": str(test_user.id), "username": test_user.username, "email": test_user.email, "role": test_user.role, "exp": datetime.now(UTC) - timedelta(hours=1), "iat": datetime.now(UTC) - timedelta(hours=2), } expired_token = jwt.encode( expired_payload, auth_manager.secret_key, algorithm=auth_manager.algorithm ) response = client.get( "/api/v1/auth/me", headers={"Authorization": f"Bearer {expired_token}"} ) assert response.status_code == 401 data = response.json() assert data["error_code"] == "TOKEN_EXPIRED" @pytest.mark.unit @pytest.mark.auth class TestAuthManager: """Unit tests for AuthManager.""" def test_hash_password(self, auth_manager): """Test password hashing.""" password = "testpassword123" hashed = auth_manager.hash_password(password) assert hashed != password assert len(hashed) > 20 # bcrypt hashes are long assert hashed.startswith("$") # bcrypt hash format def test_verify_password(self, auth_manager): """Test password verification.""" password = "testpassword123" hashed = auth_manager.hash_password(password) assert auth_manager.verify_password(password, hashed) is True assert auth_manager.verify_password("wrongpassword", hashed) is False def test_create_access_token(self, auth_manager, test_user): """Test JWT token creation.""" token_data = auth_manager.create_access_token(test_user) assert "access_token" in token_data assert token_data["token_type"] == "bearer" assert "expires_in" in token_data assert isinstance(token_data["expires_in"], int) assert token_data["expires_in"] > 0 def test_verify_token_valid(self, auth_manager, test_user): """Test JWT token verification with valid token.""" token_data = auth_manager.create_access_token(test_user) token = token_data["access_token"] verified_data = auth_manager.verify_token(token) assert verified_data["user_id"] == test_user.id assert verified_data["username"] == test_user.username assert verified_data["email"] == test_user.email assert verified_data["role"] == test_user.role def test_verify_token_invalid(self, auth_manager): """Test JWT token verification with invalid token.""" from app.exceptions.auth import InvalidTokenException with pytest.raises(InvalidTokenException): auth_manager.verify_token("invalid_token_here") def test_authenticate_user_success(self, auth_manager, db, test_user): """Test user authentication with valid credentials.""" user = auth_manager.authenticate_user(db, test_user.username, "testpass123") assert user is not None assert user.id == test_user.id assert user.username == test_user.username def test_authenticate_user_wrong_password(self, auth_manager, db, test_user): """Test user authentication with wrong password.""" user = auth_manager.authenticate_user(db, test_user.username, "wrongpassword") assert user is None def test_authenticate_user_nonexistent(self, auth_manager, db): """Test user authentication with nonexistent user.""" user = auth_manager.authenticate_user(db, "nonexistent", "password") assert user is None