Some checks failed
Consolidate User.role (2-value: admin/store) + User.is_super_admin (boolean) into a single 4-value UserRole enum: super_admin, platform_admin, merchant_owner, store_member. Drop stale StoreUser.user_type column. Fix role="user" bug in merchant creation. Key changes: - Expand UserRole enum from 2 to 4 values with computed properties (is_admin, is_super_admin, is_platform_admin, is_merchant_owner, is_store_user) - Add Alembic migration (tenancy_003) for data migration + column drops - Remove is_super_admin from JWT token payload - Update all auth dependencies, services, routes, templates, JS, and tests - Update all RBAC documentation 66 files changed, 1219 unit tests passing. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1583 lines
61 KiB
Python
1583 lines
61 KiB
Python
# tests/unit/api/test_deps.py
|
|
"""
|
|
Unit tests for app/api/deps.py — Authentication dependencies.
|
|
|
|
Tests the helper functions and auth dependency logic that every protected
|
|
route in the application relies on.
|
|
"""
|
|
|
|
import uuid
|
|
from datetime import UTC, datetime, timedelta
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
from fastapi.security import HTTPAuthorizationCredentials
|
|
|
|
from app.api.deps import (
|
|
_get_token_from_request,
|
|
_get_user_model,
|
|
_validate_customer_token,
|
|
_validate_user_token,
|
|
get_admin_with_platform_context,
|
|
get_current_admin_api,
|
|
get_current_admin_from_cookie_or_header,
|
|
get_current_admin_optional,
|
|
get_current_customer_api,
|
|
get_current_customer_from_cookie_or_header,
|
|
get_current_customer_optional,
|
|
get_current_merchant_api,
|
|
get_current_merchant_from_cookie_or_header,
|
|
get_current_merchant_optional,
|
|
get_current_store_api,
|
|
get_current_store_from_cookie_or_header,
|
|
get_current_store_optional,
|
|
get_current_super_admin,
|
|
get_current_super_admin_api,
|
|
get_user_permissions,
|
|
get_user_store,
|
|
require_all_store_permissions,
|
|
require_any_store_permission,
|
|
require_module_access,
|
|
require_platform_access,
|
|
require_store_owner,
|
|
require_store_permission,
|
|
)
|
|
from app.modules.tenancy.exceptions import (
|
|
AdminRequiredException,
|
|
InsufficientPermissionsException,
|
|
InsufficientStorePermissionsException,
|
|
InvalidTokenException,
|
|
StoreNotFoundException,
|
|
StoreOwnerOnlyException,
|
|
UnauthorizedStoreAccessException,
|
|
)
|
|
from app.modules.tenancy.models import User
|
|
from middleware.auth import AuthManager
|
|
from models.schema.auth import UserContext
|
|
|
|
# ============================================================================
|
|
# Fixtures
|
|
# ============================================================================
|
|
|
|
|
|
def _make_credentials(token: str) -> HTTPAuthorizationCredentials:
    """Wrap a raw token string in Bearer-scheme authorization credentials."""
    bearer = HTTPAuthorizationCredentials(credentials=token, scheme="Bearer")
    return bearer
