Files
orion/tests/integration/api/v1/vendor/test_authentication.py
Samir Boulahtit 238c1ec9b8 refactor: modernize code quality tooling with Ruff
- Replace black, isort, and flake8 with Ruff (all-in-one linter and formatter)
- Add comprehensive pyproject.toml configuration
- Simplify Makefile code quality targets
- Configure exclusions for venv/.venv in pyproject.toml
- Auto-fix 1,359 linting issues across codebase

Benefits:
- Much faster builds (Ruff is written in Rust)
- Single tool replaces multiple tools
- More comprehensive rule set (UP, B, C4, SIM, PIE, RET, Q)
- All configuration centralized in pyproject.toml
- Better import sorting and formatting consistency

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-28 19:37:38 +01:00

379 lines
14 KiB
Python

# tests/integration/api/v1/test_vendor_api_authentication.py
"""
Integration tests for vendor API authentication using get_current_vendor_api.
These tests verify that:
1. Vendor API endpoints require Authorization header (not cookies)
2. Only vendor-role users can access vendor API endpoints
3. Admin users are blocked from vendor API routes
4. Invalid/expired tokens are rejected
5. Vendor context middleware works correctly with API authentication
"""
from datetime import UTC, datetime, timedelta
import pytest
from jose import jwt
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.vendor
@pytest.mark.auth
class TestVendorAPIAuthentication:
"""Test authentication for vendor API endpoints using get_current_vendor_api"""
# ========================================================================
# Authentication Tests - /api/v1/vendor/auth/me
# ========================================================================
def test_vendor_auth_me_success(
self, client, vendor_user_headers, test_vendor_user
):
"""Test /auth/me endpoint with valid vendor user token"""
response = client.get("/api/v1/vendor/auth/me", headers=vendor_user_headers)
assert response.status_code == 200
data = response.json()
assert data["username"] == test_vendor_user.username
assert data["email"] == test_vendor_user.email
assert data["role"] == "vendor"
assert data["is_active"] is True
def test_vendor_auth_me_without_token(self, client):
"""Test /auth/me endpoint without authorization header"""
response = client.get("/api/v1/vendor/auth/me")
assert response.status_code == 401
data = response.json()
assert data["error_code"] == "INVALID_TOKEN"
assert "Authorization header required" in data["message"]
def test_vendor_auth_me_invalid_token(self, client):
"""Test /auth/me endpoint with invalid token format"""
response = client.get(
"/api/v1/vendor/auth/me",
headers={"Authorization": "Bearer invalid_token_here"},
)
assert response.status_code == 401
data = response.json()
assert data["error_code"] == "INVALID_TOKEN"
def test_vendor_auth_me_with_admin_token(self, client, admin_headers, test_admin):
"""Test /auth/me endpoint rejects admin users"""
response = client.get("/api/v1/vendor/auth/me", headers=admin_headers)
assert response.status_code == 403
data = response.json()
assert data["error_code"] == "FORBIDDEN"
assert "Admin users cannot access vendor API" in data["message"]
def test_vendor_auth_me_with_regular_user_token(
self, client, auth_headers, test_user
):
"""Test /auth/me endpoint rejects regular users"""
response = client.get("/api/v1/vendor/auth/me", headers=auth_headers)
assert response.status_code == 403
data = response.json()
assert data["error_code"] == "FORBIDDEN"
assert "vendor API routes" in data["message"].lower()
def test_vendor_auth_me_expired_token(self, client, test_vendor_user, auth_manager):
"""Test /auth/me endpoint with expired token"""
# Create expired token
expired_payload = {
"sub": str(test_vendor_user.id),
"username": test_vendor_user.username,
"email": test_vendor_user.email,
"role": test_vendor_user.role,
"exp": datetime.now(UTC) - timedelta(hours=1),
"iat": datetime.now(UTC) - timedelta(hours=2),
}
expired_token = jwt.encode(
expired_payload, auth_manager.secret_key, algorithm=auth_manager.algorithm
)
response = client.get(
"/api/v1/vendor/auth/me",
headers={"Authorization": f"Bearer {expired_token}"},
)
assert response.status_code == 401
data = response.json()
assert data["error_code"] == "TOKEN_EXPIRED"
# ========================================================================
# Dashboard Stats Endpoint Tests - /api/v1/vendor/dashboard/stats
# ========================================================================
def test_vendor_dashboard_stats_success(
self, client, vendor_user_headers, test_vendor_with_vendor_user, db
):
"""Test dashboard stats with valid vendor authentication"""
response = client.get(
"/api/v1/vendor/dashboard/stats", headers=vendor_user_headers
)
assert response.status_code == 200
data = response.json()
assert "vendor" in data
assert "products" in data
assert "orders" in data
assert "customers" in data
assert "revenue" in data
def test_vendor_dashboard_stats_without_auth(self, client):
"""Test dashboard stats without authentication"""
response = client.get("/api/v1/vendor/dashboard/stats")
assert response.status_code == 401
def test_vendor_dashboard_stats_with_admin(self, client, admin_headers):
"""Test dashboard stats rejects admin users"""
response = client.get("/api/v1/vendor/dashboard/stats", headers=admin_headers)
assert response.status_code == 403
data = response.json()
assert "Admin users cannot access vendor API" in data["message"]
def test_vendor_dashboard_stats_with_cookie_only(self, client, test_vendor_user):
"""Test dashboard stats does not accept cookie authentication"""
# Login to get session cookie
login_response = client.post(
"/api/v1/vendor/auth/login",
json={"username": test_vendor_user.username, "password": "vendorpass123"},
)
assert login_response.status_code == 200
# Try to access API endpoint with just cookies (no Authorization header)
response = client.get("/api/v1/vendor/dashboard/stats")
# Should fail because get_current_vendor_api requires Authorization header
assert response.status_code == 401
# ========================================================================
# CSRF Protection Tests
# ========================================================================
def test_csrf_protection_api_endpoints_require_header(
self, client, test_vendor_user
):
"""Test that API endpoints require Authorization header (CSRF protection)"""
# Get a valid session by logging in
login_response = client.post(
"/api/v1/vendor/auth/login",
json={"username": test_vendor_user.username, "password": "vendorpass123"},
)
assert login_response.status_code == 200
# List of vendor API endpoints that should require header auth
api_endpoints = [
"/api/v1/vendor/auth/me",
"/api/v1/vendor/dashboard/stats",
"/api/v1/vendor/products",
"/api/v1/vendor/orders",
"/api/v1/vendor/profile",
"/api/v1/vendor/settings",
]
for endpoint in api_endpoints:
# Try to access with just session cookie (no Authorization header)
response = client.get(endpoint)
# All should fail with 401 (header required)
assert response.status_code == 401, (
f"Endpoint {endpoint} should reject cookie-only auth"
)
# ========================================================================
# Role-Based Access Control Tests
# ========================================================================
def test_vendor_endpoints_block_non_vendor_roles(
self, client, auth_headers, admin_headers
):
"""Test that vendor API endpoints block non-vendor users"""
endpoints = [
"/api/v1/vendor/auth/me",
"/api/v1/vendor/dashboard/stats",
"/api/v1/vendor/profile",
]
for endpoint in endpoints:
# Test with regular user token
response = client.get(endpoint, headers=auth_headers)
assert response.status_code == 403, (
f"Endpoint {endpoint} should reject regular users"
)
# Test with admin token
response = client.get(endpoint, headers=admin_headers)
assert response.status_code == 403, (
f"Endpoint {endpoint} should reject admin users"
)
def test_vendor_api_accepts_only_vendor_role(
self, client, vendor_user_headers, test_vendor_user
):
"""Test that vendor API endpoints accept vendor-role users"""
endpoints = [
"/api/v1/vendor/auth/me",
]
for endpoint in endpoints:
response = client.get(endpoint, headers=vendor_user_headers)
assert response.status_code in [
200,
404,
], (
f"Endpoint {endpoint} should accept vendor users (got {response.status_code})"
)
# ========================================================================
# Token Validation Tests
# ========================================================================
def test_malformed_authorization_header(self, client):
"""Test various malformed Authorization headers"""
malformed_headers = [
{"Authorization": "InvalidFormat token123"},
{"Authorization": "Bearer"}, # Missing token
{"Authorization": "bearer token123"}, # Wrong case
{"Authorization": " Bearer token123"}, # Leading space
{"Authorization": "Bearer token123"}, # Double space
]
for headers in malformed_headers:
response = client.get("/api/v1/vendor/auth/me", headers=headers)
assert response.status_code == 401, (
f"Should reject malformed header: {headers}"
)
def test_token_with_missing_claims(self, client, auth_manager):
"""Test token missing required claims"""
# Create token without 'role' claim
invalid_payload = {
"sub": "123",
"username": "test",
"exp": datetime.now(UTC) + timedelta(hours=1),
}
invalid_token = jwt.encode(
invalid_payload, auth_manager.secret_key, algorithm=auth_manager.algorithm
)
response = client.get(
"/api/v1/vendor/auth/me",
headers={"Authorization": f"Bearer {invalid_token}"},
)
assert response.status_code == 401
# ========================================================================
# Edge Cases
# ========================================================================
def test_inactive_vendor_user(self, client, db, test_vendor_user, auth_manager):
"""Test that inactive vendor users are rejected"""
# Deactivate the vendor user
test_vendor_user.is_active = False
db.add(test_vendor_user)
db.commit()
# Create token for inactive user
token_data = auth_manager.create_access_token(test_vendor_user)
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
response = client.get("/api/v1/vendor/auth/me", headers=headers)
# Should fail because user is inactive
assert response.status_code in [401, 403, 404]
# Reactivate for cleanup
test_vendor_user.is_active = True
db.add(test_vendor_user)
db.commit()
def test_concurrent_requests_with_same_token(self, client, vendor_user_headers):
"""Test that the same token can be used for multiple concurrent requests"""
# Make multiple requests with the same token
responses = []
for _ in range(5):
response = client.get("/api/v1/vendor/auth/me", headers=vendor_user_headers)
responses.append(response)
# All should succeed
for response in responses:
assert response.status_code == 200
def test_vendor_api_with_empty_authorization_header(self, client):
"""Test vendor API with empty Authorization header value"""
response = client.get("/api/v1/vendor/auth/me", headers={"Authorization": ""})
assert response.status_code == 401
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.vendor
class TestVendorAPIConsistency:
"""Test that all vendor API endpoints use consistent authentication"""
def test_all_vendor_endpoints_require_header_auth(self, client, test_vendor_user):
"""Verify all vendor API endpoints require Authorization header"""
# Login to establish session
client.post(
"/api/v1/vendor/auth/login",
json={"username": test_vendor_user.username, "password": "vendorpass123"},
)
# All vendor API endpoints (excluding public endpoints like /info)
vendor_api_endpoints = [
("/api/v1/vendor/auth/me", "GET"),
("/api/v1/vendor/dashboard/stats", "GET"),
("/api/v1/vendor/profile", "GET"),
("/api/v1/vendor/settings", "GET"),
("/api/v1/vendor/products", "GET"),
("/api/v1/vendor/orders", "GET"),
("/api/v1/vendor/customers", "GET"),
("/api/v1/vendor/inventory", "GET"),
("/api/v1/vendor/analytics", "GET"),
]
for endpoint, method in vendor_api_endpoints:
if method == "GET":
response = client.get(endpoint)
elif method == "POST":
response = client.post(endpoint, json={})
# All should reject cookie-only auth with 401
assert response.status_code == 401, (
f"Endpoint {endpoint} should require Authorization header (got {response.status_code})"
)
def test_vendor_endpoints_accept_vendor_token(
self, client, vendor_user_headers, test_vendor_with_vendor_user
):
"""Verify all vendor API endpoints accept valid vendor tokens"""
# Endpoints that should work with just vendor authentication
# (may return 404 or other errors due to missing data, but not 401/403)
vendor_api_endpoints = [
"/api/v1/vendor/auth/me",
"/api/v1/vendor/profile",
"/api/v1/vendor/settings",
]
for endpoint in vendor_api_endpoints:
response = client.get(endpoint, headers=vendor_user_headers)
# Should not be authentication/authorization errors
assert response.status_code not in [
401,
403,
], (
f"Endpoint {endpoint} should accept vendor token (got {response.status_code}: {response.text})"
)