Adding vendor api tests
This commit is contained in:
384
tests/integration/api/v1/vendor/test_authentication.py
vendored
Normal file
384
tests/integration/api/v1/vendor/test_authentication.py
vendored
Normal file
@@ -0,0 +1,384 @@
|
||||
# tests/integration/api/v1/test_vendor_api_authentication.py
|
||||
"""
|
||||
Integration tests for vendor API authentication using get_current_vendor_api.
|
||||
|
||||
These tests verify that:
|
||||
1. Vendor API endpoints require Authorization header (not cookies)
|
||||
2. Only vendor-role users can access vendor API endpoints
|
||||
3. Admin users are blocked from vendor API routes
|
||||
4. Invalid/expired tokens are rejected
|
||||
5. Vendor context middleware works correctly with API authentication
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from jose import jwt
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
@pytest.mark.api
|
||||
@pytest.mark.vendor
|
||||
@pytest.mark.auth
|
||||
class TestVendorAPIAuthentication:
|
||||
"""Test authentication for vendor API endpoints using get_current_vendor_api"""
|
||||
|
||||
# ========================================================================
|
||||
# Authentication Tests - /api/v1/vendor/auth/me
|
||||
# ========================================================================
|
||||
|
||||
def test_vendor_auth_me_success(self, client, vendor_user_headers, test_vendor_user):
|
||||
"""Test /auth/me endpoint with valid vendor user token"""
|
||||
response = client.get("/api/v1/vendor/auth/me", headers=vendor_user_headers)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["username"] == test_vendor_user.username
|
||||
assert data["email"] == test_vendor_user.email
|
||||
assert data["role"] == "vendor"
|
||||
assert data["is_active"] is True
|
||||
|
||||
def test_vendor_auth_me_without_token(self, client):
|
||||
"""Test /auth/me endpoint without authorization header"""
|
||||
response = client.get("/api/v1/vendor/auth/me")
|
||||
|
||||
assert response.status_code == 401
|
||||
data = response.json()
|
||||
assert data["error_code"] == "INVALID_TOKEN"
|
||||
assert "Authorization header required" in data["message"]
|
||||
|
||||
def test_vendor_auth_me_invalid_token(self, client):
|
||||
"""Test /auth/me endpoint with invalid token format"""
|
||||
response = client.get(
|
||||
"/api/v1/vendor/auth/me",
|
||||
headers={"Authorization": "Bearer invalid_token_here"}
|
||||
)
|
||||
|
||||
assert response.status_code == 401
|
||||
data = response.json()
|
||||
assert data["error_code"] == "INVALID_TOKEN"
|
||||
|
||||
def test_vendor_auth_me_with_admin_token(self, client, admin_headers, test_admin):
|
||||
"""Test /auth/me endpoint rejects admin users"""
|
||||
response = client.get("/api/v1/vendor/auth/me", headers=admin_headers)
|
||||
|
||||
assert response.status_code == 403
|
||||
data = response.json()
|
||||
assert data["error_code"] == "FORBIDDEN"
|
||||
assert "Admin users cannot access vendor API" in data["message"]
|
||||
|
||||
def test_vendor_auth_me_with_regular_user_token(self, client, auth_headers, test_user):
|
||||
"""Test /auth/me endpoint rejects regular users"""
|
||||
response = client.get("/api/v1/vendor/auth/me", headers=auth_headers)
|
||||
|
||||
assert response.status_code == 403
|
||||
data = response.json()
|
||||
assert data["error_code"] == "FORBIDDEN"
|
||||
assert "vendor API routes" in data["message"].lower()
|
||||
|
||||
def test_vendor_auth_me_expired_token(self, client, test_vendor_user, auth_manager):
|
||||
"""Test /auth/me endpoint with expired token"""
|
||||
# Create expired token
|
||||
expired_payload = {
|
||||
"sub": str(test_vendor_user.id),
|
||||
"username": test_vendor_user.username,
|
||||
"email": test_vendor_user.email,
|
||||
"role": test_vendor_user.role,
|
||||
"exp": datetime.now(timezone.utc) - timedelta(hours=1),
|
||||
"iat": datetime.now(timezone.utc) - timedelta(hours=2),
|
||||
}
|
||||
|
||||
expired_token = jwt.encode(
|
||||
expired_payload,
|
||||
auth_manager.secret_key,
|
||||
algorithm=auth_manager.algorithm
|
||||
)
|
||||
|
||||
response = client.get(
|
||||
"/api/v1/vendor/auth/me",
|
||||
headers={"Authorization": f"Bearer {expired_token}"}
|
||||
)
|
||||
|
||||
assert response.status_code == 401
|
||||
data = response.json()
|
||||
assert data["error_code"] == "TOKEN_EXPIRED"
|
||||
|
||||
# ========================================================================
|
||||
# Dashboard Stats Endpoint Tests - /api/v1/vendor/dashboard/stats
|
||||
# ========================================================================
|
||||
|
||||
def test_vendor_dashboard_stats_success(
|
||||
self, client, vendor_user_headers, test_vendor_with_vendor_user, db
|
||||
):
|
||||
"""Test dashboard stats with valid vendor authentication"""
|
||||
response = client.get(
|
||||
"/api/v1/vendor/dashboard/stats",
|
||||
headers=vendor_user_headers
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert "vendor" in data
|
||||
assert "products" in data
|
||||
assert "orders" in data
|
||||
assert "customers" in data
|
||||
assert "revenue" in data
|
||||
|
||||
def test_vendor_dashboard_stats_without_auth(self, client):
|
||||
"""Test dashboard stats without authentication"""
|
||||
response = client.get("/api/v1/vendor/dashboard/stats")
|
||||
|
||||
assert response.status_code == 401
|
||||
|
||||
def test_vendor_dashboard_stats_with_admin(self, client, admin_headers):
|
||||
"""Test dashboard stats rejects admin users"""
|
||||
response = client.get(
|
||||
"/api/v1/vendor/dashboard/stats",
|
||||
headers=admin_headers
|
||||
)
|
||||
|
||||
assert response.status_code == 403
|
||||
data = response.json()
|
||||
assert "Admin users cannot access vendor API" in data["message"]
|
||||
|
||||
def test_vendor_dashboard_stats_with_cookie_only(self, client, test_vendor_user):
|
||||
"""Test dashboard stats does not accept cookie authentication"""
|
||||
# Login to get session cookie
|
||||
login_response = client.post(
|
||||
"/api/v1/vendor/auth/login",
|
||||
json={
|
||||
"username": test_vendor_user.username,
|
||||
"password": "vendorpass123"
|
||||
}
|
||||
)
|
||||
assert login_response.status_code == 200
|
||||
|
||||
# Try to access API endpoint with just cookies (no Authorization header)
|
||||
response = client.get("/api/v1/vendor/dashboard/stats")
|
||||
|
||||
# Should fail because get_current_vendor_api requires Authorization header
|
||||
assert response.status_code == 401
|
||||
|
||||
# ========================================================================
|
||||
# CSRF Protection Tests
|
||||
# ========================================================================
|
||||
|
||||
def test_csrf_protection_api_endpoints_require_header(
|
||||
self, client, test_vendor_user
|
||||
):
|
||||
"""Test that API endpoints require Authorization header (CSRF protection)"""
|
||||
# Get a valid session by logging in
|
||||
login_response = client.post(
|
||||
"/api/v1/vendor/auth/login",
|
||||
json={
|
||||
"username": test_vendor_user.username,
|
||||
"password": "vendorpass123"
|
||||
}
|
||||
)
|
||||
assert login_response.status_code == 200
|
||||
|
||||
# List of vendor API endpoints that should require header auth
|
||||
api_endpoints = [
|
||||
"/api/v1/vendor/auth/me",
|
||||
"/api/v1/vendor/dashboard/stats",
|
||||
"/api/v1/vendor/products",
|
||||
"/api/v1/vendor/orders",
|
||||
"/api/v1/vendor/profile",
|
||||
"/api/v1/vendor/settings",
|
||||
]
|
||||
|
||||
for endpoint in api_endpoints:
|
||||
# Try to access with just session cookie (no Authorization header)
|
||||
response = client.get(endpoint)
|
||||
|
||||
# All should fail with 401 (header required)
|
||||
assert response.status_code == 401, \
|
||||
f"Endpoint {endpoint} should reject cookie-only auth"
|
||||
|
||||
# ========================================================================
|
||||
# Role-Based Access Control Tests
|
||||
# ========================================================================
|
||||
|
||||
def test_vendor_endpoints_block_non_vendor_roles(
|
||||
self, client, auth_headers, admin_headers
|
||||
):
|
||||
"""Test that vendor API endpoints block non-vendor users"""
|
||||
endpoints = [
|
||||
"/api/v1/vendor/auth/me",
|
||||
"/api/v1/vendor/dashboard/stats",
|
||||
"/api/v1/vendor/profile",
|
||||
]
|
||||
|
||||
for endpoint in endpoints:
|
||||
# Test with regular user token
|
||||
response = client.get(endpoint, headers=auth_headers)
|
||||
assert response.status_code == 403, \
|
||||
f"Endpoint {endpoint} should reject regular users"
|
||||
|
||||
# Test with admin token
|
||||
response = client.get(endpoint, headers=admin_headers)
|
||||
assert response.status_code == 403, \
|
||||
f"Endpoint {endpoint} should reject admin users"
|
||||
|
||||
def test_vendor_api_accepts_only_vendor_role(
|
||||
self, client, vendor_user_headers, test_vendor_user
|
||||
):
|
||||
"""Test that vendor API endpoints accept vendor-role users"""
|
||||
endpoints = [
|
||||
"/api/v1/vendor/auth/me",
|
||||
]
|
||||
|
||||
for endpoint in endpoints:
|
||||
response = client.get(endpoint, headers=vendor_user_headers)
|
||||
assert response.status_code in [200, 404], \
|
||||
f"Endpoint {endpoint} should accept vendor users (got {response.status_code})"
|
||||
|
||||
# ========================================================================
|
||||
# Token Validation Tests
|
||||
# ========================================================================
|
||||
|
||||
def test_malformed_authorization_header(self, client):
|
||||
"""Test various malformed Authorization headers"""
|
||||
malformed_headers = [
|
||||
{"Authorization": "InvalidFormat token123"},
|
||||
{"Authorization": "Bearer"}, # Missing token
|
||||
{"Authorization": "bearer token123"}, # Wrong case
|
||||
{"Authorization": " Bearer token123"}, # Leading space
|
||||
{"Authorization": "Bearer token123"}, # Double space
|
||||
]
|
||||
|
||||
for headers in malformed_headers:
|
||||
response = client.get("/api/v1/vendor/auth/me", headers=headers)
|
||||
assert response.status_code == 401, \
|
||||
f"Should reject malformed header: {headers}"
|
||||
|
||||
def test_token_with_missing_claims(self, client, auth_manager):
|
||||
"""Test token missing required claims"""
|
||||
# Create token without 'role' claim
|
||||
invalid_payload = {
|
||||
"sub": "123",
|
||||
"username": "test",
|
||||
"exp": datetime.now(timezone.utc) + timedelta(hours=1),
|
||||
}
|
||||
|
||||
invalid_token = jwt.encode(
|
||||
invalid_payload,
|
||||
auth_manager.secret_key,
|
||||
algorithm=auth_manager.algorithm
|
||||
)
|
||||
|
||||
response = client.get(
|
||||
"/api/v1/vendor/auth/me",
|
||||
headers={"Authorization": f"Bearer {invalid_token}"}
|
||||
)
|
||||
|
||||
assert response.status_code == 401
|
||||
|
||||
# ========================================================================
|
||||
# Edge Cases
|
||||
# ========================================================================
|
||||
|
||||
def test_inactive_vendor_user(self, client, db, test_vendor_user, auth_manager):
|
||||
"""Test that inactive vendor users are rejected"""
|
||||
# Deactivate the vendor user
|
||||
test_vendor_user.is_active = False
|
||||
db.add(test_vendor_user)
|
||||
db.commit()
|
||||
|
||||
# Create token for inactive user
|
||||
token_data = auth_manager.create_access_token(test_vendor_user)
|
||||
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
|
||||
|
||||
response = client.get("/api/v1/vendor/auth/me", headers=headers)
|
||||
|
||||
# Should fail because user is inactive
|
||||
assert response.status_code in [401, 403, 404]
|
||||
|
||||
# Reactivate for cleanup
|
||||
test_vendor_user.is_active = True
|
||||
db.add(test_vendor_user)
|
||||
db.commit()
|
||||
|
||||
def test_concurrent_requests_with_same_token(
|
||||
self, client, vendor_user_headers
|
||||
):
|
||||
"""Test that the same token can be used for multiple concurrent requests"""
|
||||
# Make multiple requests with the same token
|
||||
responses = []
|
||||
for _ in range(5):
|
||||
response = client.get("/api/v1/vendor/auth/me", headers=vendor_user_headers)
|
||||
responses.append(response)
|
||||
|
||||
# All should succeed
|
||||
for response in responses:
|
||||
assert response.status_code == 200
|
||||
|
||||
def test_vendor_api_with_empty_authorization_header(self, client):
|
||||
"""Test vendor API with empty Authorization header value"""
|
||||
response = client.get(
|
||||
"/api/v1/vendor/auth/me",
|
||||
headers={"Authorization": ""}
|
||||
)
|
||||
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
@pytest.mark.api
|
||||
@pytest.mark.vendor
|
||||
class TestVendorAPIConsistency:
|
||||
"""Test that all vendor API endpoints use consistent authentication"""
|
||||
|
||||
def test_all_vendor_endpoints_require_header_auth(
|
||||
self, client, test_vendor_user
|
||||
):
|
||||
"""Verify all vendor API endpoints require Authorization header"""
|
||||
# Login to establish session
|
||||
client.post(
|
||||
"/api/v1/vendor/auth/login",
|
||||
json={
|
||||
"username": test_vendor_user.username,
|
||||
"password": "vendorpass123"
|
||||
}
|
||||
)
|
||||
|
||||
# All vendor API endpoints (excluding public endpoints like /info)
|
||||
vendor_api_endpoints = [
|
||||
("/api/v1/vendor/auth/me", "GET"),
|
||||
("/api/v1/vendor/dashboard/stats", "GET"),
|
||||
("/api/v1/vendor/profile", "GET"),
|
||||
("/api/v1/vendor/settings", "GET"),
|
||||
("/api/v1/vendor/products", "GET"),
|
||||
("/api/v1/vendor/orders", "GET"),
|
||||
("/api/v1/vendor/customers", "GET"),
|
||||
("/api/v1/vendor/inventory", "GET"),
|
||||
("/api/v1/vendor/analytics", "GET"),
|
||||
]
|
||||
|
||||
for endpoint, method in vendor_api_endpoints:
|
||||
if method == "GET":
|
||||
response = client.get(endpoint)
|
||||
elif method == "POST":
|
||||
response = client.post(endpoint, json={})
|
||||
|
||||
# All should reject cookie-only auth with 401
|
||||
assert response.status_code == 401, \
|
||||
f"Endpoint {endpoint} should require Authorization header (got {response.status_code})"
|
||||
|
||||
def test_vendor_endpoints_accept_vendor_token(
|
||||
self, client, vendor_user_headers, test_vendor_with_vendor_user
|
||||
):
|
||||
"""Verify all vendor API endpoints accept valid vendor tokens"""
|
||||
# Endpoints that should work with just vendor authentication
|
||||
# (may return 404 or other errors due to missing data, but not 401/403)
|
||||
vendor_api_endpoints = [
|
||||
"/api/v1/vendor/auth/me",
|
||||
"/api/v1/vendor/profile",
|
||||
"/api/v1/vendor/settings",
|
||||
]
|
||||
|
||||
for endpoint in vendor_api_endpoints:
|
||||
response = client.get(endpoint, headers=vendor_user_headers)
|
||||
|
||||
# Should not be authentication/authorization errors
|
||||
assert response.status_code not in [401, 403], \
|
||||
f"Endpoint {endpoint} should accept vendor token (got {response.status_code}: {response.text})"
|
||||
Reference in New Issue
Block a user