|
|
|
|
|
|
def _make_request(path: str = "/test") -> MagicMock:
|
|
"""Helper to create a mock FastAPI Request."""
|
|
request = MagicMock()
|
|
request.url.path = path
|
|
request.state = MagicMock()
|
|
return request
|
|
|
|
|
|
# ============================================================================
|
|
# Phase 1: Helper Functions
|
|
# ============================================================================
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestGetTokenFromRequest:
    """Checks how _get_token_from_request chooses between header and cookie."""

    # Third and fourth positional arguments handed to the helper in every case
    # (they look like a cookie name and a request path).
    COOKIE_NAME = "admin_token"
    ROUTE = "/admin/test"

    def test_returns_header_token_when_present(self):
        """A Bearer header alone yields its token, labelled 'header'."""
        bearer = _make_credentials("header_token_123")
        found, origin = _get_token_from_request(bearer, None, self.COOKIE_NAME, self.ROUTE)
        assert (found, origin) == ("header_token_123", "header")

    def test_returns_cookie_token_when_no_header(self):
        """Without a header, the cookie value is used and labelled 'cookie'."""
        found, origin = _get_token_from_request(
            None, "cookie_token_456", self.COOKIE_NAME, self.ROUTE
        )
        assert (found, origin) == ("cookie_token_456", "cookie")

    def test_header_takes_priority_over_cookie(self):
        """When both sources carry a token, the header token wins."""
        bearer = _make_credentials("header_token")
        found, origin = _get_token_from_request(
            bearer, "cookie_token", self.COOKIE_NAME, self.ROUTE
        )
        assert (found, origin) == ("header_token", "header")

    def test_returns_none_when_neither_present(self):
        """With no header and no cookie, both returned values are None."""
        found, origin = _get_token_from_request(None, None, self.COOKIE_NAME, self.ROUTE)
        assert found is None
        assert origin is None
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestValidateUserToken:
    """Exercises the _validate_user_token helper."""

    def test_returns_user_for_valid_token(self, db, auth_manager, test_admin):
        """A freshly issued access token resolves to the user it was issued for."""
        access_token = auth_manager.create_access_token(user=test_admin)["access_token"]
        resolved = _validate_user_token(access_token, db)
        assert resolved.id == test_admin.id
        assert resolved.username == test_admin.username

    def test_raises_for_invalid_token(self, db):
        """A string that is not a JWT is rejected.

        Only ``Exception`` is asserted; the concrete exception type is not pinned here.
        """
        garbage = "completely_invalid_jwt_token"
        with pytest.raises(Exception):  # AuthManager raises on invalid token
            _validate_user_token(garbage, db)

    def test_raises_for_expired_token(self, db, test_admin):
        """A signed token whose ``exp`` lies one hour in the past is rejected.

        Only ``Exception`` is asserted; the concrete exception type is not pinned here.
        """
        from jose import jwt as jose_jwt

        # Sign with the key and algorithm of a newly constructed AuthManager.
        manager = AuthManager()
        one_hour_ago = datetime.now(UTC) - timedelta(hours=1)
        claims = {
            "sub": str(test_admin.id),
            "username": test_admin.username,
            "role": test_admin.role,
            "exp": one_hour_ago,
        }
        stale_token = jose_jwt.encode(claims, manager.secret_key, algorithm=manager.algorithm)

        with pytest.raises(Exception):
            _validate_user_token(stale_token, db)
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestGetUserModel:
    """Exercises the _get_user_model helper."""

    def test_returns_user_model_from_context(self, db, test_admin):
        """The model returned for a context has the same id and username as its user."""
        ctx = UserContext.from_user(test_admin, include_store_context=False)
        loaded = _get_user_model(ctx, db)
        assert loaded.id == test_admin.id
        assert loaded.username == test_admin.username

    def test_copies_token_attributes_to_model(self, db, test_store_user):
        """token_store_* values set on the context show up on the returned model."""
        ctx = UserContext.from_user(test_store_user)
        # Overwrite the token-derived fields by hand so the expected values are known.
        expected = {
            "token_store_id": 42,
            "token_store_code": "TESTSTORE",
            "token_store_role": "owner",
        }
        ctx.token_store_id = expected["token_store_id"]
        ctx.token_store_code = expected["token_store_code"]
        ctx.token_store_role = expected["token_store_role"]

        loaded = _get_user_model(ctx, db)
        assert loaded.token_store_id == 42
        assert loaded.token_store_code == "TESTSTORE"
        assert loaded.token_store_role == "owner"

    def test_raises_for_nonexistent_user(self, db):
        """A context carrying id 999999 leads to InvalidTokenException."""
        ghost_ctx = MagicMock(
            id=999999,
            token_store_id=None,
            token_store_code=None,
            token_store_role=None,
        )

        with pytest.raises(InvalidTokenException):
            _get_user_model(ghost_ctx, db)
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestValidateCustomerToken:
    """Test _validate_customer_token helper."""

    def _make_customer_token(self, auth_manager, customer_id, store_id=None, token_type="customer", expired=False):
        """Sign a customer-style JWT with the auth manager's key and algorithm.

        Args:
            auth_manager: Supplies ``secret_key`` and ``algorithm`` for signing.
            customer_id: Stringified into the ``sub`` claim.
            store_id: Stored as-is in the ``store_id`` claim.
            token_type: Stored in the ``type`` claim.
            expired: When true, ``exp`` is one hour in the past instead of one
                hour in the future.

        Returns:
            The encoded token.
        """
        from jose import jwt as jose_jwt

        lifetime = timedelta(hours=-1) if expired else timedelta(hours=1)
        payload = {
            "sub": str(customer_id),
            "type": token_type,
            "store_id": store_id,
            "exp": datetime.now(UTC) + lifetime,
        }
        return jose_jwt.encode(payload, auth_manager.secret_key, algorithm=auth_manager.algorithm)

    def _create_test_customer(self, db, email_prefix="customer", is_active=True):
        """Persist a customer together with the owner, merchant and store it hangs off.

        Args:
            db: Database session used to add, flush, commit and refresh.
            email_prefix: Leading part of the customer's e-mail address.
            is_active: Value for the customer's ``is_active`` flag.

        Returns:
            Tuple ``(customer, store)``; the customer is refreshed after commit.
        """
        # User is already imported at module level; only the extra models are needed here.
        from app.modules.customers.models.customer import Customer
        from app.modules.tenancy.models import Merchant, Store

        # Random suffix keeps e-mails, usernames and codes distinct between calls.
        uid = uuid.uuid4().hex[:8]

        # Customer requires store_id FK — create minimal user + merchant + store
        owner = User(
            email=f"owner_{uid}@test.com",
            username=f"owner_{uid}",
            hashed_password="not_a_real_hash",
            role="merchant_owner",
            is_active=True,
        )
        db.add(owner)
        db.flush()  # owner.id is used just below

        merchant = Merchant(
            name=f"CustTestMerchant_{uid}",
            owner_user_id=owner.id,
            contact_email=f"m_{uid}@test.com",
            is_active=True,
            is_verified=True,
        )
        db.add(merchant)
        db.flush()  # merchant.id is used just below

        store = Store(
            merchant_id=merchant.id,
            store_code=f"CT_{uid}".upper(),
            subdomain=f"ct{uid}",
            name=f"CustTestStore_{uid}",
            is_active=True,
            is_verified=True,
        )
        db.add(store)
        db.flush()  # store.id is used just below

        customer = Customer(
            store_id=store.id,
            email=f"{email_prefix}_{uid}@example.com",
            hashed_password="not_a_real_hash",  # noqa: SEC001
            first_name="Test",
            last_name="Customer",
            customer_number=f"CUST_{uid}",
            is_active=is_active,
        )
        db.add(customer)
        db.commit()
        db.refresh(customer)
        return customer, store

    def test_valid_customer_token_returns_context(self, db, auth_manager):
        """Valid customer token returns a context with the customer's id and email."""
        customer, _store = self._create_test_customer(db)

        token = self._make_customer_token(auth_manager, customer.id)
        request = _make_request("/storefront/account")
        request.state.store = None

        result = _validate_customer_token(token, request, db)
        assert result.id == customer.id
        assert result.email == customer.email

    def test_rejects_non_customer_token_type(self, db, auth_manager):
        """Token with type != 'customer' is rejected."""
        token = self._make_customer_token(auth_manager, 1, token_type="user")
        request = _make_request()
        request.state.store = None

        with pytest.raises(InvalidTokenException, match="Customer authentication required"):
            _validate_customer_token(token, request, db)

    def test_rejects_token_missing_sub(self, db, auth_manager):
        """Token without 'sub' claim is rejected."""
        from jose import jwt as jose_jwt

        # Built by hand because _make_customer_token always sets "sub".
        payload = {
            "type": "customer",
            "exp": datetime.now(UTC) + timedelta(hours=1),
        }
        token = jose_jwt.encode(payload, auth_manager.secret_key, algorithm=auth_manager.algorithm)
        request = _make_request()
        request.state.store = None

        with pytest.raises(InvalidTokenException, match="Invalid token"):
            _validate_customer_token(token, request, db)

    def test_rejects_expired_customer_token(self, db, auth_manager):
        """Expired customer token is rejected (caught by jose as ExpiredSignatureError)."""
        customer, _store = self._create_test_customer(db)

        token = self._make_customer_token(auth_manager, customer.id, expired=True)
        request = _make_request()
        request.state.store = None

        with pytest.raises(InvalidTokenException, match="Could not validate credentials"):
            _validate_customer_token(token, request, db)

    def test_rejects_nonexistent_customer(self, db, auth_manager):
        """Token for non-existent customer is rejected."""
        token = self._make_customer_token(auth_manager, 999999)
        request = _make_request()
        request.state.store = None

        with pytest.raises(InvalidTokenException, match="Customer not found"):
            _validate_customer_token(token, request, db)

    def test_rejects_inactive_customer(self, db, auth_manager):
        """Token for inactive customer is rejected."""
        customer, _store = self._create_test_customer(db, is_active=False)

        token = self._make_customer_token(auth_manager, customer.id)
        request = _make_request()
        request.state.store = None

        with pytest.raises(InvalidTokenException, match="inactive"):
            _validate_customer_token(token, request, db)

    def test_rejects_store_mismatch(self, db, auth_manager):
        """Token with store_id that doesn't match request store is rejected."""
        customer, store = self._create_test_customer(db)

        # Token has the real store_id, but request store has a different id
        token = self._make_customer_token(auth_manager, customer.id, store_id=store.id)
        request = _make_request()
        mock_store = MagicMock()
        mock_store.id = store.id + 9999  # different store
        mock_store.store_code = "WRONG"
        request.state.store = mock_store

        # UnauthorizedStoreAccessException comes from the module-level import.
        with pytest.raises(UnauthorizedStoreAccessException):
            _validate_customer_token(token, request, db)
