120 lines
4.5 KiB
Python
120 lines
4.5 KiB
Python
# tests/test_security.py
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from fastapi import HTTPException
|
|
|
|
|
|
class TestSecurity:
|
|
def test_debug_direct_bearer(self, client):
|
|
"""Test HTTPBearer directly"""
|
|
|
|
response = client.get("/api/v1/debug-bearer")
|
|
print(f"Direct Bearer - Status: {response.status_code}")
|
|
print(
|
|
f"Direct Bearer - Response: {response.json() if response.content else 'No content'}"
|
|
)
|
|
|
|
def test_debug_dependencies(self, client):
|
|
"""Debug the dependency chain step by step"""
|
|
|
|
# Test 1: Direct endpoint with no auth
|
|
response = client.get("/api/v1/admin/users")
|
|
print(f"Admin endpoint - Status: {response.status_code}")
|
|
try:
|
|
print(f"Admin endpoint - Response: {response.json()}")
|
|
except:
|
|
print(f"Admin endpoint - Raw: {response.content}")
|
|
|
|
# Test 2: Try a regular endpoint that uses get_current_user
|
|
response2 = client.get(
|
|
"/api/v1/product"
|
|
) # or any endpoint with get_current_user
|
|
print(f"Regular endpoint - Status: {response2.status_code}")
|
|
try:
|
|
print(f"Regular endpoint - Response: {response2.json()}")
|
|
except:
|
|
print(f"Regular endpoint - Raw: {response2.content}")
|
|
|
|
def test_debug_available_routes(self, client):
|
|
"""Debug test to see all available routes"""
|
|
print("\n=== All Available Routes ===")
|
|
for route in client.app.routes:
|
|
if hasattr(route, "path") and hasattr(route, "methods"):
|
|
print(f"{list(route.methods)} {route.path}")
|
|
|
|
print("\n=== Testing Product Endpoint Variations ===")
|
|
variations = [
|
|
"/api/v1/product", # Your current attempt
|
|
"/api/v1/product/", # With trailing slash
|
|
"/api/v1/product/list", # With list endpoint
|
|
"/api/v1/product/all", # With all endpoint
|
|
]
|
|
|
|
for path in variations:
|
|
response = client.get(path)
|
|
print(f"{path}: Status {response.status_code}")
|
|
|
|
def test_protected_endpoint_without_auth(self, client):
|
|
"""Test that protected endpoints reject unauthenticated requests"""
|
|
protected_endpoints = [
|
|
"/api/v1/admin/users",
|
|
"/api/v1/admin/shops",
|
|
"/api/v1/marketplace/import-jobs",
|
|
"/api/v1/product",
|
|
"/api/v1/shop",
|
|
"/api/v1/stats",
|
|
"/api/v1/stock",
|
|
]
|
|
|
|
for endpoint in protected_endpoints:
|
|
response = client.get(endpoint)
|
|
assert response.status_code == 401 # Authentication missing
|
|
|
|
def test_protected_endpoint_with_invalid_token(self, client):
|
|
"""Test protected endpoints with invalid token"""
|
|
headers = {"Authorization": "Bearer invalid_token_here"}
|
|
|
|
response = client.get("/api/v1/product", headers=headers)
|
|
assert response.status_code == 401 # Token is not valid
|
|
|
|
def test_admin_endpoint_requires_admin_role(self, client, auth_headers):
|
|
"""Test that admin endpoints require admin role"""
|
|
response = client.get("/api/v1/admin/users", headers=auth_headers)
|
|
assert (
|
|
response.status_code == 403
|
|
) # Token is valid but user does not have access.
|
|
# Regular user should be denied
|
|
|
|
def test_sql_injection_prevention(self, client, auth_headers):
|
|
"""Test SQL injection prevention in search parameters"""
|
|
# Try SQL injection in search parameter
|
|
malicious_search = "'; DROP TABLE products; --"
|
|
|
|
response = client.get(
|
|
f"/api/v1/product?search={malicious_search}", headers=auth_headers
|
|
)
|
|
|
|
# Should not crash and should return normal response
|
|
assert response.status_code == 200
|
|
# Database should still be intact (no products dropped)
|
|
|
|
# def test_input_validation(self, client, auth_headers):
|
|
# # TODO: implement sanitization
|
|
# """Test input validation and sanitization"""
|
|
# # Test XSS attempt in product creation
|
|
# xss_payload = "<script>alert('xss')</script>"
|
|
#
|
|
# product_data = {
|
|
# "product_id": "XSS_TEST",
|
|
# "title": xss_payload,
|
|
# "description": xss_payload,
|
|
# }
|
|
#
|
|
# response = client.post("/api/v1/product", headers=auth_headers, json=product_data)
|
|
#
|
|
# assert response.status_code == 200
|
|
# data = response.json()
|
|
# assert "<script>" not in data["title"]
|
|
# assert "<script>" in data["title"]
|