Files
orion/tests/integration/api/v1/vendor/test_authentication.py

385 lines
15 KiB
Python

# tests/integration/api/v1/test_vendor_api_authentication.py
"""
Integration tests for vendor API authentication using get_current_vendor_api.
These tests verify that:
1. Vendor API endpoints require Authorization header (not cookies)
2. Only vendor-role users can access vendor API endpoints
3. Admin users are blocked from vendor API routes
4. Invalid/expired tokens are rejected
5. Vendor context middleware works correctly with API authentication
"""
import pytest
from datetime import datetime, timedelta, timezone
from jose import jwt
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.vendor
@pytest.mark.auth
class TestVendorAPIAuthentication:
"""Test authentication for vendor API endpoints using get_current_vendor_api"""
# ========================================================================
# Authentication Tests - /api/v1/vendor/auth/me
# ========================================================================
def test_vendor_auth_me_success(self, client, vendor_user_headers, test_vendor_user):
"""Test /auth/me endpoint with valid vendor user token"""
response = client.get("/api/v1/vendor/auth/me", headers=vendor_user_headers)
assert response.status_code == 200
data = response.json()
assert data["username"] == test_vendor_user.username
assert data["email"] == test_vendor_user.email
assert data["role"] == "vendor"
assert data["is_active"] is True
def test_vendor_auth_me_without_token(self, client):
"""Test /auth/me endpoint without authorization header"""
response = client.get("/api/v1/vendor/auth/me")
assert response.status_code == 401
data = response.json()
assert data["error_code"] == "INVALID_TOKEN"
assert "Authorization header required" in data["message"]
def test_vendor_auth_me_invalid_token(self, client):
"""Test /auth/me endpoint with invalid token format"""
response = client.get(
"/api/v1/vendor/auth/me",
headers={"Authorization": "Bearer invalid_token_here"}
)
assert response.status_code == 401
data = response.json()
assert data["error_code"] == "INVALID_TOKEN"
def test_vendor_auth_me_with_admin_token(self, client, admin_headers, test_admin):
"""Test /auth/me endpoint rejects admin users"""
response = client.get("/api/v1/vendor/auth/me", headers=admin_headers)
assert response.status_code == 403
data = response.json()
assert data["error_code"] == "FORBIDDEN"
assert "Admin users cannot access vendor API" in data["message"]
def test_vendor_auth_me_with_regular_user_token(self, client, auth_headers, test_user):
"""Test /auth/me endpoint rejects regular users"""
response = client.get("/api/v1/vendor/auth/me", headers=auth_headers)
assert response.status_code == 403
data = response.json()
assert data["error_code"] == "FORBIDDEN"
assert "vendor API routes" in data["message"].lower()
def test_vendor_auth_me_expired_token(self, client, test_vendor_user, auth_manager):
"""Test /auth/me endpoint with expired token"""
# Create expired token
expired_payload = {
"sub": str(test_vendor_user.id),
"username": test_vendor_user.username,
"email": test_vendor_user.email,
"role": test_vendor_user.role,
"exp": datetime.now(timezone.utc) - timedelta(hours=1),
"iat": datetime.now(timezone.utc) - timedelta(hours=2),
}
expired_token = jwt.encode(
expired_payload,
auth_manager.secret_key,
algorithm=auth_manager.algorithm
)
response = client.get(
"/api/v1/vendor/auth/me",
headers={"Authorization": f"Bearer {expired_token}"}
)
assert response.status_code == 401
data = response.json()
assert data["error_code"] == "TOKEN_EXPIRED"
# ========================================================================
# Dashboard Stats Endpoint Tests - /api/v1/vendor/dashboard/stats
# ========================================================================
def test_vendor_dashboard_stats_success(
self, client, vendor_user_headers, test_vendor_with_vendor_user, db
):
"""Test dashboard stats with valid vendor authentication"""
response = client.get(
"/api/v1/vendor/dashboard/stats",
headers=vendor_user_headers
)
assert response.status_code == 200
data = response.json()
assert "vendor" in data
assert "products" in data
assert "orders" in data
assert "customers" in data
assert "revenue" in data
def test_vendor_dashboard_stats_without_auth(self, client):
"""Test dashboard stats without authentication"""
response = client.get("/api/v1/vendor/dashboard/stats")
assert response.status_code == 401
def test_vendor_dashboard_stats_with_admin(self, client, admin_headers):
"""Test dashboard stats rejects admin users"""
response = client.get(
"/api/v1/vendor/dashboard/stats",
headers=admin_headers
)
assert response.status_code == 403
data = response.json()
assert "Admin users cannot access vendor API" in data["message"]
def test_vendor_dashboard_stats_with_cookie_only(self, client, test_vendor_user):
"""Test dashboard stats does not accept cookie authentication"""
# Login to get session cookie
login_response = client.post(
"/api/v1/vendor/auth/login",
json={
"username": test_vendor_user.username,
"password": "vendorpass123"
}
)
assert login_response.status_code == 200
# Try to access API endpoint with just cookies (no Authorization header)
response = client.get("/api/v1/vendor/dashboard/stats")
# Should fail because get_current_vendor_api requires Authorization header
assert response.status_code == 401
# ========================================================================
# CSRF Protection Tests
# ========================================================================
def test_csrf_protection_api_endpoints_require_header(
self, client, test_vendor_user
):
"""Test that API endpoints require Authorization header (CSRF protection)"""
# Get a valid session by logging in
login_response = client.post(
"/api/v1/vendor/auth/login",
json={
"username": test_vendor_user.username,
"password": "vendorpass123"
}
)
assert login_response.status_code == 200
# List of vendor API endpoints that should require header auth
api_endpoints = [
"/api/v1/vendor/auth/me",
"/api/v1/vendor/dashboard/stats",
"/api/v1/vendor/products",
"/api/v1/vendor/orders",
"/api/v1/vendor/profile",
"/api/v1/vendor/settings",
]
for endpoint in api_endpoints:
# Try to access with just session cookie (no Authorization header)
response = client.get(endpoint)
# All should fail with 401 (header required)
assert response.status_code == 401, \
f"Endpoint {endpoint} should reject cookie-only auth"
# ========================================================================
# Role-Based Access Control Tests
# ========================================================================
def test_vendor_endpoints_block_non_vendor_roles(
self, client, auth_headers, admin_headers
):
"""Test that vendor API endpoints block non-vendor users"""
endpoints = [
"/api/v1/vendor/auth/me",
"/api/v1/vendor/dashboard/stats",
"/api/v1/vendor/profile",
]
for endpoint in endpoints:
# Test with regular user token
response = client.get(endpoint, headers=auth_headers)
assert response.status_code == 403, \
f"Endpoint {endpoint} should reject regular users"
# Test with admin token
response = client.get(endpoint, headers=admin_headers)
assert response.status_code == 403, \
f"Endpoint {endpoint} should reject admin users"
def test_vendor_api_accepts_only_vendor_role(
self, client, vendor_user_headers, test_vendor_user
):
"""Test that vendor API endpoints accept vendor-role users"""
endpoints = [
"/api/v1/vendor/auth/me",
]
for endpoint in endpoints:
response = client.get(endpoint, headers=vendor_user_headers)
assert response.status_code in [200, 404], \
f"Endpoint {endpoint} should accept vendor users (got {response.status_code})"
# ========================================================================
# Token Validation Tests
# ========================================================================
def test_malformed_authorization_header(self, client):
"""Test various malformed Authorization headers"""
malformed_headers = [
{"Authorization": "InvalidFormat token123"},
{"Authorization": "Bearer"}, # Missing token
{"Authorization": "bearer token123"}, # Wrong case
{"Authorization": " Bearer token123"}, # Leading space
{"Authorization": "Bearer token123"}, # Double space
]
for headers in malformed_headers:
response = client.get("/api/v1/vendor/auth/me", headers=headers)
assert response.status_code == 401, \
f"Should reject malformed header: {headers}"
def test_token_with_missing_claims(self, client, auth_manager):
"""Test token missing required claims"""
# Create token without 'role' claim
invalid_payload = {
"sub": "123",
"username": "test",
"exp": datetime.now(timezone.utc) + timedelta(hours=1),
}
invalid_token = jwt.encode(
invalid_payload,
auth_manager.secret_key,
algorithm=auth_manager.algorithm
)
response = client.get(
"/api/v1/vendor/auth/me",
headers={"Authorization": f"Bearer {invalid_token}"}
)
assert response.status_code == 401
# ========================================================================
# Edge Cases
# ========================================================================
def test_inactive_vendor_user(self, client, db, test_vendor_user, auth_manager):
"""Test that inactive vendor users are rejected"""
# Deactivate the vendor user
test_vendor_user.is_active = False
db.add(test_vendor_user)
db.commit()
# Create token for inactive user
token_data = auth_manager.create_access_token(test_vendor_user)
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
response = client.get("/api/v1/vendor/auth/me", headers=headers)
# Should fail because user is inactive
assert response.status_code in [401, 403, 404]
# Reactivate for cleanup
test_vendor_user.is_active = True
db.add(test_vendor_user)
db.commit()
def test_concurrent_requests_with_same_token(
self, client, vendor_user_headers
):
"""Test that the same token can be used for multiple concurrent requests"""
# Make multiple requests with the same token
responses = []
for _ in range(5):
response = client.get("/api/v1/vendor/auth/me", headers=vendor_user_headers)
responses.append(response)
# All should succeed
for response in responses:
assert response.status_code == 200
def test_vendor_api_with_empty_authorization_header(self, client):
"""Test vendor API with empty Authorization header value"""
response = client.get(
"/api/v1/vendor/auth/me",
headers={"Authorization": ""}
)
assert response.status_code == 401
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.vendor
class TestVendorAPIConsistency:
"""Test that all vendor API endpoints use consistent authentication"""
def test_all_vendor_endpoints_require_header_auth(
self, client, test_vendor_user
):
"""Verify all vendor API endpoints require Authorization header"""
# Login to establish session
client.post(
"/api/v1/vendor/auth/login",
json={
"username": test_vendor_user.username,
"password": "vendorpass123"
}
)
# All vendor API endpoints (excluding public endpoints like /info)
vendor_api_endpoints = [
("/api/v1/vendor/auth/me", "GET"),
("/api/v1/vendor/dashboard/stats", "GET"),
("/api/v1/vendor/profile", "GET"),
("/api/v1/vendor/settings", "GET"),
("/api/v1/vendor/products", "GET"),
("/api/v1/vendor/orders", "GET"),
("/api/v1/vendor/customers", "GET"),
("/api/v1/vendor/inventory", "GET"),
("/api/v1/vendor/analytics", "GET"),
]
for endpoint, method in vendor_api_endpoints:
if method == "GET":
response = client.get(endpoint)
elif method == "POST":
response = client.post(endpoint, json={})
# All should reject cookie-only auth with 401
assert response.status_code == 401, \
f"Endpoint {endpoint} should require Authorization header (got {response.status_code})"
def test_vendor_endpoints_accept_vendor_token(
self, client, vendor_user_headers, test_vendor_with_vendor_user
):
"""Verify all vendor API endpoints accept valid vendor tokens"""
# Endpoints that should work with just vendor authentication
# (may return 404 or other errors due to missing data, but not 401/403)
vendor_api_endpoints = [
"/api/v1/vendor/auth/me",
"/api/v1/vendor/profile",
"/api/v1/vendor/settings",
]
for endpoint in vendor_api_endpoints:
response = client.get(endpoint, headers=vendor_user_headers)
# Should not be authentication/authorization errors
assert response.status_code not in [401, 403], \
f"Endpoint {endpoint} should accept vendor token (got {response.status_code}: {response.text})"