|
|
|
|
|
|
# ============================================================================
|
|
# Phase 2: Admin Authentication
|
|
# ============================================================================
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestGetCurrentAdminFromCookieOrHeader:
    """Covers get_current_admin_from_cookie_or_header."""

    # Path given to every mock request in this class.
    ROUTE = "/admin/dashboard"

    def test_valid_admin_via_header(self, db, auth_manager, test_admin):
        """An admin's token passed as header credentials yields an admin context."""
        issued = auth_manager.create_access_token(user=test_admin)
        bearer = _make_credentials(issued["access_token"])
        req = _make_request(self.ROUTE)

        ctx = get_current_admin_from_cookie_or_header(req, bearer, None, db)
        assert ctx.id == test_admin.id
        assert ctx.is_admin is True

    def test_valid_admin_via_cookie(self, db, auth_manager, test_admin):
        """An admin's token passed in the cookie slot yields that admin's context."""
        cookie_value = auth_manager.create_access_token(user=test_admin)["access_token"]
        req = _make_request(self.ROUTE)

        ctx = get_current_admin_from_cookie_or_header(req, None, cookie_value, db)
        assert ctx.id == test_admin.id

    def test_rejects_non_admin_user(self, db, auth_manager, test_store_user):
        """A store user's token on this dependency raises AdminRequiredException."""
        issued = auth_manager.create_access_token(user=test_store_user)
        bearer = _make_credentials(issued["access_token"])
        req = _make_request(self.ROUTE)

        with pytest.raises(AdminRequiredException):
            get_current_admin_from_cookie_or_header(req, bearer, None, db)

    def test_raises_without_token(self, db):
        """With neither header nor cookie, InvalidTokenException is raised."""
        req = _make_request(self.ROUTE)

        with pytest.raises(InvalidTokenException):
            get_current_admin_from_cookie_or_header(req, None, None, db)
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestGetCurrentAdminApi:
    """Covers get_current_admin_api (header-only, CSRF-safe)."""

    def test_valid_admin_header(self, db, auth_manager, test_admin):
        """Header credentials built from an admin's token yield an admin context."""
        access_token = auth_manager.create_access_token(user=test_admin)["access_token"]
        bearer = _make_credentials(access_token)

        ctx = get_current_admin_api(bearer, db)
        assert ctx.id == test_admin.id
        assert ctx.is_admin is True

    def test_rejects_non_admin(self, db, auth_manager, test_store_user):
        """A store user's token raises AdminRequiredException."""
        access_token = auth_manager.create_access_token(user=test_store_user)["access_token"]
        bearer = _make_credentials(access_token)

        with pytest.raises(AdminRequiredException):
            get_current_admin_api(bearer, db)

    def test_rejects_missing_credentials(self, db):
        """Passing None as credentials raises InvalidTokenException."""
        with pytest.raises(InvalidTokenException):
            get_current_admin_api(None, db)
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestGetCurrentSuperAdmin:
    """Covers get_current_super_admin."""

    # Path given to every mock request in this class.
    ROUTE = "/admin/settings"

    def test_super_admin_accepted(self, db, auth_manager, test_super_admin):
        """A super admin's token yields a context flagged is_super_admin."""
        issued = auth_manager.create_access_token(user=test_super_admin)
        bearer = _make_credentials(issued["access_token"])
        req = _make_request(self.ROUTE)

        ctx = get_current_super_admin(req, bearer, None, db)
        assert ctx.is_super_admin is True

    def test_platform_admin_rejected(self, db, auth_manager, test_platform_admin):
        """A platform admin's token raises AdminRequiredException mentioning 'Super admin'."""
        issued = auth_manager.create_access_token(user=test_platform_admin)
        bearer = _make_credentials(issued["access_token"])
        req = _make_request(self.ROUTE)

        with pytest.raises(AdminRequiredException, match="Super admin"):
            get_current_super_admin(req, bearer, None, db)
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestGetCurrentSuperAdminApi:
    """Covers get_current_super_admin_api (header-only)."""

    def test_super_admin_accepted(self, db, auth_manager, test_super_admin):
        """Header credentials from a super admin's token yield is_super_admin."""
        access_token = auth_manager.create_access_token(user=test_super_admin)["access_token"]
        bearer = _make_credentials(access_token)

        ctx = get_current_super_admin_api(bearer, db)
        assert ctx.is_super_admin is True

    def test_platform_admin_rejected(self, db, auth_manager, test_platform_admin):
        """A platform admin's token raises AdminRequiredException mentioning 'Super admin'."""
        access_token = auth_manager.create_access_token(user=test_platform_admin)["access_token"]
        bearer = _make_credentials(access_token)

        with pytest.raises(AdminRequiredException, match="Super admin"):
            get_current_super_admin_api(bearer, db)
|
|
|
|
|
|
# ============================================================================
|
|
# Phase 3: Store Authentication
|
|
# ============================================================================
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestGetCurrentStoreFromCookieOrHeader:
    """Covers get_current_store_from_cookie_or_header."""

    # Path given to every mock request in this class.
    ROUTE = "/store/dashboard"

    def test_valid_store_user_via_header(self, db, auth_manager, test_store_user):
        """A store user's token as header credentials yields a store-user context."""
        issued = auth_manager.create_access_token(user=test_store_user)
        bearer = _make_credentials(issued["access_token"])
        req = _make_request(self.ROUTE)

        ctx = get_current_store_from_cookie_or_header(req, bearer, None, db)
        assert ctx.id == test_store_user.id
        assert ctx.is_store_user is True

    def test_valid_store_user_via_cookie(self, db, auth_manager, test_store_user):
        """A store user's token in the cookie slot yields that user's context."""
        cookie_value = auth_manager.create_access_token(user=test_store_user)["access_token"]
        req = _make_request(self.ROUTE)

        ctx = get_current_store_from_cookie_or_header(req, None, cookie_value, db)
        assert ctx.id == test_store_user.id

    def test_admin_blocked_from_store_routes(self, db, auth_manager, test_admin):
        """An admin's token raises an error whose message matches 'Store access only'."""
        issued = auth_manager.create_access_token(user=test_admin)
        bearer = _make_credentials(issued["access_token"])
        req = _make_request(self.ROUTE)

        with pytest.raises(Exception, match="Store access only"):
            get_current_store_from_cookie_or_header(req, bearer, None, db)

    def test_raises_without_token(self, db):
        """With neither header nor cookie, InvalidTokenException is raised."""
        req = _make_request(self.ROUTE)

        with pytest.raises(InvalidTokenException):
            get_current_store_from_cookie_or_header(req, None, None, db)
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestGetCurrentStoreApi:
    """Covers get_current_store_api (header-only, validates store context)."""

    def test_valid_store_user_with_store_context(self, db, auth_manager, test_store_user):
        """A token carrying store claims is accepted when membership is reported as true."""
        # Store context embedded in the token.
        store_claims = {"store_id": 1, "store_code": "TEST", "store_role": "owner"}
        issued = auth_manager.create_access_token(user=test_store_user, **store_claims)
        bearer = _make_credentials(issued["access_token"])

        # Force User.is_member_of to answer True for the duration of the call.
        with patch.object(User, "is_member_of", return_value=True):
            ctx = get_current_store_api(bearer, db)
        assert ctx.id == test_store_user.id

    def test_rejects_missing_credentials(self, db):
        """Passing None as credentials raises InvalidTokenException."""
        with pytest.raises(InvalidTokenException):
            get_current_store_api(None, db)

    def test_admin_blocked(self, db, auth_manager, test_admin):
        """An admin's token raises an error whose message matches 'Store access only'."""
        access_token = auth_manager.create_access_token(user=test_admin)["access_token"]
        bearer = _make_credentials(access_token)

        with pytest.raises(Exception, match="Store access only"):
            get_current_store_api(bearer, db)
