- Remove |safe from |tojson in HTML attributes (x-data) - quotes must become " for browsers to parse correctly - Update LANG-002 and LANG-003 architecture rules to document correct |tojson usage patterns: - HTML attributes: |tojson (no |safe) - Script blocks: |tojson|safe - Fix validator to warn when |tojson|safe is used in x-data (breaks HTML attribute parsing) - Improve code quality across services, APIs, and tests 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
311 lines
12 KiB
Python
311 lines
12 KiB
Python
# tests/system/test_error_handling.py
|
|
"""
|
|
System tests for error handling across the LetzVendor API.
|
|
|
|
Tests the complete error handling flow from FastAPI through custom exception handlers
|
|
to ensure proper HTTP status codes, error structures, and client-friendly responses.
|
|
|
|
API Structure:
|
|
- /api/v1/admin/* - Admin endpoints (require admin token)
|
|
- /api/v1/vendor/* - Vendor endpoints (require vendor token with vendor_id claim)
|
|
- /api/v1/shop/* - Shop endpoints (customer-facing)
|
|
- /health - Health check endpoint
|
|
"""
|
|
|
|
import pytest
|
|
|
|
|
|
@pytest.mark.system
|
|
class TestErrorHandling:
|
|
"""Test error handling behavior across the API endpoints"""
|
|
|
|
def test_invalid_json_request(self, client, admin_headers):
|
|
"""Test handling of malformed JSON requests"""
|
|
response = client.post(
|
|
"/api/v1/admin/vendors",
|
|
headers=admin_headers,
|
|
content="{ invalid json syntax",
|
|
)
|
|
|
|
assert response.status_code == 422
|
|
data = response.json()
|
|
assert "error_code" in data or "detail" in data
|
|
|
|
def test_missing_required_fields_vendor_creation(self, client, admin_headers):
|
|
"""Test validation errors for missing required fields"""
|
|
# Missing required fields (company_id, name, etc.)
|
|
response = client.post(
|
|
"/api/v1/admin/vendors",
|
|
headers=admin_headers,
|
|
json={"vendor_code": "TESTVENDOR"},
|
|
)
|
|
|
|
assert response.status_code == 422
|
|
data = response.json()
|
|
# FastAPI validation error or custom error
|
|
assert "error_code" in data or "detail" in data
|
|
|
|
def test_missing_authentication_token(self, client):
|
|
"""Test authentication required endpoints without token"""
|
|
response = client.get("/api/v1/admin/vendors")
|
|
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert data["error_code"] in ["INVALID_TOKEN", "HTTP_401"]
|
|
assert data["status_code"] == 401
|
|
|
|
def test_invalid_authentication_token(self, client):
|
|
"""Test endpoints with invalid JWT token"""
|
|
headers = {"Authorization": "Bearer invalid_token_here"}
|
|
response = client.get("/api/v1/admin/vendors", headers=headers)
|
|
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert data["error_code"] in ["INVALID_TOKEN", "TOKEN_EXPIRED", "HTTP_401"]
|
|
assert data["status_code"] == 401
|
|
|
|
def test_expired_authentication_token(self, client):
|
|
"""Test endpoints with expired JWT token"""
|
|
expired_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.expired.token"
|
|
headers = {"Authorization": f"Bearer {expired_token}"}
|
|
response = client.get("/api/v1/admin/vendors", headers=headers)
|
|
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert data["status_code"] == 401
|
|
|
|
def test_vendor_not_found(self, client, admin_headers):
|
|
"""Test accessing non-existent vendor"""
|
|
response = client.get(
|
|
"/api/v1/admin/vendors/NONEXISTENT_CODE", headers=admin_headers
|
|
)
|
|
|
|
assert response.status_code == 404
|
|
data = response.json()
|
|
assert data["error_code"] == "VENDOR_NOT_FOUND"
|
|
assert data["status_code"] == 404
|
|
assert data["details"]["resource_type"] == "Vendor"
|
|
|
|
def test_marketplace_product_not_found(self, client, admin_headers):
|
|
"""Test accessing non-existent marketplace product"""
|
|
response = client.get("/api/v1/admin/products/999999", headers=admin_headers)
|
|
|
|
assert response.status_code == 404
|
|
data = response.json()
|
|
# API returns PRODUCT_NOT_FOUND for marketplace product lookups
|
|
assert data["error_code"] == "PRODUCT_NOT_FOUND"
|
|
assert data["status_code"] == 404
|
|
|
|
def test_insufficient_permissions_regular_user(self, client, auth_headers):
|
|
"""Test accessing admin endpoints with regular user token"""
|
|
response = client.get("/api/v1/admin/users", headers=auth_headers)
|
|
|
|
# Should get 401 (not admin) or 403 (forbidden)
|
|
assert response.status_code in [401, 403]
|
|
if response.status_code == 401:
|
|
data = response.json()
|
|
assert data["error_code"] in [
|
|
"ADMIN_REQUIRED",
|
|
"INSUFFICIENT_PERMISSIONS",
|
|
"INVALID_TOKEN",
|
|
]
|
|
|
|
def test_nonexistent_endpoint(self, client, admin_headers):
|
|
"""Test 404 for completely non-existent endpoints"""
|
|
response = client.get("/api/v1/nonexistent-endpoint", headers=admin_headers)
|
|
|
|
assert response.status_code == 404
|
|
data = response.json()
|
|
assert data["error_code"] == "ENDPOINT_NOT_FOUND"
|
|
assert data["status_code"] == 404
|
|
assert data["details"]["path"] == "/api/v1/nonexistent-endpoint"
|
|
assert data["details"]["method"] == "GET"
|
|
|
|
def test_method_not_allowed(self, client, admin_headers):
|
|
"""Test 405 for wrong HTTP method on existing endpoints"""
|
|
# Try DELETE on vendors list endpoint that only supports GET/POST
|
|
response = client.delete("/api/v1/admin/vendors", headers=admin_headers)
|
|
|
|
assert response.status_code == 405
|
|
|
|
def test_health_check_always_works(self, client):
|
|
"""Test health check endpoint works without auth"""
|
|
response = client.get("/health")
|
|
assert response.status_code == 200
|
|
|
|
def test_error_response_consistency(self, client, admin_headers):
|
|
"""Test that all error responses follow consistent structure"""
|
|
# Test 404 error
|
|
response = client.get(
|
|
"/api/v1/admin/vendors/NONEXISTENT", headers=admin_headers
|
|
)
|
|
assert response.status_code == 404
|
|
|
|
data = response.json()
|
|
# All error responses should have these fields
|
|
required_fields = ["error_code", "message", "status_code"]
|
|
for field in required_fields:
|
|
assert field in data, f"Missing {field} in error response"
|
|
|
|
# Details field should be present (can be empty dict)
|
|
assert "details" in data
|
|
assert isinstance(data["details"], dict)
|
|
|
|
def test_cors_preflight_request(self, client):
|
|
"""Test CORS preflight requests are handled"""
|
|
response = client.options("/api/v1/admin/vendors")
|
|
|
|
# Should either succeed or be handled gracefully
|
|
assert response.status_code in [200, 204, 405]
|
|
|
|
def test_authentication_error_no_sensitive_data(self, client):
|
|
"""Test authentication errors don't expose sensitive information"""
|
|
response = client.get("/api/v1/admin/vendors")
|
|
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
message = data.get("message", "").lower()
|
|
# Should not expose sensitive internal details
|
|
assert "password" not in message
|
|
assert "secret" not in message
|
|
assert "key" not in message
|
|
|
|
|
|
@pytest.mark.system
|
|
class TestVendorAPIErrors:
|
|
"""Test vendor-specific API error handling"""
|
|
|
|
def test_vendor_api_requires_vendor_token(self, client, admin_headers):
|
|
"""Admin token without vendor_id claim cannot access vendor API"""
|
|
# Admin token doesn't have vendor_id, so vendor endpoints should fail
|
|
response = client.get("/api/v1/vendor/products", headers=admin_headers)
|
|
|
|
# Should fail - admin token lacks vendor_id claim
|
|
assert response.status_code in [401, 403]
|
|
|
|
def test_inventory_requires_vendor_context(self, client, admin_headers):
|
|
"""Test inventory endpoints require vendor context"""
|
|
# Admin headers don't have vendor context
|
|
response = client.get("/api/v1/vendor/inventory", headers=admin_headers)
|
|
|
|
assert response.status_code in [401, 403]
|
|
|
|
|
|
@pytest.mark.system
|
|
class TestErrorRecovery:
|
|
"""Test system recovery and graceful degradation"""
|
|
|
|
def test_health_check_endpoint(self, client):
|
|
"""Test health check responds"""
|
|
response = client.get("/health")
|
|
|
|
# Health check should respond, even if it reports unhealthy
|
|
assert response.status_code in [200, 503]
|
|
|
|
if response.status_code == 503:
|
|
data = response.json()
|
|
assert data["error_code"] == "SERVICE_UNAVAILABLE"
|
|
|
|
def test_basic_api_availability(self, client, admin_headers):
|
|
"""Test that API endpoints respond"""
|
|
# Basic endpoint should respond
|
|
health_response = client.get("/health")
|
|
assert health_response.status_code == 200
|
|
|
|
# Admin endpoint should respond (with data or auth error)
|
|
api_response = client.get("/api/v1/admin/vendors", headers=admin_headers)
|
|
# Should get either data or a proper error, not a crash
|
|
assert api_response.status_code in [200, 401, 403, 500, 503]
|
|
|
|
def test_error_logging_integration(self, client, admin_headers, caplog):
|
|
"""Test that errors are properly logged"""
|
|
import logging
|
|
|
|
with caplog.at_level(logging.WARNING):
|
|
# Trigger a 404 error
|
|
client.get("/api/v1/admin/vendors/NONEXISTENT", headers=admin_headers)
|
|
|
|
# Check that the error was logged
|
|
log_messages = [record.message for record in caplog.records]
|
|
# 404s are typically logged at WARNING level in the exception handler
|
|
assert any("404" in msg or "NONEXISTENT" in msg for msg in log_messages)
|
|
|
|
|
|
@pytest.mark.system
|
|
class TestValidationErrors:
|
|
"""Test validation error handling"""
|
|
|
|
def test_invalid_vendor_code_format(self, client, admin_headers, test_company):
|
|
"""Test validation error for invalid vendor code format"""
|
|
vendor_data = {
|
|
"company_id": test_company.id,
|
|
"vendor_code": "INVALID@VENDOR!", # Invalid chars
|
|
"name": "Test Vendor",
|
|
}
|
|
|
|
response = client.post(
|
|
"/api/v1/admin/vendors", headers=admin_headers, json=vendor_data
|
|
)
|
|
|
|
assert response.status_code == 422
|
|
data = response.json()
|
|
# Can be VALIDATION_ERROR (Pydantic) or INVALID_VENDOR_DATA (custom)
|
|
assert data["error_code"] in ["VALIDATION_ERROR", "INVALID_VENDOR_DATA"]
|
|
|
|
def test_duplicate_vendor_creation(
|
|
self, client, admin_headers, test_vendor, test_company
|
|
):
|
|
"""Test creating vendor with duplicate vendor code"""
|
|
import uuid
|
|
|
|
# Create vendor with valid unique subdomain (required field)
|
|
unique_id = str(uuid.uuid4())[:8]
|
|
vendor_data = {
|
|
"company_id": test_company.id,
|
|
"vendor_code": test_vendor.vendor_code, # Already exists
|
|
"name": "Duplicate Vendor",
|
|
"subdomain": f"duplicate{unique_id}", # Unique subdomain
|
|
}
|
|
|
|
response = client.post(
|
|
"/api/v1/admin/vendors", headers=admin_headers, json=vendor_data
|
|
)
|
|
|
|
# Should be 409 (conflict) or 422 (validation caught it)
|
|
assert response.status_code in [409, 422]
|
|
data = response.json()
|
|
if response.status_code == 409:
|
|
assert data["error_code"] == "VENDOR_ALREADY_EXISTS"
|
|
else:
|
|
# Validation caught the duplicate before service layer
|
|
assert data["error_code"] in ["VALIDATION_ERROR", "VENDOR_ALREADY_EXISTS"]
|
|
|
|
|
|
@pytest.mark.system
|
|
class TestAdminUserManagement:
|
|
"""Test admin user management error handling"""
|
|
|
|
def test_user_not_found(self, client, admin_headers):
|
|
"""Test accessing non-existent user"""
|
|
response = client.get("/api/v1/admin/users/999999", headers=admin_headers)
|
|
|
|
assert response.status_code == 404
|
|
data = response.json()
|
|
assert data["error_code"] == "USER_NOT_FOUND"
|
|
assert data["status_code"] == 404
|
|
|
|
def test_cannot_delete_self(self, client, admin_headers, test_admin):
|
|
"""Test admin cannot delete themselves"""
|
|
response = client.delete(
|
|
f"/api/v1/admin/users/{test_admin.id}", headers=admin_headers
|
|
)
|
|
|
|
# Should be rejected
|
|
assert response.status_code in [400, 403, 409]
|
|
data = response.json()
|
|
assert data["error_code"] in [
|
|
"CANNOT_MODIFY_SELF",
|
|
"CANNOT_DELETE_SELF",
|
|
"USER_CANNOT_BE_DELETED",
|
|
]
|