Split integration tests into logical admin/ and vendor/ subdirectories for better organization. Updated fixture imports and test structure. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
388 lines
15 KiB
Python
388 lines
15 KiB
Python
# tests/integration/api/v1/test_vendor_api_authentication.py
|
|
"""
|
|
Integration tests for vendor API authentication using get_current_vendor_api.
|
|
|
|
These tests verify that:
|
|
1. Vendor API endpoints require Authorization header (not cookies)
|
|
2. Only vendor-role users can access vendor API endpoints
|
|
3. Admin users are blocked from vendor API routes
|
|
4. Invalid/expired tokens are rejected
|
|
5. Vendor context middleware works correctly with API authentication
|
|
"""
|
|
|
|
from datetime import UTC, datetime, timedelta
|
|
|
|
import pytest
|
|
from jose import jwt
|
|
|
|
|
|
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.vendor
@pytest.mark.auth
class TestVendorAPIAuthentication:
    """Test authentication for vendor API endpoints using get_current_vendor_api.

    Every test drives the HTTP API through the ``client`` fixture and checks
    the status code (and, where asserted, the ``error_code`` / ``message``
    fields of the JSON body) returned by the vendor routes under
    ``/api/v1/vendor``.
    """

    # ========================================================================
    # Authentication Tests - /api/v1/vendor/auth/me
    # ========================================================================

    def test_vendor_auth_me_success(
        self, client, vendor_user_headers, test_vendor_user
    ):
        """Test /auth/me endpoint with valid vendor user token"""
        response = client.get("/api/v1/vendor/auth/me", headers=vendor_user_headers)

        assert response.status_code == 200
        data = response.json()
        # The returned profile must describe the user the token was issued for.
        assert data["username"] == test_vendor_user.username
        assert data["email"] == test_vendor_user.email
        assert data["role"] == "vendor"
        assert data["is_active"] is True

    def test_vendor_auth_me_without_token(self, client):
        """Test /auth/me endpoint without authorization header"""
        response = client.get("/api/v1/vendor/auth/me")

        assert response.status_code == 401
        data = response.json()
        assert data["error_code"] == "INVALID_TOKEN"
        assert "Authorization header required" in data["message"]

    def test_vendor_auth_me_invalid_token(self, client):
        """Test /auth/me endpoint with invalid token format"""
        response = client.get(
            "/api/v1/vendor/auth/me",
            headers={"Authorization": "Bearer invalid_token_here"},
        )

        assert response.status_code == 401
        data = response.json()
        assert data["error_code"] == "INVALID_TOKEN"

    def test_vendor_auth_me_with_admin_token(self, client, admin_headers, test_admin):
        """Test /auth/me endpoint rejects admin users"""
        # test_admin is requested only for its fixture side effects; it is not
        # read in the test body.
        response = client.get("/api/v1/vendor/auth/me", headers=admin_headers)

        assert response.status_code == 403
        data = response.json()
        assert data["error_code"] == "INSUFFICIENT_PERMISSIONS"
        assert "Vendor access only" in data["message"]

    def test_vendor_auth_me_with_regular_user_token(
        self, client, auth_headers, test_user
    ):
        """Test /auth/me endpoint rejects regular users"""
        response = client.get("/api/v1/vendor/auth/me", headers=auth_headers)

        assert response.status_code == 403
        data = response.json()
        assert data["error_code"] == "INSUFFICIENT_PERMISSIONS"
        # Message may be "Vendor access only" or "Vendor privileges required"
        assert "vendor" in data["message"].lower()

    def test_vendor_auth_me_expired_token(self, client, test_vendor_user, auth_manager):
        """Test /auth/me endpoint with expired token"""
        # Read the clock once so "exp" and "iat" are exactly one hour apart.
        now = datetime.now(UTC)

        # Create expired token: issued two hours ago, expired one hour ago.
        expired_payload = {
            "sub": str(test_vendor_user.id),
            "username": test_vendor_user.username,
            "email": test_vendor_user.email,
            "role": test_vendor_user.role,
            "exp": now - timedelta(hours=1),
            "iat": now - timedelta(hours=2),
        }

        # Sign with the application's own key and algorithm so that expiry is
        # the only thing wrong with the token.
        expired_token = jwt.encode(
            expired_payload, auth_manager.secret_key, algorithm=auth_manager.algorithm
        )

        response = client.get(
            "/api/v1/vendor/auth/me",
            headers={"Authorization": f"Bearer {expired_token}"},
        )

        assert response.status_code == 401
        data = response.json()
        assert data["error_code"] == "TOKEN_EXPIRED"

    # ========================================================================
    # Dashboard Stats Endpoint Tests - /api/v1/vendor/dashboard/stats
    # ========================================================================

    def test_vendor_dashboard_stats_success(
        self, client, vendor_user_headers, test_vendor_with_vendor_user, db
    ):
        """Test dashboard stats with valid vendor authentication"""
        response = client.get(
            "/api/v1/vendor/dashboard/stats", headers=vendor_user_headers
        )

        assert response.status_code == 200
        data = response.json()
        # Only the presence of each top-level section is checked, not its content.
        assert "vendor" in data
        assert "products" in data
        assert "orders" in data
        assert "customers" in data
        assert "revenue" in data

    def test_vendor_dashboard_stats_without_auth(self, client):
        """Test dashboard stats without authentication"""
        response = client.get("/api/v1/vendor/dashboard/stats")

        assert response.status_code == 401

    def test_vendor_dashboard_stats_with_admin(self, client, admin_headers):
        """Test dashboard stats rejects admin users"""
        response = client.get("/api/v1/vendor/dashboard/stats", headers=admin_headers)

        assert response.status_code == 403
        data = response.json()
        assert "Vendor access only" in data["message"]

    def test_vendor_dashboard_stats_with_cookie_only(
        self, client, test_vendor_user, test_vendor_with_vendor_user
    ):
        """Test dashboard stats does not accept cookie authentication"""
        # Login to get session cookie
        login_response = client.post(
            "/api/v1/vendor/auth/login",
            json={
                "email_or_username": test_vendor_user.username,
                "password": "vendorpass123",
            },
        )
        # Without a successful login the check below would pass vacuously.
        assert login_response.status_code == 200

        # Try to access API endpoint with just cookies (no Authorization header)
        response = client.get("/api/v1/vendor/dashboard/stats")

        # Should fail because get_current_vendor_api requires Authorization header
        assert response.status_code == 401

    # ========================================================================
    # CSRF Protection Tests
    # ========================================================================

    def test_csrf_protection_api_endpoints_require_header(
        self, client, test_vendor_user, test_vendor_with_vendor_user
    ):
        """Test that API endpoints require Authorization header (CSRF protection)"""
        # Get a valid session by logging in
        login_response = client.post(
            "/api/v1/vendor/auth/login",
            json={
                "email_or_username": test_vendor_user.username,
                "password": "vendorpass123",
            },
        )
        assert login_response.status_code == 200

        # List of vendor API endpoints that should require header auth
        api_endpoints = [
            "/api/v1/vendor/auth/me",
            "/api/v1/vendor/dashboard/stats",
            "/api/v1/vendor/products",
            "/api/v1/vendor/orders",
            "/api/v1/vendor/profile",
            "/api/v1/vendor/settings",
        ]

        for endpoint in api_endpoints:
            # Try to access with just session cookie (no Authorization header)
            response = client.get(endpoint)

            # All should fail with 401 (header required)
            assert response.status_code == 401, (
                f"Endpoint {endpoint} should reject cookie-only auth"
            )

    # ========================================================================
    # Role-Based Access Control Tests
    # ========================================================================

    def test_vendor_endpoints_block_non_vendor_roles(
        self, client, auth_headers, admin_headers
    ):
        """Test that vendor API endpoints block non-vendor users"""
        endpoints = [
            "/api/v1/vendor/auth/me",
            "/api/v1/vendor/dashboard/stats",
            "/api/v1/vendor/profile",
        ]

        for endpoint in endpoints:
            # Test with regular user token
            response = client.get(endpoint, headers=auth_headers)
            assert response.status_code == 403, (
                f"Endpoint {endpoint} should reject regular users"
            )

            # Test with admin token
            response = client.get(endpoint, headers=admin_headers)
            assert response.status_code == 403, (
                f"Endpoint {endpoint} should reject admin users"
            )

    def test_vendor_api_accepts_only_vendor_role(
        self, client, vendor_user_headers, test_vendor_user
    ):
        """Test that vendor API endpoints accept vendor-role users"""
        endpoints = [
            "/api/v1/vendor/auth/me",
        ]

        for endpoint in endpoints:
            response = client.get(endpoint, headers=vendor_user_headers)
            # 404 is tolerated here: the assertion only rules out the request
            # being refused for authentication/authorization reasons.
            assert response.status_code in [
                200,
                404,
            ], (
                f"Endpoint {endpoint} should accept vendor users (got {response.status_code})"
            )

    # ========================================================================
    # Token Validation Tests
    # ========================================================================

    def test_malformed_authorization_header(self, client):
        """Test various malformed Authorization headers"""
        malformed_headers = [
            {"Authorization": "InvalidFormat token123"},
            {"Authorization": "Bearer"},  # Missing token
            {"Authorization": "bearer token123"},  # Wrong case
            {"Authorization": " Bearer token123"},  # Leading space
            # Two spaces between scheme and token; with a single space this
            # case would be an ordinary well-formed "Bearer <token>" header.
            {"Authorization": "Bearer  token123"},  # Double space
        ]

        for headers in malformed_headers:
            response = client.get("/api/v1/vendor/auth/me", headers=headers)
            assert response.status_code == 401, (
                f"Should reject malformed header: {headers}"
            )

    def test_token_with_missing_claims(self, client, auth_manager):
        """Test token missing required claims"""
        # Create token without 'role' claim (and without 'email'); it is
        # correctly signed and not expired.
        invalid_payload = {
            "sub": "123",
            "username": "test",
            "exp": datetime.now(UTC) + timedelta(hours=1),
        }

        invalid_token = jwt.encode(
            invalid_payload, auth_manager.secret_key, algorithm=auth_manager.algorithm
        )

        response = client.get(
            "/api/v1/vendor/auth/me",
            headers={"Authorization": f"Bearer {invalid_token}"},
        )

        assert response.status_code == 401

    # ========================================================================
    # Edge Cases
    # ========================================================================

    def test_inactive_vendor_user(self, client, db, test_vendor_user, auth_manager):
        """Test that inactive vendor users are rejected"""
        # Deactivate the vendor user
        test_vendor_user.is_active = False
        db.add(test_vendor_user)
        db.commit()

        try:
            # Create token for inactive user
            token_data = auth_manager.create_access_token(test_vendor_user)
            headers = {"Authorization": f"Bearer {token_data['access_token']}"}

            response = client.get("/api/v1/vendor/auth/me", headers=headers)

            # Should fail because user is inactive
            assert response.status_code in [401, 403, 404]
        finally:
            # Reactivate for cleanup. Done in ``finally`` so a failing
            # assertion above cannot leave the committed user deactivated.
            test_vendor_user.is_active = True
            db.add(test_vendor_user)
            db.commit()

    def test_concurrent_requests_with_same_token(self, client, vendor_user_headers):
        """Test that the same token can be reused for multiple requests.

        NOTE: despite the test name, the requests are issued one after another
        through the test client; nothing here runs in parallel.
        """
        # Make multiple requests with the same token
        responses = [
            client.get("/api/v1/vendor/auth/me", headers=vendor_user_headers)
            for _ in range(5)
        ]

        # All should succeed
        for response in responses:
            assert response.status_code == 200

    def test_vendor_api_with_empty_authorization_header(self, client):
        """Test vendor API with empty Authorization header value"""
        response = client.get("/api/v1/vendor/auth/me", headers={"Authorization": ""})

        assert response.status_code == 401