|
|
|
|
|
|
# ============================================================================
|
|
# Phase 7: Optional Authentication
|
|
# ============================================================================
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestGetCurrentAdminOptional:
    """Covers get_current_admin_optional, which yields None rather than raising."""

    # Path given to every mock request in this class.
    ROUTE = "/admin/login"

    def test_returns_context_for_valid_admin(self, db, auth_manager, test_admin):
        """An admin's token produces a non-None context with the admin's id."""
        issued = auth_manager.create_access_token(user=test_admin)
        bearer = _make_credentials(issued["access_token"])
        req = _make_request(self.ROUTE)

        ctx = get_current_admin_optional(req, bearer, None, db)
        assert ctx is not None
        assert ctx.id == test_admin.id

    def test_returns_none_without_token(self, db):
        """With neither header nor cookie the result is None, not an exception."""
        req = _make_request(self.ROUTE)
        outcome = get_current_admin_optional(req, None, None, db)
        assert outcome is None

    def test_returns_none_for_invalid_token(self, db):
        """A malformed token produces None, not an exception."""
        bearer = _make_credentials("bad_token")
        req = _make_request(self.ROUTE)
        outcome = get_current_admin_optional(req, bearer, None, db)
        assert outcome is None

    def test_returns_none_for_non_admin(self, db, auth_manager, test_store_user):
        """A store user's token produces None."""
        issued = auth_manager.create_access_token(user=test_store_user)
        bearer = _make_credentials(issued["access_token"])
        req = _make_request(self.ROUTE)

        outcome = get_current_admin_optional(req, bearer, None, db)
        assert outcome is None
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestGetCurrentStoreOptional:
    """Covers get_current_store_optional, which yields None rather than raising."""

    # Path given to every mock request in this class.
    ROUTE = "/store/login"

    def test_returns_context_for_valid_store_user(self, db, auth_manager, test_store_user):
        """A store user's token produces a non-None context with that user's id."""
        issued = auth_manager.create_access_token(user=test_store_user)
        bearer = _make_credentials(issued["access_token"])
        req = _make_request(self.ROUTE)

        ctx = get_current_store_optional(req, bearer, None, db)
        assert ctx is not None
        assert ctx.id == test_store_user.id

    def test_returns_none_without_token(self, db):
        """With neither header nor cookie the result is None."""
        req = _make_request(self.ROUTE)
        outcome = get_current_store_optional(req, None, None, db)
        assert outcome is None

    def test_returns_none_for_admin(self, db, auth_manager, test_admin):
        """An admin's token produces None."""
        issued = auth_manager.create_access_token(user=test_admin)
        bearer = _make_credentials(issued["access_token"])
        req = _make_request(self.ROUTE)

        outcome = get_current_store_optional(req, bearer, None, db)
        assert outcome is None
|
|
|
|
|
|
# ============================================================================
|
|
# Phase 4: Merchant Authentication
|
|
# ============================================================================
|
|
|
|
|
|
def _create_merchant_owner(db, auth_manager):
    """Persist a new active user and an active, verified merchant owned by that user.

    Args:
        db: Database session used to add, flush, commit and refresh.
        auth_manager: Provides ``hash_password`` for the user's password.

    Returns:
        Tuple ``(user, merchant)``, both refreshed after the commit.
    """
    # Random suffix keeps e-mails, usernames and merchant names distinct between calls.
    suffix = uuid.uuid4().hex[:8]
    from app.modules.tenancy.models import Merchant

    password_hash = auth_manager.hash_password("testpass123")
    owner = User(
        email=f"merchant_{suffix}@example.com",
        username=f"merchant_{suffix}",
        hashed_password=password_hash,
        role="merchant_owner",
        is_active=True,
        is_email_verified=True,
    )
    db.add(owner)
    db.flush()  # owner.id is used just below

    owned_merchant = Merchant(
        name=f"Merchant_{suffix}",
        owner_user_id=owner.id,
        contact_email=f"m_{suffix}@merchant.com",
        is_active=True,
        is_verified=True,
    )
    db.add(owned_merchant)
    db.commit()

    db.refresh(owner)
    db.refresh(owned_merchant)
    return owner, owned_merchant
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestGetCurrentMerchantFromCookieOrHeader:
    """Covers get_current_merchant_from_cookie_or_header."""

    # Path given to every mock request in this class.
    ROUTE = "/merchants/dashboard"

    def test_valid_merchant_owner_via_header(self, db, auth_manager):
        """A user owning a merchant is accepted via header credentials."""
        owner, _merchant = _create_merchant_owner(db, auth_manager)
        issued = auth_manager.create_access_token(user=owner)
        bearer = _make_credentials(issued["access_token"])
        req = _make_request(self.ROUTE)

        ctx = get_current_merchant_from_cookie_or_header(req, bearer, None, db)
        assert ctx.id == owner.id

    def test_valid_merchant_owner_via_cookie(self, db, auth_manager):
        """A user owning a merchant is accepted via the cookie slot."""
        owner, _merchant = _create_merchant_owner(db, auth_manager)
        cookie_value = auth_manager.create_access_token(user=owner)["access_token"]
        req = _make_request(self.ROUTE)

        ctx = get_current_merchant_from_cookie_or_header(req, None, cookie_value, db)
        assert ctx.id == owner.id

    def test_rejects_user_without_merchants(self, db, auth_manager, test_store_user):
        """The store-user fixture raises InsufficientPermissionsException ('Merchant owner')."""
        issued = auth_manager.create_access_token(user=test_store_user)
        bearer = _make_credentials(issued["access_token"])
        req = _make_request(self.ROUTE)

        with pytest.raises(InsufficientPermissionsException, match="Merchant owner"):
            get_current_merchant_from_cookie_or_header(req, bearer, None, db)

    def test_raises_without_token(self, db):
        """With neither header nor cookie, InvalidTokenException is raised."""
        req = _make_request(self.ROUTE)

        with pytest.raises(InvalidTokenException):
            get_current_merchant_from_cookie_or_header(req, None, None, db)
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestGetCurrentMerchantApi:
    """Covers get_current_merchant_api (header-only)."""

    # Path given to every mock request in this class.
    ROUTE = "/api/merchants"

    def test_valid_merchant_owner_header(self, db, auth_manager):
        """A user owning a merchant is accepted via header credentials."""
        owner, _merchant = _create_merchant_owner(db, auth_manager)
        issued = auth_manager.create_access_token(user=owner)
        bearer = _make_credentials(issued["access_token"])
        req = _make_request(self.ROUTE)

        ctx = get_current_merchant_api(req, bearer, db)
        assert ctx.id == owner.id

    def test_rejects_non_merchant_owner(self, db, auth_manager, test_store_user):
        """The store-user fixture raises InsufficientPermissionsException ('Merchant owner')."""
        issued = auth_manager.create_access_token(user=test_store_user)
        bearer = _make_credentials(issued["access_token"])
        req = _make_request(self.ROUTE)

        with pytest.raises(InsufficientPermissionsException, match="Merchant owner"):
            get_current_merchant_api(req, bearer, db)

    def test_rejects_missing_credentials(self, db):
        """Passing None as credentials raises InvalidTokenException."""
        req = _make_request(self.ROUTE)

        with pytest.raises(InvalidTokenException):
            get_current_merchant_api(req, None, db)
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestGetCurrentMerchantOptional:
    """Covers get_current_merchant_optional, which yields None rather than raising."""

    # Path given to every mock request in this class.
    ROUTE = "/merchants/login"

    def test_returns_context_for_merchant_owner(self, db, auth_manager):
        """A merchant owner's token produces a non-None context with the owner's id."""
        owner, _merchant = _create_merchant_owner(db, auth_manager)
        issued = auth_manager.create_access_token(user=owner)
        bearer = _make_credentials(issued["access_token"])
        req = _make_request(self.ROUTE)

        ctx = get_current_merchant_optional(req, bearer, None, db)
        assert ctx is not None
        assert ctx.id == owner.id

    def test_returns_none_without_token(self, db):
        """With neither header nor cookie the result is None."""
        req = _make_request(self.ROUTE)
        outcome = get_current_merchant_optional(req, None, None, db)
        assert outcome is None

    def test_returns_none_for_non_owner(self, db, auth_manager, test_store_user):
        """The store-user fixture's token produces None."""
        issued = auth_manager.create_access_token(user=test_store_user)
        bearer = _make_credentials(issued["access_token"])
        req = _make_request(self.ROUTE)

        outcome = get_current_merchant_optional(req, bearer, None, db)
        assert outcome is None
