425 lines
17 KiB
Python
425 lines
17 KiB
Python
# tests/system/test_error_handling.py
|
|
"""
|
|
System tests for error handling across the LetzShop API.
|
|
|
|
Tests the complete error handling flow from FastAPI through custom exception handlers
|
|
to ensure proper HTTP status codes, error structures, and client-friendly responses.
|
|
"""
|
|
import pytest
|
|
import json
|
|
|
|
|
|
@pytest.mark.system
|
|
class TestErrorHandling:
|
|
"""Test error handling behavior across the API endpoints"""
|
|
|
|
def test_invalid_json_request(self, client, auth_headers):
|
|
"""Test handling of malformed JSON requests"""
|
|
response = client.post(
|
|
"/api/v1/shop",
|
|
headers=auth_headers,
|
|
content="{ invalid json syntax"
|
|
)
|
|
|
|
assert response.status_code == 422
|
|
data = response.json()
|
|
assert data["error_code"] == "VALIDATION_ERROR"
|
|
assert data["message"] == "Request validation failed"
|
|
assert "validation_errors" in data["details"]
|
|
|
|
def test_missing_required_fields_shop_creation(self, client, auth_headers):
|
|
"""Test validation errors for missing required fields"""
|
|
# Missing shop_name
|
|
response = client.post(
|
|
"/api/v1/shop",
|
|
headers=auth_headers,
|
|
json={"shop_code": "TESTSHOP"}
|
|
)
|
|
|
|
assert response.status_code == 422
|
|
data = response.json()
|
|
assert data["error_code"] == "VALIDATION_ERROR"
|
|
assert data["status_code"] == 422
|
|
assert "validation_errors" in data["details"]
|
|
|
|
def test_invalid_field_format_shop_creation(self, client, auth_headers):
|
|
"""Test validation errors for invalid field formats"""
|
|
# Invalid shop_code format (contains special characters)
|
|
response = client.post(
|
|
"/api/v1/shop",
|
|
headers=auth_headers,
|
|
json={
|
|
"shop_code": "INVALID@SHOP!",
|
|
"shop_name": "Test Shop"
|
|
}
|
|
)
|
|
|
|
assert response.status_code == 422
|
|
data = response.json()
|
|
assert data["error_code"] == "INVALID_SHOP_DATA"
|
|
assert data["status_code"] == 422
|
|
assert data["details"]["field"] == "shop_code"
|
|
assert "letters, numbers, underscores, and hyphens" in data["message"]
|
|
|
|
def test_missing_authentication_token(self, client):
|
|
"""Test authentication required endpoints without token"""
|
|
response = client.get("/api/v1/shop")
|
|
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert data["error_code"] == "INVALID_TOKEN"
|
|
assert data["status_code"] == 401
|
|
|
|
def test_invalid_authentication_token(self, client):
|
|
"""Test endpoints with invalid JWT token"""
|
|
headers = {"Authorization": "Bearer invalid_token_here"}
|
|
response = client.get("/api/v1/shop", headers=headers)
|
|
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert data["error_code"] in ["INVALID_TOKEN", "TOKEN_EXPIRED", "HTTP_401"]
|
|
assert data["status_code"] == 401
|
|
|
|
def test_expired_authentication_token(self, client):
|
|
"""Test endpoints with expired JWT token"""
|
|
# This would require creating an expired token for testing
|
|
expired_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.expired.token"
|
|
headers = {"Authorization": f"Bearer {expired_token}"}
|
|
response = client.get("/api/v1/shop", headers=headers)
|
|
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert data["status_code"] == 401
|
|
|
|
def test_shop_not_found(self, client, auth_headers):
|
|
"""Test accessing non-existent shop"""
|
|
response = client.get("/api/v1/shop/NONEXISTENT", headers=auth_headers)
|
|
|
|
assert response.status_code == 404
|
|
data = response.json()
|
|
assert data["error_code"] == "SHOP_NOT_FOUND"
|
|
assert data["status_code"] == 404
|
|
assert data["details"]["resource_type"] == "Shop"
|
|
assert data["details"]["identifier"] == "NONEXISTENT"
|
|
|
|
def test_product_not_found(self, client, auth_headers):
|
|
"""Test accessing non-existent product"""
|
|
response = client.get("/api/v1/product/NONEXISTENT", headers=auth_headers)
|
|
|
|
assert response.status_code == 404
|
|
data = response.json()
|
|
assert data["error_code"] == "PRODUCT_NOT_FOUND"
|
|
assert data["status_code"] == 404
|
|
assert data["details"]["resource_type"] == "Product"
|
|
assert data["details"]["identifier"] == "NONEXISTENT"
|
|
|
|
def test_duplicate_shop_creation(self, client, auth_headers, test_shop):
|
|
"""Test creating shop with duplicate shop code"""
|
|
shop_data = {
|
|
"shop_code": test_shop.shop_code,
|
|
"shop_name": "Duplicate Shop"
|
|
}
|
|
|
|
response = client.post("/api/v1/shop", headers=auth_headers, json=shop_data)
|
|
|
|
assert response.status_code == 409
|
|
data = response.json()
|
|
assert data["error_code"] == "SHOP_ALREADY_EXISTS"
|
|
assert data["status_code"] == 409
|
|
assert data["details"]["shop_code"] == test_shop.shop_code.upper()
|
|
|
|
def test_duplicate_product_creation(self, client, auth_headers, test_product):
|
|
"""Test creating product with duplicate product ID"""
|
|
product_data = {
|
|
"product_id": test_product.product_id,
|
|
"title": "Duplicate Product",
|
|
"gtin": "1234567890123"
|
|
}
|
|
|
|
response = client.post("/api/v1/product", headers=auth_headers, json=product_data)
|
|
|
|
assert response.status_code == 409
|
|
data = response.json()
|
|
assert data["error_code"] == "PRODUCT_ALREADY_EXISTS"
|
|
assert data["status_code"] == 409
|
|
assert data["details"]["product_id"] == test_product.product_id
|
|
|
|
def test_unauthorized_shop_access(self, client, auth_headers, inactive_shop):
|
|
"""Test accessing shop without proper permissions"""
|
|
response = client.get(f"/api/v1/shop/{inactive_shop.shop_code}", headers=auth_headers)
|
|
|
|
assert response.status_code == 403
|
|
data = response.json()
|
|
assert data["error_code"] == "UNAUTHORIZED_SHOP_ACCESS"
|
|
assert data["status_code"] == 403
|
|
assert data["details"]["shop_code"] == inactive_shop.shop_code
|
|
|
|
def test_insufficient_permissions(self, client, auth_headers, admin_only_endpoint="/api/v1/admin/users"):
|
|
"""Test accessing admin endpoints with regular user"""
|
|
response = client.get(admin_only_endpoint, headers=auth_headers)
|
|
|
|
assert response.status_code in [403, 404] # 403 for permission denied, 404 if endpoint doesn't exist
|
|
if response.status_code == 403:
|
|
data = response.json()
|
|
assert data["error_code"] in ["ADMIN_REQUIRED", "INSUFFICIENT_PERMISSIONS"]
|
|
assert data["status_code"] == 403
|
|
|
|
def test_business_logic_violation_max_shops(self, client, auth_headers, monkeypatch):
|
|
"""Test business logic violation - creating too many shops"""
|
|
# This test would require mocking the shop limit check
|
|
# For now, test the error structure when creating multiple shops
|
|
shops_created = []
|
|
for i in range(6): # Assume limit is 5
|
|
shop_data = {
|
|
"shop_code": f"SHOP{i:03d}",
|
|
"shop_name": f"Test Shop {i}"
|
|
}
|
|
response = client.post("/api/v1/shop", headers=auth_headers, json=shop_data)
|
|
shops_created.append(response)
|
|
|
|
# At least one should succeed, and if limit is enforced, later ones should fail
|
|
success_count = sum(1 for r in shops_created if r.status_code in [200, 201])
|
|
assert success_count >= 1
|
|
|
|
# If any failed due to limit, check error structure
|
|
failed_responses = [r for r in shops_created if r.status_code == 400]
|
|
if failed_responses:
|
|
data = failed_responses[0].json()
|
|
assert data["error_code"] == "MAX_SHOPS_REACHED"
|
|
assert "max_shops" in data["details"]
|
|
|
|
def test_validation_error_invalid_gtin(self, client, auth_headers):
|
|
"""Test validation error for invalid GTIN format"""
|
|
product_data = {
|
|
"product_id": "TESTPROD001",
|
|
"title": "Test Product",
|
|
"gtin": "invalid_gtin_format"
|
|
}
|
|
|
|
response = client.post("/api/v1/product", headers=auth_headers, json=product_data)
|
|
|
|
assert response.status_code == 422
|
|
data = response.json()
|
|
assert data["error_code"] == "INVALID_PRODUCT_DATA"
|
|
assert data["status_code"] == 422
|
|
assert data["details"]["field"] == "gtin"
|
|
|
|
def test_stock_insufficient_quantity(self, client, auth_headers, test_shop, test_product):
|
|
"""Test business logic error for insufficient stock"""
|
|
# First create some stock
|
|
stock_data = {
|
|
"gtin": test_product.gtin,
|
|
"location": "WAREHOUSE_A",
|
|
"quantity": 5
|
|
}
|
|
client.post("/api/v1/stock", headers=auth_headers, json=stock_data)
|
|
|
|
# Try to remove more than available using your remove endpoint
|
|
remove_data = {
|
|
"gtin": test_product.gtin,
|
|
"location": "WAREHOUSE_A",
|
|
"quantity": 10 # More than the 5 we added
|
|
}
|
|
response = client.post("/api/v1/stock/remove", headers=auth_headers, json=remove_data)
|
|
|
|
# This should ALWAYS fail with insufficient stock error
|
|
assert response.status_code == 400
|
|
data = response.json()
|
|
assert data["error_code"] == "INSUFFICIENT_STOCK"
|
|
assert data["status_code"] == 400
|
|
assert "requested_quantity" in data["details"]
|
|
assert "available_quantity" in data["details"]
|
|
assert data["details"]["requested_quantity"] == 10
|
|
assert data["details"]["available_quantity"] == 5
|
|
|
|
def test_nonexistent_endpoint(self, client, auth_headers):
|
|
"""Test 404 for completely non-existent endpoints"""
|
|
response = client.get("/api/v1/nonexistent-endpoint", headers=auth_headers)
|
|
|
|
assert response.status_code == 404
|
|
data = response.json()
|
|
assert data["error_code"] == "ENDPOINT_NOT_FOUND"
|
|
assert data["status_code"] == 404
|
|
assert data["details"]["path"] == "/api/v1/nonexistent-endpoint"
|
|
assert data["details"]["method"] == "GET"
|
|
|
|
def test_method_not_allowed(self, client, auth_headers):
|
|
"""Test 405 for wrong HTTP method on existing endpoints"""
|
|
# Try DELETE on an endpoint that only supports GET
|
|
response = client.delete("/api/v1/shop", headers=auth_headers)
|
|
|
|
assert response.status_code == 405
|
|
# FastAPI automatically handles 405 errors
|
|
|
|
def test_unsupported_content_type(self, client, auth_headers):
|
|
"""Test handling of unsupported content types"""
|
|
headers = {**auth_headers, "Content-Type": "application/xml"}
|
|
response = client.post(
|
|
"/api/v1/shop",
|
|
headers=headers,
|
|
content="<shop><code>TEST</code></shop>"
|
|
)
|
|
|
|
assert response.status_code in [400, 415, 422]
|
|
|
|
def test_large_payload_handling(self, client, auth_headers):
|
|
"""Test handling of unusually large payloads"""
|
|
large_description = "x" * 100000 # Very long description
|
|
shop_data = {
|
|
"shop_code": "LARGESHOP",
|
|
"shop_name": "Large Shop",
|
|
"description": large_description
|
|
}
|
|
|
|
response = client.post("/api/v1/shop", headers=auth_headers, json=shop_data)
|
|
|
|
# Should either accept it or reject with appropriate error
|
|
assert response.status_code in [200, 201, 413, 422]
|
|
if response.status_code in [413, 422]:
|
|
data = response.json()
|
|
assert "status_code" in data
|
|
assert "error_code" in data
|
|
|
|
def test_rate_limiting_response_structure(self, client, auth_headers):
|
|
"""Test rate limiting error structure if implemented"""
|
|
# Make rapid requests to potentially trigger rate limiting
|
|
responses = []
|
|
for _ in range(50): # Aggressive request count
|
|
response = client.get("/api/v1/shop", headers=auth_headers)
|
|
responses.append(response)
|
|
|
|
# Check if any rate limiting occurred and verify error structure
|
|
rate_limited = [r for r in responses if r.status_code == 429]
|
|
if rate_limited:
|
|
data = rate_limited[0].json()
|
|
assert data["error_code"] == "RATE_LIMIT_EXCEEDED"
|
|
assert data["status_code"] == 429
|
|
# May include retry_after in details
|
|
|
|
def test_server_error_structure(self, client, auth_headers, monkeypatch):
|
|
"""Test 500 error handling structure"""
|
|
# This is harder to trigger naturally, but we can test the structure
|
|
# if you have a test endpoint that can simulate server errors
|
|
|
|
# For now, just ensure the health check works (basic server functionality)
|
|
response = client.get("/health")
|
|
assert response.status_code == 200
|
|
|
|
def test_marketplace_import_errors(self, client, auth_headers, test_shop):
|
|
"""Test marketplace import specific errors"""
|
|
# Test invalid marketplace
|
|
import_data = {
|
|
"marketplace": "INVALID_MARKETPLACE",
|
|
"shop_code": test_shop.shop_code
|
|
}
|
|
|
|
response = client.post("/api/v1/imports", headers=auth_headers, json=import_data)
|
|
|
|
if response.status_code == 422:
|
|
data = response.json()
|
|
assert data["error_code"] == "INVALID_MARKETPLACE"
|
|
assert data["status_code"] == 422
|
|
assert data["details"]["field"] == "marketplace"
|
|
|
|
def test_external_service_error_structure(self, client, auth_headers):
|
|
"""Test external service error handling"""
|
|
# This would test marketplace connection failures, etc.
|
|
# For now, just verify the error structure exists in your exception hierarchy
|
|
|
|
# Test with potentially problematic external data
|
|
import_data = {
|
|
"marketplace": "LETZSHOP",
|
|
"external_url": "https://nonexistent-marketplace.com/api"
|
|
}
|
|
|
|
response = client.post("/api/v1/imports", headers=auth_headers, json=import_data)
|
|
|
|
# If it's a real external service error, check structure
|
|
if response.status_code == 502:
|
|
data = response.json()
|
|
assert data["error_code"] == "EXTERNAL_SERVICE_ERROR"
|
|
assert data["status_code"] == 502
|
|
assert "service_name" in data["details"]
|
|
|
|
def test_error_response_consistency(self, client, auth_headers):
|
|
"""Test that all error responses follow consistent structure"""
|
|
test_cases = [
|
|
("/api/v1/shop/NONEXISTENT", 404),
|
|
("/api/v1/product/NONEXISTENT", 404),
|
|
]
|
|
|
|
for endpoint, expected_status in test_cases:
|
|
response = client.get(endpoint, headers=auth_headers)
|
|
assert response.status_code == expected_status
|
|
|
|
data = response.json()
|
|
# All error responses should have these fields
|
|
required_fields = ["error_code", "message", "status_code"]
|
|
for field in required_fields:
|
|
assert field in data, f"Missing {field} in error response for {endpoint}"
|
|
|
|
# Details field should be present (can be empty dict)
|
|
assert "details" in data
|
|
assert isinstance(data["details"], dict)
|
|
|
|
def test_cors_error_handling(self, client):
|
|
"""Test CORS errors are handled properly"""
|
|
# Test preflight request
|
|
response = client.options("/api/v1/shop")
|
|
|
|
# Should either succeed or be handled gracefully
|
|
assert response.status_code in [200, 204, 405]
|
|
|
|
def test_authentication_error_details(self, client):
|
|
"""Test authentication error provides helpful details"""
|
|
# Test missing Authorization header
|
|
response = client.get("/api/v1/shop")
|
|
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert "error_code" in data
|
|
assert "message" in data
|
|
# Should not expose sensitive internal details
|
|
assert "password" not in data.get("message", "").lower()
|
|
assert "secret" not in data.get("message", "").lower()
|
|
|
|
|
|
@pytest.mark.system
|
|
class TestErrorRecovery:
|
|
"""Test system recovery and graceful degradation"""
|
|
|
|
def test_database_connection_recovery(self, client):
|
|
"""Test health check during database issues"""
|
|
response = client.get("/health")
|
|
|
|
# Health check should respond, even if it reports unhealthy
|
|
assert response.status_code in [200, 503]
|
|
|
|
if response.status_code == 503:
|
|
data = response.json()
|
|
assert data["error_code"] == "SERVICE_UNAVAILABLE"
|
|
|
|
def test_partial_service_availability(self, client, auth_headers):
|
|
"""Test that some endpoints work even if others fail"""
|
|
# Basic endpoint should work
|
|
health_response = client.get("/health")
|
|
assert health_response.status_code == 200
|
|
|
|
# API endpoints may or may not work depending on system state
|
|
api_response = client.get("/api/v1/shop", headers=auth_headers)
|
|
# Should get either data or a proper error, not a crash
|
|
assert api_response.status_code in [200, 401, 403, 500, 503]
|
|
|
|
def test_error_logging_integration(self, client, auth_headers, caplog):
|
|
"""Test that errors are properly logged for debugging"""
|
|
import logging
|
|
|
|
with caplog.at_level(logging.ERROR):
|
|
# Trigger an error
|
|
client.get("/api/v1/shop/NONEXISTENT", headers=auth_headers)
|
|
|
|
# Check that error was logged (if your app logs 404s as errors)
|
|
# Adjust based on your logging configuration
|
|
error_logs = [record for record in caplog.records if record.levelno >= logging.ERROR]
|
|
# May or may not have logs depending on whether 404s are logged as errors
|