|
|
|
|
|
|
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.vendor
class TestVendorAPIConsistency:
    """Test that all vendor API endpoints use consistent authentication"""

    def test_all_vendor_endpoints_require_header_auth(
        self, client, test_vendor_user, test_vendor_with_vendor_user
    ):
        """Verify all vendor API endpoints require Authorization header.

        Logs in first so the client holds whatever session cookie the login
        endpoint sets, then requests each endpoint without an Authorization
        header and expects 401 every time.
        """
        # Login to establish session. The payload key matches the other login
        # calls in this module ("email_or_username"), and the response is
        # asserted: if the login failed, no cookie would be set and the
        # cookie-only checks below would pass without testing anything.
        login_response = client.post(
            "/api/v1/vendor/auth/login",
            json={
                "email_or_username": test_vendor_user.username,
                "password": "vendorpass123",
            },
        )
        assert login_response.status_code == 200

        # All vendor API endpoints (excluding public endpoints like /info)
        vendor_api_endpoints = [
            ("/api/v1/vendor/auth/me", "GET"),
            ("/api/v1/vendor/dashboard/stats", "GET"),
            ("/api/v1/vendor/profile", "GET"),
            ("/api/v1/vendor/settings", "GET"),
            ("/api/v1/vendor/products", "GET"),
            ("/api/v1/vendor/orders", "GET"),
            ("/api/v1/vendor/customers", "GET"),
            ("/api/v1/vendor/inventory", "GET"),
            ("/api/v1/vendor/analytics", "GET"),
        ]

        for endpoint, method in vendor_api_endpoints:
            if method == "GET":
                response = client.get(endpoint)
            elif method == "POST":
                response = client.post(endpoint, json={})
            else:
                # Fail loudly instead of leaving ``response`` unbound (or
                # reusing the previous iteration's value) when the table
                # gains a method this loop does not handle.
                pytest.fail(f"Unsupported HTTP method {method!r} for {endpoint}")

            # All should reject cookie-only auth with 401
            assert response.status_code == 401, (
                f"Endpoint {endpoint} should require Authorization header (got {response.status_code})"
            )

    def test_vendor_endpoints_accept_vendor_token(
        self, client, vendor_user_headers, test_vendor_with_vendor_user
    ):
        """Verify all vendor API endpoints accept valid vendor tokens"""
        # Endpoints that should work with just vendor authentication
        # (may return 404 or other errors due to missing data, but not 401/403)
        vendor_api_endpoints = [
            "/api/v1/vendor/auth/me",
            "/api/v1/vendor/profile",
            "/api/v1/vendor/settings",
        ]

        for endpoint in vendor_api_endpoints:
            response = client.get(endpoint, headers=vendor_user_headers)

            # Should not be authentication/authorization errors
            assert response.status_code not in [
                401,
                403,
            ], (
                f"Endpoint {endpoint} should accept vendor token (got {response.status_code}: {response.text})"
            )
|