|
|
|
|
|
|
# ============================================================================
|
|
# Phase 5: Customer Authentication Endpoints
|
|
# ============================================================================
|
|
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.auth
|
|
class TestGetCurrentCustomerFromCookieOrHeader:
|
|
"""Test get_current_customer_from_cookie_or_header."""
|
|
|
|
def _make_customer_token(self, auth_manager, customer_id, store_id=None):
|
|
"""Helper to generate a customer JWT token."""
|
|
from jose import jwt as jose_jwt
|
|
|
|
payload = {
|
|
"sub": str(customer_id),
|
|
"type": "customer",
|
|
"store_id": store_id,
|
|
"exp": datetime.now(UTC) + timedelta(hours=1),
|
|
}
|
|
return jose_jwt.encode(payload, auth_manager.secret_key, algorithm=auth_manager.algorithm)
|
|
|
|
def _create_test_customer(self, db):
|
|
"""Create a customer with required store FK."""
|
|
from app.modules.customers.models.customer import Customer
|
|
from app.modules.tenancy.models import Merchant, Store
|
|
|
|
uid = uuid.uuid4().hex[:8]
|
|
owner = User(
|
|
email=f"csowner_{uid}@test.com",
|
|
username=f"csowner_{uid}",
|
|
hashed_password="not_a_real_hash",
|
|
role="merchant_owner",
|
|
is_active=True,
|
|
)
|
|
db.add(owner)
|
|
db.flush()
|
|
merchant = Merchant(
|
|
name=f"CSMerchant_{uid}",
|
|
owner_user_id=owner.id,
|
|
contact_email=f"csm_{uid}@test.com",
|
|
is_active=True,
|
|
is_verified=True,
|
|
)
|
|
db.add(merchant)
|
|
db.flush()
|
|
store = Store(
|
|
merchant_id=merchant.id,
|
|
store_code=f"CS_{uid}".upper(),
|
|
subdomain=f"cs{uid}",
|
|
name=f"CSStore_{uid}",
|
|
is_active=True,
|
|
is_verified=True,
|
|
)
|
|
db.add(store)
|
|
db.flush()
|
|
customer = Customer(
|
|
store_id=store.id,
|
|
email=f"cust_{uid}@example.com",
|
|
hashed_password="not_a_real_hash", # noqa: SEC001
|
|
first_name="Test",
|
|
last_name="Customer",
|
|
customer_number=f"CUST_{uid}",
|
|
is_active=True,
|
|
)
|
|
db.add(customer)
|
|
db.commit()
|
|
db.refresh(customer)
|
|
return customer
|
|
|
|
def test_valid_customer_via_header(self, db, auth_manager):
|
|
"""Valid customer token via header returns CustomerContext."""
|
|
customer = self._create_test_customer(db)
|
|
token = self._make_customer_token(auth_manager, customer.id)
|
|
creds = _make_credentials(token)
|
|
request = _make_request("/storefront/account")
|
|
request.state.store = None
|
|
|
|
result = get_current_customer_from_cookie_or_header(request, creds, None, db)
|
|
assert result.id == customer.id
|
|
|
|
def test_valid_customer_via_cookie(self, db, auth_manager):
    """A customer token supplied as the cookie value resolves to that customer."""
    seeded = self._create_test_customer(db)
    cookie_token = self._make_customer_token(auth_manager, seeded.id)

    req = _make_request("/storefront/account")
    req.state.store = None

    # Cookie token only; the header credentials argument is None.
    resolved = get_current_customer_from_cookie_or_header(req, None, cookie_token, db)
    assert resolved.id == seeded.id
|
|
|
|
def test_raises_without_token(self, db):
    """With neither header credentials nor cookie, InvalidTokenException is raised."""
    req = _make_request("/storefront/account")
    req.state.store = None

    expected = pytest.raises(
        InvalidTokenException,
        match="Customer authentication required",
    )
    with expected:
        get_current_customer_from_cookie_or_header(req, None, None, db)
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestGetCurrentCustomerApi:
    """Test get_current_customer_api (header-only)."""

    def _make_customer_token(self, auth_manager, customer_id):
        """Encode a customer JWT expiring in one hour, with ``store_id`` set to None."""
        from jose import jwt as jose_jwt

        claims = {
            "sub": str(customer_id),
            "type": "customer",
            "store_id": None,
            "exp": datetime.now(UTC) + timedelta(hours=1),
        }
        signing_key = auth_manager.secret_key
        return jose_jwt.encode(claims, signing_key, algorithm=auth_manager.algorithm)

    def _create_test_customer(self, db):
        """Persist a User -> Merchant -> Store -> Customer chain; return the customer.

        Parents are flushed before their children are built because each
        child constructor reads the parent's ``id``.
        """
        from app.modules.customers.models.customer import Customer
        from app.modules.tenancy.models import Merchant, Store

        def _stage(row):
            # Add and flush so ``row.id`` can be read by the next constructor.
            db.add(row)
            db.flush()
            return row

        uid = uuid.uuid4().hex[:8]

        account_owner = _stage(
            User(
                email=f"caowner_{uid}@test.com",
                username=f"caowner_{uid}",
                hashed_password="not_a_real_hash",
                role="merchant_owner",
                is_active=True,
            )
        )
        parent_merchant = _stage(
            Merchant(
                name=f"CAMerchant_{uid}",
                owner_user_id=account_owner.id,
                contact_email=f"cam_{uid}@test.com",
                is_active=True,
                is_verified=True,
            )
        )
        parent_store = _stage(
            Store(
                merchant_id=parent_merchant.id,
                store_code=f"CA_{uid}".upper(),
                subdomain=f"ca{uid}",
                name=f"CAStore_{uid}",
                is_active=True,
                is_verified=True,
            )
        )

        api_customer = Customer(
            store_id=parent_store.id,
            email=f"capi_{uid}@example.com",
            hashed_password="not_a_real_hash",  # noqa: SEC001
            first_name="API",
            last_name="Customer",
            customer_number=f"CAPI_{uid}",
            is_active=True,
        )
        db.add(api_customer)
        db.commit()
        db.refresh(api_customer)
        return api_customer

    def test_valid_customer_header(self, db, auth_manager):
        """A customer token in the header credentials resolves to that customer."""
        seeded = self._create_test_customer(db)
        bearer = _make_credentials(self._make_customer_token(auth_manager, seeded.id))

        req = _make_request("/api/storefront/account")
        req.state.store = None

        resolved = get_current_customer_api(req, bearer, db)
        assert resolved.id == seeded.id

    def test_rejects_missing_credentials(self, db):
        """Passing None as credentials raises InvalidTokenException."""
        req = _make_request("/api/storefront/account")
        req.state.store = None

        with pytest.raises(InvalidTokenException):
            get_current_customer_api(req, None, db)
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestGetCurrentCustomerOptional:
    """Test get_current_customer_optional — returns None instead of raising."""

    def _make_customer_token(self, auth_manager, customer_id):
        """Encode a customer JWT expiring in one hour, with ``store_id`` set to None."""
        from jose import jwt as jose_jwt

        claims = {
            "sub": str(customer_id),
            "type": "customer",
            "store_id": None,
            "exp": datetime.now(UTC) + timedelta(hours=1),
        }
        return jose_jwt.encode(
            claims,
            auth_manager.secret_key,
            algorithm=auth_manager.algorithm,
        )

    def _create_test_customer(self, db):
        """Persist a User -> Merchant -> Store -> Customer chain; return the customer.

        Parents are flushed before their children are built because each
        child constructor reads the parent's ``id``.
        """
        from app.modules.customers.models.customer import Customer
        from app.modules.tenancy.models import Merchant, Store

        def _stage(row):
            # Add and flush so ``row.id`` can be read by the next constructor.
            db.add(row)
            db.flush()
            return row

        uid = uuid.uuid4().hex[:8]

        account_owner = _stage(
            User(
                email=f"coowner_{uid}@test.com",
                username=f"coowner_{uid}",
                hashed_password="not_a_real_hash",
                role="merchant_owner",
                is_active=True,
            )
        )
        parent_merchant = _stage(
            Merchant(
                name=f"COMerchant_{uid}",
                owner_user_id=account_owner.id,
                contact_email=f"com_{uid}@test.com",
                is_active=True,
                is_verified=True,
            )
        )
        parent_store = _stage(
            Store(
                merchant_id=parent_merchant.id,
                store_code=f"CO_{uid}".upper(),
                subdomain=f"co{uid}",
                name=f"COStore_{uid}",
                is_active=True,
                is_verified=True,
            )
        )

        optional_customer = Customer(
            store_id=parent_store.id,
            email=f"copt_{uid}@example.com",
            hashed_password="not_a_real_hash",  # noqa: SEC001
            first_name="Optional",
            last_name="Customer",
            customer_number=f"COPT_{uid}",
            is_active=True,
        )
        db.add(optional_customer)
        db.commit()
        db.refresh(optional_customer)
        return optional_customer

    @staticmethod
    def _login_request():
        """Build the /storefront/login request with ``state.store`` cleared."""
        req = _make_request("/storefront/login")
        req.state.store = None
        return req

    def test_returns_context_for_valid_customer(self, db, auth_manager):
        """A valid customer token yields a non-None result for that customer."""
        seeded = self._create_test_customer(db)
        bearer = _make_credentials(self._make_customer_token(auth_manager, seeded.id))
        req = self._login_request()

        resolved = get_current_customer_optional(req, bearer, None, db)
        assert resolved is not None
        assert resolved.id == seeded.id

    def test_returns_none_without_token(self, db):
        """With no credentials and no cookie the dependency returns None."""
        req = self._login_request()
        assert get_current_customer_optional(req, None, None, db) is None

    def test_returns_none_for_invalid_token(self, db):
        """An undecodable token string yields None rather than an exception."""
        bearer = _make_credentials("bad_customer_token")
        req = self._login_request()

        resolved = get_current_customer_optional(req, bearer, None, db)
        assert resolved is None
|
|
|
|
|
|
# ============================================================================
|
|
# Phase 6: Access Control (Store Permissions)
|
|
# ============================================================================
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestRequireStorePermission:
    """Test require_store_permission factory."""

    def _make_user_context(self, user, store_id=1, store_code="TEST"):
        """Build a UserContext from ``user`` carrying store token fields (role "owner")."""
        context = UserContext.from_user(user)
        context.token_store_id = store_id
        context.token_store_code = store_code
        context.token_store_role = "owner"
        return context

    @staticmethod
    def _store_double():
        """Return a MagicMock standing in for the store (id 1, code "TEST")."""
        return MagicMock(id=1, store_code="TEST")

    def test_allows_user_with_permission(self, db, auth_manager, test_store_user):
        """When has_store_permission reports True the checker returns the user."""
        store_double = self._store_double()
        checker = require_store_permission("products.view")
        req = _make_request("/store/TEST/products")
        context = self._make_user_context(test_store_user)

        with (
            patch("app.api.deps.store_service.get_store_by_id", return_value=store_double),
            patch.object(User, "has_store_permission", return_value=True),
        ):
            granted = checker(req, db, context)

        assert granted.id == test_store_user.id

    def test_rejects_user_without_permission(self, db, auth_manager, test_store_user):
        """When has_store_permission reports False the checker raises InsufficientStorePermissionsException."""
        store_double = self._store_double()
        checker = require_store_permission("products.delete")
        req = _make_request("/store/TEST/products")
        context = self._make_user_context(test_store_user)

        with (
            patch("app.api.deps.store_service.get_store_by_id", return_value=store_double),
            patch.object(User, "has_store_permission", return_value=False),
            pytest.raises(InsufficientStorePermissionsException),
        ):
            checker(req, db, context)

    def test_rejects_missing_store_context(self, db, auth_manager, test_store_user):
        """A context whose token_store_id is None raises InvalidTokenException."""
        checker = require_store_permission("products.view")
        req = _make_request("/store/TEST/products")

        context = UserContext.from_user(test_store_user)
        context.token_store_id = None

        with pytest.raises(InvalidTokenException, match="missing store information"):
            checker(req, db, context)
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestRequireStoreOwner:
    """Test require_store_owner."""

    @staticmethod
    def _store_double():
        """Return a MagicMock standing in for the store (id 1, code "TEST")."""
        return MagicMock(id=1, store_code="TEST")

    @staticmethod
    def _context_for_store(user):
        """Build a UserContext for ``user`` with token store id 1 and code "TEST"."""
        context = UserContext.from_user(user)
        context.token_store_id = 1
        context.token_store_code = "TEST"
        return context

    def test_allows_store_owner(self, db, auth_manager, test_store_user):
        """When is_owner_of reports True the dependency returns the user."""
        store_double = self._store_double()
        req = _make_request("/store/TEST/team")
        context = self._context_for_store(test_store_user)

        with (
            patch("app.api.deps.store_service.get_store_by_id", return_value=store_double),
            patch.object(User, "is_owner_of", return_value=True),
        ):
            owner = require_store_owner(req, db, context)

        assert owner.id == test_store_user.id

    def test_rejects_non_owner(self, db, auth_manager, test_store_user):
        """When is_owner_of reports False, StoreOwnerOnlyException is raised."""
        store_double = self._store_double()
        req = _make_request("/store/TEST/team")
        context = self._context_for_store(test_store_user)

        with (
            patch("app.api.deps.store_service.get_store_by_id", return_value=store_double),
            patch.object(User, "is_owner_of", return_value=False),
            pytest.raises(StoreOwnerOnlyException),
        ):
            require_store_owner(req, db, context)

    def test_rejects_missing_store_context(self, db, auth_manager, test_store_user):
        """A context whose token_store_id is None raises InvalidTokenException."""
        req = _make_request("/store/TEST/team")

        context = UserContext.from_user(test_store_user)
        context.token_store_id = None

        with pytest.raises(InvalidTokenException, match="missing store information"):
            require_store_owner(req, db, context)
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestRequireAnyStorePermission:
    """Test require_any_store_permission factory."""

    @staticmethod
    def _dashboard_setup(user):
        """Return (store mock, checker, request, context) shared by both tests."""
        store_double = MagicMock(id=1, store_code="TEST")
        checker = require_any_store_permission("dashboard.view", "reports.view")
        req = _make_request("/store/TEST/dashboard")
        context = UserContext.from_user(user)
        context.token_store_id = 1
        return store_double, checker, req, context

    def test_allows_user_with_any_permission(self, db, auth_manager, test_store_user):
        """Holding at least one of the listed permissions is enough to pass."""
        store_double, checker, req, context = self._dashboard_setup(test_store_user)

        # has_store_permission answers True for the first lookup, False for the second.
        with (
            patch("app.api.deps.store_service.get_store_by_id", return_value=store_double),
            patch.object(User, "has_store_permission", side_effect=[True, False]),
        ):
            granted = checker(req, db, context)

        assert granted.id == test_store_user.id

    def test_rejects_user_with_no_permissions(self, db, auth_manager, test_store_user):
        """Holding none of the listed permissions raises InsufficientStorePermissionsException."""
        store_double, checker, req, context = self._dashboard_setup(test_store_user)

        with (
            patch("app.api.deps.store_service.get_store_by_id", return_value=store_double),
            patch.object(User, "has_store_permission", return_value=False),
            pytest.raises(InsufficientStorePermissionsException),
        ):
            checker(req, db, context)
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestRequireAllStorePermissions:
    """Test require_all_store_permissions factory."""

    @staticmethod
    def _bulk_request_and_context(user):
        """Return (store mock, request, context) for the bulk-products route."""
        store_double = MagicMock(id=1, store_code="TEST")
        req = _make_request("/store/TEST/products/bulk")
        context = UserContext.from_user(user)
        context.token_store_id = 1
        return store_double, req, context

    def test_allows_user_with_all_permissions(self, db, auth_manager, test_store_user):
        """Holding every listed permission lets the checker return the user."""
        store_double, req, context = self._bulk_request_and_context(test_store_user)
        checker = require_all_store_permissions("products.view", "products.edit")

        with (
            patch("app.api.deps.store_service.get_store_by_id", return_value=store_double),
            patch.object(User, "has_store_permission", return_value=True),
        ):
            granted = checker(req, db, context)

        assert granted.id == test_store_user.id

    def test_rejects_user_missing_one_permission(self, db, auth_manager, test_store_user):
        """Lacking any single listed permission raises InsufficientStorePermissionsException."""
        store_double, req, context = self._bulk_request_and_context(test_store_user)
        checker = require_all_store_permissions("products.view", "products.delete")

        # The first permission lookup succeeds, the second one fails.
        with (
            patch("app.api.deps.store_service.get_store_by_id", return_value=store_double),
            patch.object(User, "has_store_permission", side_effect=[True, False]),
            pytest.raises(InsufficientStorePermissionsException),
        ):
            checker(req, db, context)
|
|
|
|
|
|
# ============================================================================
|
|
# Phase 7 remaining: get_user_store
|
|
# ============================================================================
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestGetUserStore:
    """Test get_user_store — verify store ownership/membership."""

    def test_returns_store_for_owner(self, db, auth_manager):
        """The merchant owner's context resolves a store under their merchant."""
        # Only Store is constructed here; the merchant comes from the helper.
        from app.modules.tenancy.models import Store

        user, merchant = _create_merchant_owner(db, auth_manager)
        uid = uuid.uuid4().hex[:8].upper()
        store = Store(
            merchant_id=merchant.id,
            store_code=f"GUS_{uid}",
            subdomain=f"gus{uid.lower()}",
            name=f"GetUserStore_{uid}",
            is_active=True,
            is_verified=True,
        )
        db.add(store)
        db.commit()
        db.refresh(store)

        user_ctx = UserContext.from_user(user)
        result = get_user_store(store.store_code, user_ctx, db)
        assert result.id == store.id

    def test_raises_for_nonexistent_store(self, db, auth_manager, test_store_user):
        """An unknown store code raises StoreNotFoundException."""
        user_ctx = UserContext.from_user(test_store_user)

        with pytest.raises(StoreNotFoundException):
            get_user_store("NONEXISTENT_STORE_XYZ", user_ctx, db)

    def test_raises_for_unauthorized_user(self, db, auth_manager):
        """A user unrelated to the store's merchant raises UnauthorizedStoreAccessException."""
        from app.modules.tenancy.models import Store

        # Create a store owned by a different user; only the merchant is
        # needed afterwards, so the owning user is discarded.
        _, merchant = _create_merchant_owner(db, auth_manager)
        uid = uuid.uuid4().hex[:8].upper()
        store = Store(
            merchant_id=merchant.id,
            store_code=f"GUS2_{uid}",
            subdomain=f"gus2{uid.lower()}",
            name=f"OtherStore_{uid}",
            is_active=True,
            is_verified=True,
        )
        db.add(store)
        db.commit()
        db.refresh(store)

        # A second, separately created user tries to access that store.
        uid2 = uuid.uuid4().hex[:8]
        other_user = User(
            email=f"other_{uid2}@example.com",
            username=f"other_{uid2}",
            hashed_password=auth_manager.hash_password("testpass123"),
            role="merchant_owner",
            is_active=True,
            is_email_verified=True,
        )
        db.add(other_user)
        db.commit()
        db.refresh(other_user)

        other_ctx = UserContext.from_user(other_user)
        with pytest.raises(UnauthorizedStoreAccessException):
            get_user_store(store.store_code, other_ctx, db)
|
|
|
|
|
|
# ============================================================================
|
|
# Phase 2 (advanced): Platform Access & Context
|
|
# ============================================================================
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestRequirePlatformAccess:
    """Test require_platform_access factory."""

    @staticmethod
    def _bearer_for(auth_manager, user):
        """Issue an access token for ``user`` and wrap it as header credentials."""
        issued = auth_manager.create_access_token(user=user)
        return _make_credentials(issued["access_token"])

    @staticmethod
    def _context_with_platforms(user):
        """Build a UserContext for ``user`` limited to platforms 1, 2 and 3."""
        context = UserContext.from_user(user)
        context.accessible_platform_ids = [1, 2, 3]
        return context

    def test_super_admin_can_access_any_platform(self, db, auth_manager, test_super_admin):
        """Super admin passes for platform 42 and has accessible_platform_ids of None."""
        bearer = self._bearer_for(auth_manager, test_super_admin)
        req = _make_request("/admin/platforms/42/stores")

        checker = require_platform_access(42)
        admin = checker(req, bearer, None, db)

        assert admin.id == test_super_admin.id
        assert admin.accessible_platform_ids is None

    def test_platform_admin_with_access(self, db, auth_manager, test_platform_admin):
        """Platform admin whose platform list contains the requested id is accepted."""
        bearer = self._bearer_for(auth_manager, test_platform_admin)
        req = _make_request("/admin/platforms/1/stores")
        checker = require_platform_access(1)

        # Substitute the admin lookup so it yields a context with platform access.
        admin_context = self._context_with_platforms(test_platform_admin)
        lookup_patch = patch(
            "app.api.deps.get_current_admin_from_cookie_or_header",
            return_value=admin_context,
        )
        with lookup_patch:
            admin = checker(req, bearer, None, db)

        assert admin.id == test_platform_admin.id

    def test_platform_admin_without_access(self, db, auth_manager, test_platform_admin):
        """Platform admin whose platform list lacks the requested id is rejected."""
        bearer = self._bearer_for(auth_manager, test_platform_admin)
        req = _make_request("/admin/platforms/9999/stores")
        checker = require_platform_access(9999)

        admin_context = self._context_with_platforms(test_platform_admin)
        with (
            patch(
                "app.api.deps.get_current_admin_from_cookie_or_header",
                return_value=admin_context,
            ),
            pytest.raises(InsufficientPermissionsException, match="Access denied to platform"),
        ):
            checker(req, bearer, None, db)

    def test_rejects_non_admin(self, db, auth_manager, test_store_user):
        """A store user's token raises AdminRequiredException."""
        bearer = self._bearer_for(auth_manager, test_store_user)
        req = _make_request("/admin/platforms/1/stores")

        checker = require_platform_access(1)
        with pytest.raises(AdminRequiredException):
            checker(req, bearer, None, db)
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestGetAdminWithPlatformContext:
    """Test get_admin_with_platform_context."""

    def test_super_admin_bypasses_platform_context(self, db, auth_manager, test_super_admin):
        """A super admin token yields a context flagged is_super_admin."""
        issued = auth_manager.create_access_token(user=test_super_admin)
        bearer = _make_credentials(issued["access_token"])
        req = _make_request("/admin/dashboard")

        admin = get_admin_with_platform_context(req, bearer, None, db)

        assert admin.id == test_super_admin.id
        assert admin.is_super_admin is True

    def test_rejects_non_admin(self, db, auth_manager, test_store_user):
        """A store user's token raises AdminRequiredException."""
        issued = auth_manager.create_access_token(user=test_store_user)
        bearer = _make_credentials(issued["access_token"])
        req = _make_request("/admin/dashboard")

        with pytest.raises(AdminRequiredException):
            get_admin_with_platform_context(req, bearer, None, db)

    def test_rejects_without_token(self, db):
        """With neither credentials nor cookie, InvalidTokenException is raised."""
        req = _make_request("/admin/dashboard")

        with pytest.raises(InvalidTokenException):
            get_admin_with_platform_context(req, None, None, db)
|
|
|
|
|
|
# ============================================================================
|
|
# Phase 6 (advanced): Module & Menu Access Control
|
|
# ============================================================================
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.auth
class TestRequireModuleAccess:
    """Test require_module_access factory."""

    @staticmethod
    def _billing_request_on_platform():
        """Build the /admin/billing request with a mock platform (id 1) on its state."""
        req = _make_request("/admin/billing")
        req.state.admin_platform = MagicMock(id=1)
        return req

    def test_super_admin_bypasses_module_check(self, db, auth_manager, test_super_admin):
        """A super admin token is accepted for the billing module."""
        from app.modules.enums import FrontendType

        issued = auth_manager.create_access_token(user=test_super_admin)
        bearer = _make_credentials(issued["access_token"])
        req = _make_request("/admin/billing")

        checker = require_module_access("billing", FrontendType.ADMIN)
        admin = checker(req, bearer, None, None, None, db)

        assert admin.id == test_super_admin.id

    def test_store_user_with_enabled_module(self, db, auth_manager, test_store_user):
        """A store user is accepted for the inventory module."""
        from app.modules.enums import FrontendType

        issued = auth_manager.create_access_token(user=test_store_user)
        raw_token = issued["access_token"]
        bearer = _make_credentials(raw_token)
        req = _make_request("/store/inventory")
        req.state.store = None

        checker = require_module_access("inventory", FrontendType.STORE)
        # No platform context → access is allowed (module check requires platform)
        store_user = checker(req, bearer, None, raw_token, None, db)

        assert store_user.id == test_store_user.id

    def test_rejects_disabled_module(self, db, auth_manager, test_platform_admin):
        """A platform admin is rejected when is_module_enabled reports False."""
        from app.modules.enums import FrontendType

        issued = auth_manager.create_access_token(user=test_platform_admin)
        bearer = _make_credentials(issued["access_token"])
        # The request state carries the platform context.
        req = self._billing_request_on_platform()

        checker = require_module_access("billing", FrontendType.ADMIN)
        with (
            patch("app.modules.service.module_service.is_module_enabled", return_value=False),
            pytest.raises(InsufficientPermissionsException, match="not enabled"),
        ):
            checker(req, bearer, None, None, None, db)

    def test_allows_enabled_module(self, db, auth_manager, test_platform_admin):
        """A platform admin is accepted when is_module_enabled reports True."""
        from app.modules.enums import FrontendType

        issued = auth_manager.create_access_token(user=test_platform_admin)
        bearer = _make_credentials(issued["access_token"])
        req = self._billing_request_on_platform()

        checker = require_module_access("billing", FrontendType.ADMIN)
        with patch("app.modules.service.module_service.is_module_enabled", return_value=True):
            admin = checker(req, bearer, None, None, None, db)

        assert admin.id == test_platform_admin.id

    def test_no_auth_raises(self, db):
        """With no credentials or cookies at all, InvalidTokenException is raised."""
        from app.modules.enums import FrontendType

        req = _make_request("/admin/billing")
        checker = require_module_access("billing", FrontendType.ADMIN)

        with pytest.raises(InvalidTokenException, match="Authentication required"):
            checker(req, None, None, None, None, db)
|
|
|
|
|
|
# ============================================================================
|
|
# Phase 7 remaining: get_user_permissions
|
|
# ============================================================================
|
|
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.auth
|
|
class TestGetUserPermissions:
|
|
"""Test get_user_permissions."""
|
|
|
|
def test_returns_empty_for_no_store_context(self, db, auth_manager, test_store_user):
    """A context whose token_store_id is None yields an empty permission list."""
    req = _make_request("/store/dashboard")

    context = UserContext.from_user(test_store_user)
    context.token_store_id = None

    permissions = get_user_permissions(req, db, context)
    assert permissions == []
|
|
|
|
def test_owner_gets_all_permissions(self, db, auth_manager, test_store_user):
    """When is_owner_of reports True, the full discovered permission list is returned."""
    store_double = MagicMock(id=1, store_code="TEST")
    req = _make_request("/store/dashboard")

    context = UserContext.from_user(test_store_user)
    context.token_store_id = 1

    # Value the patched permission discovery service will hand back.
    all_perms = ["products.view", "products.edit", "orders.view", "orders.edit"]

    store_lookup = patch("app.api.deps.store_service.get_store_by_id", return_value=store_double)
    ownership = patch.object(User, "is_owner_of", return_value=True)
    discovery = patch(
        "app.modules.tenancy.services.permission_discovery_service.permission_discovery_service.get_all_permission_ids",
        return_value=all_perms,
    )
    with store_lookup, ownership, discovery:
        permissions = get_user_permissions(req, db, context)

    assert permissions == all_perms
|
|
|
|
def test_non_owner_gets_membership_permissions(self, db, auth_manager, test_store_user):
    """A non-owner receives the permissions reported by their store membership."""
    store_double = MagicMock(id=1, store_code="TEST")
    req = _make_request("/store/dashboard")

    context = UserContext.from_user(test_store_user)
    context.token_store_id = 1

    # Stand-in membership for store 1 that reports two permissions.
    mock_membership = MagicMock()
    mock_membership.store_id = 1
    mock_membership.is_active = True
    mock_membership.get_all_permissions.return_value = ["products.view", "orders.view"]

    with (
        patch("app.api.deps.store_service.get_store_by_id", return_value=store_double),
        patch.object(User, "is_owner_of", return_value=False),
        # Replace User.store_memberships with a property returning the stand-in.
        patch.object(
            User,
            "store_memberships",
            new_callable=lambda: property(lambda self: [mock_membership]),
        ),
    ):
        permissions = get_user_permissions(req, db, context)

    assert permissions == ["products.view", "orders.view"]
|
|
|
|
def test_non_member_gets_empty_list(self, db, auth_manager, test_store_user):
    """A non-owner whose membership list is empty receives no permissions."""
    store_double = MagicMock(id=1, store_code="TEST")
    req = _make_request("/store/dashboard")

    context = UserContext.from_user(test_store_user)
    context.token_store_id = 1

    with (
        patch("app.api.deps.store_service.get_store_by_id", return_value=store_double),
        patch.object(User, "is_owner_of", return_value=False),
        # Replace User.store_memberships with a property returning an empty list.
        patch.object(
            User,
            "store_memberships",
            new_callable=lambda: property(lambda self: []),
        ),
    ):
        permissions = get_user_permissions(req, db, context)

    assert permissions == []
|