From 93731b717351d2cd04d068824b24b640eda6e308 Mon Sep 17 00:00:00 2001 From: Samir Boulahtit Date: Thu, 19 Feb 2026 17:38:31 +0100 Subject: [PATCH] test: add 73 unit tests for app/api/deps.py auth dependencies Cover all core authentication paths: helpers (_get_token_from_request, _validate_user_token, _get_user_model, _validate_customer_token), admin/store/merchant/customer auth (cookie + header + API variants), optional auth, store permission factories, and store ownership checks. Co-Authored-By: Claude Opus 4.6 --- .../test-api-deps-auth-dependencies.md | 261 ++++ tests/unit/api/__init__.py | 0 tests/unit/api/test_deps.py | 1302 +++++++++++++++++ 3 files changed, 1563 insertions(+) create mode 100644 docs/proposals/test-api-deps-auth-dependencies.md create mode 100644 tests/unit/api/__init__.py create mode 100644 tests/unit/api/test_deps.py diff --git a/docs/proposals/test-api-deps-auth-dependencies.md b/docs/proposals/test-api-deps-auth-dependencies.md new file mode 100644 index 00000000..91f38237 --- /dev/null +++ b/docs/proposals/test-api-deps-auth-dependencies.md @@ -0,0 +1,261 @@ +# Test Plan: app/api/deps.py — Authentication Dependencies + +**Date:** 2026-02-19 +**Status:** Planned +**Priority:** P0 — Security-critical, zero test coverage +**File under test:** `app/api/deps.py` (1,668 lines, 31 functions) + +## Why This File Is Critical + +`deps.py` is the **single entry point for all authentication and authorization** in the application. Every protected route depends on it. It enforces: + +- Role isolation (admins can't access store routes, stores can't access admin routes) +- Cookie path restrictions (prevent cross-context token leakage) +- Token priority (Authorization header > cookie) +- Platform access control for platform admins +- Store permission checks (owner, team member, specific permissions) +- Merchant ownership verification +- Customer token validation with store-matching +- Module and menu-based access control + +A bug here is a security vulnerability. Zero test coverage is unacceptable. + +## Current State + +- **Existing test coverage:** None for deps.py directly +- **Related tests:** `tests/integration/security/test_authentication.py` (3 tests — basic endpoint protection only) +- **Related tests:** `tests/unit/models/schema/test_auth.py` (tests UserContext schema) + +## Test Structure + +Tests will live at: `tests/unit/api/test_deps.py` + +This follows the project convention of matching source layout (`app/api/deps.py` → `tests/unit/api/test_deps.py`). + +## Functions To Test (31 total) + +### Phase 1: Helper Functions (4 functions) + +These are pure/near-pure functions — easiest to test, highest value as foundation. + +| Function | Lines | What it does | Test count | +|----------|-------|-------------|------------| +| `_get_token_from_request()` | 76–105 | Extract token from header or cookie, header takes priority | 4 | +| `_validate_user_token()` | 108–123 | Validate JWT, return User model | 3 | +| `_get_user_model()` | 126–155 | Load User from DB by UserContext.id, copy token attrs | 3 | +| `_validate_customer_token()` | 1026–1114 | Decode customer JWT, verify type/expiry/store match | 7 | + +**Tests for `_get_token_from_request`:** +- Returns (token, "header") when Authorization header present +- Returns (token, "cookie") when only cookie present +- Header takes priority over cookie when both present +- Returns (None, None) when neither present + +**Tests for `_validate_user_token`:** +- Returns User for valid token +- Raises InvalidTokenException for invalid/expired token +- Raises InvalidTokenException for token with non-existent user + +**Tests for `_get_user_model`:** +- Returns User model with token attributes copied +- Raises InvalidTokenException when user not in DB +- Copies token_store_id, token_store_code, token_store_role from context + +**Tests for `_validate_customer_token`:** +- Returns CustomerContext for valid customer token +- Rejects token with wrong type (not "customer") +- Rejects token with missing "sub" claim +- Rejects expired token +- Rejects token for non-existent customer +- Rejects token for inactive customer +- Rejects token with store_id mismatch (cross-store attack) + +### Phase 2: Admin Authentication (6 functions) + +| Function | Lines | What it does | Test count | +|----------|-------|-------------|------------| +| `get_current_admin_from_cookie_or_header()` | 163–209 | Admin auth via cookie or header | 4 | +| `get_current_admin_api()` | 212–242 | Admin auth via header only (CSRF-safe) | 3 | +| `get_current_super_admin()` | 250–283 | Require super admin role | 3 | +| `get_current_super_admin_api()` | 286–312 | Super admin, header only | 2 | +| `require_platform_access()` | 315–357 | Factory: platform-specific admin access | 4 | +| `get_admin_with_platform_context()` | 360–430 | Admin with platform from JWT | 4 | + +**Key tests:** +- Valid admin token accepted (cookie and header) +- Non-admin role rejected with AdminRequiredException +- No token raises InvalidTokenException +- Super admin check: platform admin rejected +- Platform access: super admin bypasses, platform admin checked against accessible_platform_ids +- Platform context: platform_id extracted from token, stored in request.state + +### Phase 3: Store Authentication (2 functions) + +| Function | Lines | What it does | Test count | +|----------|-------|-------------|------------| +| `get_current_store_from_cookie_or_header()` | 665–720 | Store auth via cookie or header | 5 | +| `get_current_store_api()` | 723–780 | Store API auth, validates store context | 5 | + +**Key tests:** +- Valid store token accepted +- **Admin blocked from store routes** (critical security boundary) +- Customer blocked from store routes +- No token raises InvalidTokenException +- Store API requires token_store_id claim +- Store API verifies user still member of store (revocation check) + +### Phase 4: Merchant Authentication (5 functions) + +| Function | Lines | What it does | Test count | +|----------|-------|-------------|------------| +| `get_current_merchant_from_cookie_or_header()` | 788–848 | Merchant auth, verifies ownership | 4 | +| `get_current_merchant_api()` | 851–896 | Merchant API auth | 3 | +| `get_current_merchant_optional()` | 899–940 | Returns None if not authenticated | 3 | +| `get_merchant_for_current_user()` | 943–979 | Load Merchant object for API user | 3 | +| `get_merchant_for_current_user_page()` | 982–1018 | Load Merchant object for page user | 2 | + +**Key tests:** +- Valid merchant owner accepted +- User without active merchants rejected +- Optional variant returns None on failure (no exception) +- Merchant object stored in request.state + +### Phase 5: Customer Authentication (2 functions) + +| Function | Lines | What it does | Test count | +|----------|-------|-------------|------------| +| `get_current_customer_from_cookie_or_header()` | 1117–1156 | Customer auth via cookie or header | 3 | +| `get_current_customer_api()` | 1159–1185 | Customer API auth | 2 | + +**Key tests:** +- Valid customer token accepted +- No token raises InvalidTokenException +- API variant requires Authorization header + +### Phase 6: Access Control (6 functions) + +| Function | Lines | What it does | Test count | +|----------|-------|-------------|------------| +| `require_module_access()` | 438–538 | Factory: module enablement check | 4 | +| `require_menu_access()` | 546–657 | Factory: menu visibility check | 4 | +| `require_store_permission()` | 1278–1324 | Factory: specific permission check | 4 | +| `require_store_owner()` | 1327–1369 | Require store owner role | 3 | +| `require_any_store_permission()` | 1372–1425 | Factory: ANY of N permissions | 3 | +| `require_all_store_permissions()` | 1428–1483 | Factory: ALL of N permissions | 3 | + +**Key tests:** +- Module disabled → InsufficientPermissionsException +- Super admin bypasses module check +- Store owner has all permissions +- Team member checked against specific permissions +- Missing permission raises InsufficientStorePermissionsException + +### Phase 7: Optional Auth & Utilities (4 functions) + +| Function | Lines | What it does | Test count | +|----------|-------|-------------|------------| +| `get_current_admin_optional()` | 1535–1579 | Returns None on failure | 3 | +| `get_current_store_optional()` | 1582–1626 | Returns None on failure | 3 | +| `get_current_customer_optional()` | 1629–1667 | Returns None on failure | 3 | +| `get_user_permissions()` | 1486–1527 | List all permissions for user in store | 3 | +| `get_user_store()` | 1225–1270 | Verify store ownership/membership | 3 | + +**Key tests:** +- Optional variants return None (not raise) when token invalid +- Optional variants return context when token valid +- get_user_permissions returns all permissions for owner, specific for team member + +## Total Test Count + +| Phase | Functions | Tests | +|-------|----------|-------| +| 1. Helpers | 4 | 17 | +| 2. Admin auth | 6 | 20 | +| 3. Store auth | 2 | 10 | +| 4. Merchant auth | 5 | 15 | +| 5. Customer auth | 2 | 5 | +| 6. Access control | 6 | 21 | +| 7. Optional & utils | 5 | 15 | +| **Total** | **30** | **~103** | + +## Test Approach + +### Unit vs Integration + +These will be **unit tests** with mocked dependencies: + +- `db` → SQLAlchemy session with test data (using existing `db` fixture) +- `auth_manager` → Use the real AuthManager but with test users/tokens (existing `auth_fixtures.py`) +- `request` → Mock FastAPI Request with `request.state`, `request.url.path` +- FastAPI `Depends()` → Call functions directly, passing dependencies explicitly + +This avoids the overhead of full HTTP request cycles and tests the logic in isolation. + +### Fixtures Needed + +Most already exist in `tests/fixtures/auth_fixtures.py`: + +- `test_admin` — admin user (is_super_admin=True) +- `test_platform_admin` — platform admin (is_super_admin=False) +- `test_store_user` — store-role user +- `test_user` — regular user + +**New fixtures to add in the test file:** + +- `mock_request` — Mock Request with configurable state and url.path +- `test_customer` — Customer model for customer auth tests +- `test_merchant` — Merchant model for merchant auth tests +- `test_store` — Store with platform association +- `admin_token` — JWT token for admin user +- `store_token` — JWT token with store context claims +- `customer_token` — Customer JWT token +- `merchant_token` — JWT token for merchant owner + +## Execution Order + +1. **Phase 1 first** — helper functions have no dependencies on other deps.py functions +2. **Phase 2–5 next** — auth functions build on helpers +3. **Phase 6–7 last** — access control builds on auth functions + +Phases 2–5 can be done in parallel. Phase 6 depends on 2–3 being done. + +## Verification + +```bash +# Run just the deps tests +python -m pytest tests/unit/api/test_deps.py -v --timeout=60 + +# Run with coverage +python -m pytest tests/unit/api/test_deps.py -v --cov=app.api.deps --cov-report=term-missing +``` + +## Corrected Coverage Summary + +The earlier 360 analysis incorrectly reported module service coverage at 10%. The actual numbers (after finding tests in `app/modules/*/tests/`): + +| Module | Services | Unit Tests | Integration Tests | Coverage | +|--------|----------|-----------|-------------------|----------| +| billing | 7 | 8 | 4 | **100%+** | +| catalog | 4 | 4 | 0 | **100%** | +| checkout | 1 | 1 | 0 | **100%** | +| cms | 4 | 4 | 0 | **100%** | +| core | 6 | 5 | 0 | **83%** | +| customers | 3 | 3 | 0 | **100%** | +| dev_tools | 2 | 2 | 0 | **100%** | +| inventory | 3 | 3 | 0 | **100%** | +| loyalty | 8 | 8 | 1 | **100%** | +| marketplace | 5 | 8 | 0 | **100%+** | +| messaging | 6 | 5 | 0 | **83%** | +| monitoring | 6 | 6 | 0 | **100%** | +| orders | 6 | 5 | 0 | **83%** | +| payments | 2 | 2 | 0 | **100%** | +| tenancy | 11 | 17 | 6 | **100%+** | +| cart | 1 | 1 | 0 | **100%** | +| analytics | 1 | 0 | 0 | **0%** | + +**Remaining gaps (non-service):** +- **Models:** 12 modules have no model tests +- **Schemas:** 12 modules have no schema tests +- **Routes:** 14 modules have no route/integration tests +- **Tasks:** 0 modules have task tests (5 modules have tasks) +- **Framework:** `app/api/deps.py`, `middleware/auth.py`, `app/exceptions/`, `app/handlers/stripe_webhook.py` have no tests diff --git a/tests/unit/api/__init__.py b/tests/unit/api/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/api/test_deps.py b/tests/unit/api/test_deps.py new file mode 100644 index 00000000..89396dbd --- /dev/null +++ b/tests/unit/api/test_deps.py @@ -0,0 +1,1302 @@ +# tests/unit/api/test_deps.py +""" +Unit tests for app/api/deps.py — Authentication dependencies. + +Tests the helper functions and auth dependency logic that every protected +route in the application relies on. +""" + +import uuid +from datetime import UTC, datetime, timedelta +from unittest.mock import MagicMock, patch + +import pytest +from fastapi.security import HTTPAuthorizationCredentials + +from app.api.deps import ( + _get_token_from_request, + _get_user_model, + _validate_customer_token, + _validate_user_token, + get_current_admin_api, + get_current_admin_from_cookie_or_header, + get_current_admin_optional, + get_current_customer_api, + get_current_customer_from_cookie_or_header, + get_current_customer_optional, + get_current_merchant_api, + get_current_merchant_from_cookie_or_header, + get_current_merchant_optional, + get_current_store_api, + get_current_store_from_cookie_or_header, + get_current_store_optional, + get_current_super_admin, + get_current_super_admin_api, + get_user_store, + require_all_store_permissions, + require_any_store_permission, + require_store_owner, + require_store_permission, +) +from app.modules.tenancy.exceptions import ( + AdminRequiredException, + InsufficientPermissionsException, + InsufficientStorePermissionsException, + InvalidTokenException, + StoreNotFoundException, + StoreOwnerOnlyException, + UnauthorizedStoreAccessException, +) +from app.modules.tenancy.models import User +from middleware.auth import AuthManager +from models.schema.auth import UserContext + +# ============================================================================ +# Fixtures +# ============================================================================ + + +def _make_credentials(token: str) -> HTTPAuthorizationCredentials: + """Helper to create Bearer credentials.""" + return HTTPAuthorizationCredentials(scheme="Bearer", credentials=token) + + +def _make_request(path: str = "/test") -> MagicMock: + """Helper to create a mock FastAPI Request.""" + request = MagicMock() + request.url.path = path + request.state = MagicMock() + return request + + +# ============================================================================ +# Phase 1: Helper Functions +# ============================================================================ + + +@pytest.mark.unit +@pytest.mark.auth +class TestGetTokenFromRequest: + """Test _get_token_from_request helper.""" + + def test_returns_header_token_when_present(self): + """Authorization header token is returned with source 'header'.""" + creds = _make_credentials("header_token_123") + token, source = _get_token_from_request(creds, None, "admin_token", "/admin/test") + assert token == "header_token_123" + assert source == "header" + + def test_returns_cookie_token_when_no_header(self): + """Cookie token is returned when no Authorization header.""" + token, source = _get_token_from_request(None, "cookie_token_456", "admin_token", "/admin/test") + assert token == "cookie_token_456" + assert source == "cookie" + + def test_header_takes_priority_over_cookie(self): + """Authorization header takes priority when both present.""" + creds = _make_credentials("header_token") + token, source = _get_token_from_request(creds, "cookie_token", "admin_token", "/admin/test") + assert token == "header_token" + assert source == "header" + + def test_returns_none_when_neither_present(self): + """Returns (None, None) when no token source available.""" + token, source = _get_token_from_request(None, None, "admin_token", "/admin/test") + assert token is None + assert source is None + + +@pytest.mark.unit +@pytest.mark.auth +class TestValidateUserToken: + """Test _validate_user_token helper.""" + + def test_returns_user_for_valid_token(self, db, auth_manager, test_admin): + """Valid JWT token returns the corresponding User model.""" + token_data = auth_manager.create_access_token(user=test_admin) + user = _validate_user_token(token_data["access_token"], db) + assert user.id == test_admin.id + assert user.username == test_admin.username + + def test_raises_for_invalid_token(self, db): + """Invalid JWT token raises InvalidTokenException.""" + with pytest.raises(Exception): # AuthManager raises on invalid token + _validate_user_token("completely_invalid_jwt_token", db) + + def test_raises_for_expired_token(self, db, test_admin): + """Expired JWT token is rejected.""" + # Create a token that's already expired by patching the auth manager + am = AuthManager() + # Generate token with negative expiry + from jose import jwt as jose_jwt + + payload = { + "sub": str(test_admin.id), + "username": test_admin.username, + "role": test_admin.role, + "exp": datetime.now(UTC) - timedelta(hours=1), + } + expired_token = jose_jwt.encode(payload, am.secret_key, algorithm=am.algorithm) + with pytest.raises(Exception): + _validate_user_token(expired_token, db) + + +@pytest.mark.unit +@pytest.mark.auth +class TestGetUserModel: + """Test _get_user_model helper.""" + + def test_returns_user_model_from_context(self, db, test_admin): + """Loads User model from database using UserContext.id.""" + context = UserContext.from_user(test_admin, include_store_context=False) + user = _get_user_model(context, db) + assert user.id == test_admin.id + assert user.username == test_admin.username + + def test_copies_token_attributes_to_model(self, db, test_store_user): + """Token store attributes from context are copied to the User model.""" + context = UserContext.from_user(test_store_user) + # Manually set token attributes on context + context.token_store_id = 42 + context.token_store_code = "TESTSTORE" + context.token_store_role = "owner" + + user = _get_user_model(context, db) + assert user.token_store_id == 42 + assert user.token_store_code == "TESTSTORE" + assert user.token_store_role == "owner" + + def test_raises_for_nonexistent_user(self, db): + """Raises InvalidTokenException when user ID doesn't exist in DB.""" + fake_context = MagicMock() + fake_context.id = 999999 + fake_context.token_store_id = None + fake_context.token_store_code = None + fake_context.token_store_role = None + + with pytest.raises(InvalidTokenException): + _get_user_model(fake_context, db) + + +@pytest.mark.unit +@pytest.mark.auth +class TestValidateCustomerToken: + """Test _validate_customer_token helper.""" + + def _make_customer_token(self, auth_manager, customer_id, store_id=None, token_type="customer", expired=False): + """Helper to generate a customer JWT token.""" + from jose import jwt as jose_jwt + + exp = datetime.now(UTC) + (timedelta(hours=-1) if expired else timedelta(hours=1)) + payload = { + "sub": str(customer_id), + "type": token_type, + "store_id": store_id, + "exp": exp, + } + return jose_jwt.encode(payload, auth_manager.secret_key, algorithm=auth_manager.algorithm) + + def _create_test_customer(self, db, email_prefix="customer", is_active=True): + """Helper to create a customer with the required store FK.""" + from app.modules.customers.models.customer import Customer + from app.modules.tenancy.models import Merchant, Store, User + + uid = uuid.uuid4().hex[:8] + # Customer requires store_id FK — create minimal user + merchant + store + owner = User( + email=f"owner_{uid}@test.com", + username=f"owner_{uid}", + hashed_password="not_a_real_hash", + role="store", + is_active=True, + ) + db.add(owner) + db.flush() + merchant = Merchant( + name=f"CustTestMerchant_{uid}", + owner_user_id=owner.id, + contact_email=f"m_{uid}@test.com", + is_active=True, + is_verified=True, + ) + db.add(merchant) + db.flush() + store = Store( + merchant_id=merchant.id, + store_code=f"CT_{uid}".upper(), + subdomain=f"ct{uid}", + name=f"CustTestStore_{uid}", + is_active=True, + is_verified=True, + ) + db.add(store) + db.flush() + + customer = Customer( + store_id=store.id, + email=f"{email_prefix}_{uid}@example.com", + hashed_password="not_a_real_hash", # noqa: SEC001 + first_name="Test", + last_name="Customer", + customer_number=f"CUST_{uid}", + is_active=is_active, + ) + db.add(customer) + db.commit() + db.refresh(customer) + return customer, store + + def test_valid_customer_token_returns_context(self, db, auth_manager): + """Valid customer token returns CustomerContext.""" + customer, _store = self._create_test_customer(db) + + token = self._make_customer_token(auth_manager, customer.id) + request = _make_request("/storefront/account") + request.state.store = None + + result = _validate_customer_token(token, request, db) + assert result.id == customer.id + assert result.email == customer.email + + def test_rejects_non_customer_token_type(self, db, auth_manager): + """Token with type != 'customer' is rejected.""" + token = self._make_customer_token(auth_manager, 1, token_type="user") + request = _make_request() + request.state.store = None + + with pytest.raises(InvalidTokenException, match="Customer authentication required"): + _validate_customer_token(token, request, db) + + def test_rejects_token_missing_sub(self, db, auth_manager): + """Token without 'sub' claim is rejected.""" + from jose import jwt as jose_jwt + + payload = { + "type": "customer", + "exp": datetime.now(UTC) + timedelta(hours=1), + } + token = jose_jwt.encode(payload, auth_manager.secret_key, algorithm=auth_manager.algorithm) + request = _make_request() + request.state.store = None + + with pytest.raises(InvalidTokenException, match="Invalid token"): + _validate_customer_token(token, request, db) + + def test_rejects_expired_customer_token(self, db, auth_manager): + """Expired customer token is rejected (caught by jose as ExpiredSignatureError).""" + customer, _store = self._create_test_customer(db) + + token = self._make_customer_token(auth_manager, customer.id, expired=True) + request = _make_request() + request.state.store = None + + with pytest.raises(InvalidTokenException, match="Could not validate credentials"): + _validate_customer_token(token, request, db) + + def test_rejects_nonexistent_customer(self, db, auth_manager): + """Token for non-existent customer is rejected.""" + token = self._make_customer_token(auth_manager, 999999) + request = _make_request() + request.state.store = None + + with pytest.raises(InvalidTokenException, match="Customer not found"): + _validate_customer_token(token, request, db) + + def test_rejects_inactive_customer(self, db, auth_manager): + """Token for inactive customer is rejected.""" + customer, _store = self._create_test_customer(db, is_active=False) + + token = self._make_customer_token(auth_manager, customer.id) + request = _make_request() + request.state.store = None + + with pytest.raises(InvalidTokenException, match="inactive"): + _validate_customer_token(token, request, db) + + def test_rejects_store_mismatch(self, db, auth_manager): + """Token with store_id that doesn't match request store is rejected.""" + from app.modules.tenancy.exceptions import UnauthorizedStoreAccessException + + customer, store = self._create_test_customer(db) + + # Token has the real store_id, but request store has a different id + token = self._make_customer_token(auth_manager, customer.id, store_id=store.id) + request = _make_request() + mock_store = MagicMock() + mock_store.id = store.id + 9999 # different store + mock_store.store_code = "WRONG" + request.state.store = mock_store + + with pytest.raises(UnauthorizedStoreAccessException): + _validate_customer_token(token, request, db) + + +# ============================================================================ +# Phase 2: Admin Authentication +# ============================================================================ + + +@pytest.mark.unit +@pytest.mark.auth +class TestGetCurrentAdminFromCookieOrHeader: + """Test get_current_admin_from_cookie_or_header.""" + + def test_valid_admin_via_header(self, db, auth_manager, test_admin): + """Admin user with valid header token returns UserContext.""" + token_data = auth_manager.create_access_token(user=test_admin) + creds = _make_credentials(token_data["access_token"]) + request = _make_request("/admin/dashboard") + + result = get_current_admin_from_cookie_or_header(request, creds, None, db) + assert result.id == test_admin.id + assert result.role == "admin" + + def test_valid_admin_via_cookie(self, db, auth_manager, test_admin): + """Admin user with valid cookie token returns UserContext.""" + token_data = auth_manager.create_access_token(user=test_admin) + request = _make_request("/admin/dashboard") + + result = get_current_admin_from_cookie_or_header( + request, None, token_data["access_token"], db + ) + assert result.id == test_admin.id + + def test_rejects_non_admin_user(self, db, auth_manager, test_store_user): + """Store user attempting admin route raises AdminRequiredException.""" + token_data = auth_manager.create_access_token(user=test_store_user) + creds = _make_credentials(token_data["access_token"]) + request = _make_request("/admin/dashboard") + + with pytest.raises(AdminRequiredException): + get_current_admin_from_cookie_or_header(request, creds, None, db) + + def test_raises_without_token(self, db): + """No token raises InvalidTokenException.""" + request = _make_request("/admin/dashboard") + + with pytest.raises(InvalidTokenException): + get_current_admin_from_cookie_or_header(request, None, None, db) + + +@pytest.mark.unit +@pytest.mark.auth +class TestGetCurrentAdminApi: + """Test get_current_admin_api (header-only, CSRF-safe).""" + + def test_valid_admin_header(self, db, auth_manager, test_admin): + """Valid admin token via header returns UserContext.""" + token_data = auth_manager.create_access_token(user=test_admin) + creds = _make_credentials(token_data["access_token"]) + + result = get_current_admin_api(creds, db) + assert result.id == test_admin.id + assert result.role == "admin" + + def test_rejects_non_admin(self, db, auth_manager, test_store_user): + """Non-admin user rejected with AdminRequiredException.""" + token_data = auth_manager.create_access_token(user=test_store_user) + creds = _make_credentials(token_data["access_token"]) + + with pytest.raises(AdminRequiredException): + get_current_admin_api(creds, db) + + def test_rejects_missing_credentials(self, db): + """Missing credentials raises InvalidTokenException.""" + with pytest.raises(InvalidTokenException): + get_current_admin_api(None, db) + + +@pytest.mark.unit +@pytest.mark.auth +class TestGetCurrentSuperAdmin: + """Test get_current_super_admin.""" + + def test_super_admin_accepted(self, db, auth_manager, test_super_admin): + """Super admin user passes the check.""" + token_data = auth_manager.create_access_token(user=test_super_admin) + creds = _make_credentials(token_data["access_token"]) + request = _make_request("/admin/settings") + + result = get_current_super_admin(request, creds, None, db) + assert result.is_super_admin is True + + def test_platform_admin_rejected(self, db, auth_manager, test_platform_admin): + """Platform admin (not super) is rejected.""" + token_data = auth_manager.create_access_token(user=test_platform_admin) + creds = _make_credentials(token_data["access_token"]) + request = _make_request("/admin/settings") + + with pytest.raises(AdminRequiredException, match="Super admin"): + get_current_super_admin(request, creds, None, db) + + +@pytest.mark.unit +@pytest.mark.auth +class TestGetCurrentSuperAdminApi: + """Test get_current_super_admin_api (header-only).""" + + def test_super_admin_accepted(self, db, auth_manager, test_super_admin): + """Super admin via API header passes.""" + token_data = auth_manager.create_access_token(user=test_super_admin) + creds = _make_credentials(token_data["access_token"]) + + result = get_current_super_admin_api(creds, db) + assert result.is_super_admin is True + + def test_platform_admin_rejected(self, db, auth_manager, test_platform_admin): + """Platform admin rejected via API.""" + token_data = auth_manager.create_access_token(user=test_platform_admin) + creds = _make_credentials(token_data["access_token"]) + + with pytest.raises(AdminRequiredException, match="Super admin"): + get_current_super_admin_api(creds, db) + + +# ============================================================================ +# Phase 3: Store Authentication +# ============================================================================ + + +@pytest.mark.unit +@pytest.mark.auth +class TestGetCurrentStoreFromCookieOrHeader: + """Test get_current_store_from_cookie_or_header.""" + + def test_valid_store_user_via_header(self, db, auth_manager, test_store_user): + """Store user with valid header token returns UserContext.""" + token_data = auth_manager.create_access_token(user=test_store_user) + creds = _make_credentials(token_data["access_token"]) + request = _make_request("/store/dashboard") + + result = get_current_store_from_cookie_or_header(request, creds, None, db) + assert result.id == test_store_user.id + assert result.role == "store" + + def test_valid_store_user_via_cookie(self, db, auth_manager, test_store_user): + """Store user with valid cookie token returns UserContext.""" + token_data = auth_manager.create_access_token(user=test_store_user) + request = _make_request("/store/dashboard") + + result = get_current_store_from_cookie_or_header( + request, None, token_data["access_token"], db + ) + assert result.id == test_store_user.id + + def test_admin_blocked_from_store_routes(self, db, auth_manager, test_admin): + """Admin user is explicitly blocked from store routes.""" + token_data = auth_manager.create_access_token(user=test_admin) + creds = _make_credentials(token_data["access_token"]) + request = _make_request("/store/dashboard") + + with pytest.raises(Exception, match="Store access only"): + get_current_store_from_cookie_or_header(request, creds, None, db) + + def test_raises_without_token(self, db): + """No token raises InvalidTokenException.""" + request = _make_request("/store/dashboard") + + with pytest.raises(InvalidTokenException): + get_current_store_from_cookie_or_header(request, None, None, db) + + +@pytest.mark.unit +@pytest.mark.auth +class TestGetCurrentStoreApi: + """Test get_current_store_api (header-only, validates store context).""" + + def test_valid_store_user_with_store_context(self, db, auth_manager, test_store_user): + """Store user with store context in token returns UserContext.""" + # Create token with store context + token_data = auth_manager.create_access_token( + user=test_store_user, + store_id=1, + store_code="TEST", + store_role="owner", + ) + creds = _make_credentials(token_data["access_token"]) + + # Mock is_member_of to return True + with patch.object(User, "is_member_of", return_value=True): + result = get_current_store_api(creds, db) + assert result.id == test_store_user.id + + def test_rejects_missing_credentials(self, db): + """Missing credentials raises InvalidTokenException.""" + with pytest.raises(InvalidTokenException): + get_current_store_api(None, db) + + def test_admin_blocked(self, db, auth_manager, test_admin): + """Admin user blocked from store API.""" + token_data = auth_manager.create_access_token(user=test_admin) + creds = _make_credentials(token_data["access_token"]) + + with pytest.raises(Exception, match="Store access only"): + get_current_store_api(creds, db) + + +# ============================================================================ +# Phase 7: Optional Authentication +# ============================================================================ + + +@pytest.mark.unit +@pytest.mark.auth +class TestGetCurrentAdminOptional: + """Test get_current_admin_optional — returns None instead of raising.""" + + def test_returns_context_for_valid_admin(self, db, auth_manager, test_admin): + """Valid admin token returns UserContext.""" + token_data = auth_manager.create_access_token(user=test_admin) + creds = _make_credentials(token_data["access_token"]) + request = _make_request("/admin/login") + + result = get_current_admin_optional(request, creds, None, db) + assert result is not None + assert result.id == test_admin.id + + def test_returns_none_without_token(self, db): + """No token returns None (not exception).""" + request = _make_request("/admin/login") + result = get_current_admin_optional(request, None, None, db) + assert result is None + + def test_returns_none_for_invalid_token(self, db): + """Invalid token returns None (not exception).""" + creds = _make_credentials("bad_token") + request = _make_request("/admin/login") + result = get_current_admin_optional(request, creds, None, db) + assert result is None + + def test_returns_none_for_non_admin(self, db, auth_manager, test_store_user): + """Store user token returns None (not admin).""" + token_data = auth_manager.create_access_token(user=test_store_user) + creds = _make_credentials(token_data["access_token"]) + request = _make_request("/admin/login") + + result = get_current_admin_optional(request, creds, None, db) + assert result is None + + +@pytest.mark.unit +@pytest.mark.auth +class TestGetCurrentStoreOptional: + """Test get_current_store_optional — returns None instead of raising.""" + + def test_returns_context_for_valid_store_user(self, db, auth_manager, test_store_user): + """Valid store token returns UserContext.""" + token_data = auth_manager.create_access_token(user=test_store_user) + creds = _make_credentials(token_data["access_token"]) + request = _make_request("/store/login") + + result = get_current_store_optional(request, creds, None, db) + assert result is not None + assert result.id == test_store_user.id + + def test_returns_none_without_token(self, db): + """No token returns None.""" + request = _make_request("/store/login") + result = get_current_store_optional(request, None, None, db) + assert result is None + + def test_returns_none_for_admin(self, db, auth_manager, test_admin): + """Admin user token returns None (not store role).""" + token_data = auth_manager.create_access_token(user=test_admin) + creds = _make_credentials(token_data["access_token"]) + request = _make_request("/store/login") + + result = get_current_store_optional(request, creds, None, db) + assert result is None + + +# ============================================================================ +# Phase 4: Merchant Authentication +# ============================================================================ + + +def _create_merchant_owner(db, auth_manager): + """Create a user who owns an active merchant.""" + uid = uuid.uuid4().hex[:8] + from app.modules.tenancy.models import Merchant + + user = User( + email=f"merchant_{uid}@example.com", + username=f"merchant_{uid}", + hashed_password=auth_manager.hash_password("testpass123"), + role="store", + is_active=True, + is_email_verified=True, + ) + db.add(user) + db.flush() + + merchant = Merchant( + name=f"Merchant_{uid}", + owner_user_id=user.id, + contact_email=f"m_{uid}@merchant.com", + is_active=True, + is_verified=True, + ) + db.add(merchant) + db.commit() + db.refresh(user) + db.refresh(merchant) + return user, merchant + + +@pytest.mark.unit +@pytest.mark.auth +class TestGetCurrentMerchantFromCookieOrHeader: + """Test get_current_merchant_from_cookie_or_header.""" + + def test_valid_merchant_owner_via_header(self, db, auth_manager): + """User who owns a merchant is accepted.""" + user, _merchant = _create_merchant_owner(db, auth_manager) + token_data = auth_manager.create_access_token(user=user) + creds = _make_credentials(token_data["access_token"]) + request = _make_request("/merchants/dashboard") + + result = get_current_merchant_from_cookie_or_header(request, creds, None, db) + assert result.id == user.id + + def test_valid_merchant_owner_via_cookie(self, db, auth_manager): + """Merchant owner accepted via cookie token.""" + user, _merchant = _create_merchant_owner(db, auth_manager) + token_data = auth_manager.create_access_token(user=user) + request = _make_request("/merchants/dashboard") + + result = get_current_merchant_from_cookie_or_header( + request, None, token_data["access_token"], db + ) + assert result.id == user.id + + def test_rejects_user_without_merchants(self, db, auth_manager, test_store_user): + """User who owns no merchants is rejected.""" + token_data = auth_manager.create_access_token(user=test_store_user) + creds = _make_credentials(token_data["access_token"]) + request = _make_request("/merchants/dashboard") + + with pytest.raises(InsufficientPermissionsException, match="Merchant owner"): + get_current_merchant_from_cookie_or_header(request, creds, None, db) + + def test_raises_without_token(self, db): + """No token raises InvalidTokenException.""" + request = _make_request("/merchants/dashboard") + + with pytest.raises(InvalidTokenException): + get_current_merchant_from_cookie_or_header(request, None, None, db) + + +@pytest.mark.unit +@pytest.mark.auth +class TestGetCurrentMerchantApi: + """Test get_current_merchant_api (header-only).""" + + def test_valid_merchant_owner_header(self, db, auth_manager): + """Merchant owner accepted via API header.""" + user, _merchant = _create_merchant_owner(db, auth_manager) + token_data = auth_manager.create_access_token(user=user) + creds = _make_credentials(token_data["access_token"]) + request = _make_request("/api/merchants") + + result = get_current_merchant_api(request, creds, db) + assert result.id == user.id + + def test_rejects_non_merchant_owner(self, db, auth_manager, test_store_user): + """Non-merchant-owner user rejected.""" + token_data = auth_manager.create_access_token(user=test_store_user) + creds = _make_credentials(token_data["access_token"]) + request = _make_request("/api/merchants") + + with pytest.raises(InsufficientPermissionsException, match="Merchant owner"): + get_current_merchant_api(request, creds, db) + + def test_rejects_missing_credentials(self, db): + """Missing credentials raises InvalidTokenException.""" + request = _make_request("/api/merchants") + + with pytest.raises(InvalidTokenException): + get_current_merchant_api(request, None, db) + + +@pytest.mark.unit +@pytest.mark.auth +class TestGetCurrentMerchantOptional: + """Test get_current_merchant_optional — returns None instead of raising.""" + + def test_returns_context_for_merchant_owner(self, db, auth_manager): + """Valid merchant owner returns UserContext.""" + user, _merchant = _create_merchant_owner(db, auth_manager) + token_data = auth_manager.create_access_token(user=user) + creds = _make_credentials(token_data["access_token"]) + request = _make_request("/merchants/login") + + result = get_current_merchant_optional(request, creds, None, db) + assert result is not None + assert result.id == user.id + + def test_returns_none_without_token(self, db): + """No token returns None.""" + request = _make_request("/merchants/login") + result = get_current_merchant_optional(request, None, None, db) + assert result is None + + def test_returns_none_for_non_owner(self, db, auth_manager, test_store_user): + """User without merchants returns None.""" + token_data = auth_manager.create_access_token(user=test_store_user) + creds = _make_credentials(token_data["access_token"]) + request = _make_request("/merchants/login") + + result = get_current_merchant_optional(request, creds, None, db) + assert result is None + + +# ============================================================================ +# Phase 5: Customer Authentication Endpoints +# ============================================================================ + + +@pytest.mark.unit +@pytest.mark.auth +class TestGetCurrentCustomerFromCookieOrHeader: + """Test get_current_customer_from_cookie_or_header.""" + + def _make_customer_token(self, auth_manager, customer_id, store_id=None): + """Helper to generate a customer JWT token.""" + from jose import jwt as jose_jwt + + payload = { + "sub": str(customer_id), + "type": "customer", + "store_id": store_id, + "exp": datetime.now(UTC) + timedelta(hours=1), + } + return jose_jwt.encode(payload, auth_manager.secret_key, algorithm=auth_manager.algorithm) + + def _create_test_customer(self, db): + """Create a customer with required store FK.""" + from app.modules.customers.models.customer import Customer + from app.modules.tenancy.models import Merchant, Store + + uid = uuid.uuid4().hex[:8] + owner = User( + email=f"csowner_{uid}@test.com", + username=f"csowner_{uid}", + hashed_password="not_a_real_hash", + role="store", + is_active=True, + ) + db.add(owner) + db.flush() + merchant = Merchant( + name=f"CSMerchant_{uid}", + owner_user_id=owner.id, + contact_email=f"csm_{uid}@test.com", + is_active=True, + is_verified=True, + ) + db.add(merchant) + db.flush() + store = Store( + merchant_id=merchant.id, + store_code=f"CS_{uid}".upper(), + subdomain=f"cs{uid}", + name=f"CSStore_{uid}", + is_active=True, + is_verified=True, + ) + db.add(store) + db.flush() + customer = Customer( + store_id=store.id, + email=f"cust_{uid}@example.com", + hashed_password="not_a_real_hash", # noqa: SEC001 + first_name="Test", + last_name="Customer", + customer_number=f"CUST_{uid}", + is_active=True, + ) + db.add(customer) + db.commit() + db.refresh(customer) + return customer + + def test_valid_customer_via_header(self, db, auth_manager): + """Valid customer token via header returns CustomerContext.""" + customer = self._create_test_customer(db) + token = self._make_customer_token(auth_manager, customer.id) + creds = _make_credentials(token) + request = _make_request("/storefront/account") + request.state.store = None + + result = get_current_customer_from_cookie_or_header(request, creds, None, db) + assert result.id == customer.id + + def test_valid_customer_via_cookie(self, db, auth_manager): + """Valid customer token via cookie returns CustomerContext.""" + customer = self._create_test_customer(db) + token = self._make_customer_token(auth_manager, customer.id) + request = _make_request("/storefront/account") + request.state.store = None + + result = get_current_customer_from_cookie_or_header(request, None, token, db) + assert result.id == customer.id + + def test_raises_without_token(self, db): + """No token raises InvalidTokenException.""" + request = _make_request("/storefront/account") + request.state.store = None + + with pytest.raises(InvalidTokenException, match="Customer authentication required"): + get_current_customer_from_cookie_or_header(request, None, None, db) + + +@pytest.mark.unit +@pytest.mark.auth +class TestGetCurrentCustomerApi: + """Test get_current_customer_api (header-only).""" + + def _make_customer_token(self, auth_manager, customer_id): + from jose import jwt as jose_jwt + + payload = { + "sub": str(customer_id), + "type": "customer", + "store_id": None, + "exp": datetime.now(UTC) + timedelta(hours=1), + } + return jose_jwt.encode(payload, auth_manager.secret_key, algorithm=auth_manager.algorithm) + + def _create_test_customer(self, db): + from app.modules.customers.models.customer import Customer + from app.modules.tenancy.models import Merchant, Store + + uid = uuid.uuid4().hex[:8] + owner = User( + email=f"caowner_{uid}@test.com", + username=f"caowner_{uid}", + hashed_password="not_a_real_hash", + role="store", + is_active=True, + ) + db.add(owner) + db.flush() + merchant = Merchant( + name=f"CAMerchant_{uid}", + owner_user_id=owner.id, + contact_email=f"cam_{uid}@test.com", + is_active=True, + is_verified=True, + ) + db.add(merchant) + db.flush() + store = Store( + merchant_id=merchant.id, + store_code=f"CA_{uid}".upper(), + subdomain=f"ca{uid}", + name=f"CAStore_{uid}", + is_active=True, + is_verified=True, + ) + db.add(store) + db.flush() + customer = Customer( + store_id=store.id, + email=f"capi_{uid}@example.com", + hashed_password="not_a_real_hash", # noqa: SEC001 + first_name="API", + last_name="Customer", + customer_number=f"CAPI_{uid}", + is_active=True, + ) + db.add(customer) + db.commit() + db.refresh(customer) + return customer + + def test_valid_customer_header(self, db, auth_manager): + """Valid customer token via API header returns CustomerContext.""" + customer = self._create_test_customer(db) + token = self._make_customer_token(auth_manager, customer.id) + creds = _make_credentials(token) + request = _make_request("/api/storefront/account") + request.state.store = None + + result = get_current_customer_api(request, creds, db) + assert result.id == customer.id + + def test_rejects_missing_credentials(self, db): + """Missing credentials raises InvalidTokenException.""" + request = _make_request("/api/storefront/account") + request.state.store = None + + with pytest.raises(InvalidTokenException): + get_current_customer_api(request, None, db) + + +@pytest.mark.unit +@pytest.mark.auth +class TestGetCurrentCustomerOptional: + """Test get_current_customer_optional — returns None instead of raising.""" + + def _make_customer_token(self, auth_manager, customer_id): + from jose import jwt as jose_jwt + + payload = { + "sub": str(customer_id), + "type": "customer", + "store_id": None, + "exp": datetime.now(UTC) + timedelta(hours=1), + } + return jose_jwt.encode(payload, auth_manager.secret_key, algorithm=auth_manager.algorithm) + + def _create_test_customer(self, db): + from app.modules.customers.models.customer import Customer + from app.modules.tenancy.models import Merchant, Store + + uid = uuid.uuid4().hex[:8] + owner = User( + email=f"coowner_{uid}@test.com", + username=f"coowner_{uid}", + hashed_password="not_a_real_hash", + role="store", + is_active=True, + ) + db.add(owner) + db.flush() + merchant = Merchant( + name=f"COMerchant_{uid}", + owner_user_id=owner.id, + contact_email=f"com_{uid}@test.com", + is_active=True, + is_verified=True, + ) + db.add(merchant) + db.flush() + store = Store( + merchant_id=merchant.id, + store_code=f"CO_{uid}".upper(), + subdomain=f"co{uid}", + name=f"COStore_{uid}", + is_active=True, + is_verified=True, + ) + db.add(store) + db.flush() + customer = Customer( + store_id=store.id, + email=f"copt_{uid}@example.com", + hashed_password="not_a_real_hash", # noqa: SEC001 + first_name="Optional", + last_name="Customer", + customer_number=f"COPT_{uid}", + is_active=True, + ) + db.add(customer) + db.commit() + db.refresh(customer) + return customer + + def test_returns_context_for_valid_customer(self, db, auth_manager): + """Valid customer token returns CustomerContext.""" + customer = self._create_test_customer(db) + token = self._make_customer_token(auth_manager, customer.id) + creds = _make_credentials(token) + request = _make_request("/storefront/login") + request.state.store = None + + result = get_current_customer_optional(request, creds, None, db) + assert result is not None + assert result.id == customer.id + + def test_returns_none_without_token(self, db): + """No token returns None.""" + request = _make_request("/storefront/login") + request.state.store = None + result = get_current_customer_optional(request, None, None, db) + assert result is None + + def test_returns_none_for_invalid_token(self, db): + """Invalid token returns None.""" + creds = _make_credentials("bad_customer_token") + request = _make_request("/storefront/login") + request.state.store = None + + result = get_current_customer_optional(request, creds, None, db) + assert result is None + + +# ============================================================================ +# Phase 6: Access Control (Store Permissions) +# ============================================================================ + + +@pytest.mark.unit +@pytest.mark.auth +class TestRequireStorePermission: + """Test require_store_permission factory.""" + + def _make_user_context(self, user, store_id=1, store_code="TEST"): + """Create a UserContext with store context.""" + ctx = UserContext.from_user(user) + ctx.token_store_id = store_id + ctx.token_store_code = store_code + ctx.token_store_role = "owner" + return ctx + + def test_allows_user_with_permission(self, db, auth_manager, test_store_user): + """User with required permission passes.""" + mock_store = MagicMock() + mock_store.id = 1 + mock_store.store_code = "TEST" + + checker = require_store_permission("products.view") + request = _make_request("/store/TEST/products") + user_ctx = self._make_user_context(test_store_user) + + with ( + patch("app.api.deps.store_service.get_store_by_id", return_value=mock_store), + patch.object(User, "has_store_permission", return_value=True), + ): + result = checker(request, db, user_ctx) + assert result.id == test_store_user.id + + def test_rejects_user_without_permission(self, db, auth_manager, test_store_user): + """User without required permission raises InsufficientStorePermissionsException.""" + mock_store = MagicMock() + mock_store.id = 1 + mock_store.store_code = "TEST" + + checker = require_store_permission("products.delete") + request = _make_request("/store/TEST/products") + user_ctx = self._make_user_context(test_store_user) + + with ( + patch("app.api.deps.store_service.get_store_by_id", return_value=mock_store), + patch.object(User, "has_store_permission", return_value=False), + ): + with pytest.raises(InsufficientStorePermissionsException): + checker(request, db, user_ctx) + + def test_rejects_missing_store_context(self, db, auth_manager, test_store_user): + """Token without store_id raises InvalidTokenException.""" + checker = require_store_permission("products.view") + request = _make_request("/store/TEST/products") + user_ctx = UserContext.from_user(test_store_user) + user_ctx.token_store_id = None + + with pytest.raises(InvalidTokenException, match="missing store information"): + checker(request, db, user_ctx) + + +@pytest.mark.unit +@pytest.mark.auth +class TestRequireStoreOwner: + """Test require_store_owner.""" + + def test_allows_store_owner(self, db, auth_manager, test_store_user): + """Store owner passes.""" + mock_store = MagicMock() + mock_store.id = 1 + mock_store.store_code = "TEST" + + request = _make_request("/store/TEST/team") + user_ctx = UserContext.from_user(test_store_user) + user_ctx.token_store_id = 1 + user_ctx.token_store_code = "TEST" + + with ( + patch("app.api.deps.store_service.get_store_by_id", return_value=mock_store), + patch.object(User, "is_owner_of", return_value=True), + ): + result = require_store_owner(request, db, user_ctx) + assert result.id == test_store_user.id + + def test_rejects_non_owner(self, db, auth_manager, test_store_user): + """Non-owner team member rejected.""" + mock_store = MagicMock() + mock_store.id = 1 + mock_store.store_code = "TEST" + + request = _make_request("/store/TEST/team") + user_ctx = UserContext.from_user(test_store_user) + user_ctx.token_store_id = 1 + user_ctx.token_store_code = "TEST" + + with ( + patch("app.api.deps.store_service.get_store_by_id", return_value=mock_store), + patch.object(User, "is_owner_of", return_value=False), + ): + with pytest.raises(StoreOwnerOnlyException): + require_store_owner(request, db, user_ctx) + + def test_rejects_missing_store_context(self, db, auth_manager, test_store_user): + """Token without store_id raises InvalidTokenException.""" + request = _make_request("/store/TEST/team") + user_ctx = UserContext.from_user(test_store_user) + user_ctx.token_store_id = None + + with pytest.raises(InvalidTokenException, match="missing store information"): + require_store_owner(request, db, user_ctx) + + +@pytest.mark.unit +@pytest.mark.auth +class TestRequireAnyStorePermission: + """Test require_any_store_permission factory.""" + + def test_allows_user_with_any_permission(self, db, auth_manager, test_store_user): + """User with at least one of the required permissions passes.""" + mock_store = MagicMock() + mock_store.id = 1 + mock_store.store_code = "TEST" + + checker = require_any_store_permission("dashboard.view", "reports.view") + request = _make_request("/store/TEST/dashboard") + user_ctx = UserContext.from_user(test_store_user) + user_ctx.token_store_id = 1 + + # has_store_permission returns True for first perm, False for second + with ( + patch("app.api.deps.store_service.get_store_by_id", return_value=mock_store), + patch.object(User, "has_store_permission", side_effect=[True, False]), + ): + result = checker(request, db, user_ctx) + assert result.id == test_store_user.id + + def test_rejects_user_with_no_permissions(self, db, auth_manager, test_store_user): + """User with none of the required permissions is rejected.""" + mock_store = MagicMock() + mock_store.id = 1 + mock_store.store_code = "TEST" + + checker = require_any_store_permission("dashboard.view", "reports.view") + request = _make_request("/store/TEST/dashboard") + user_ctx = UserContext.from_user(test_store_user) + user_ctx.token_store_id = 1 + + with ( + patch("app.api.deps.store_service.get_store_by_id", return_value=mock_store), + patch.object(User, "has_store_permission", return_value=False), + ): + with pytest.raises(InsufficientStorePermissionsException): + checker(request, db, user_ctx) + + +@pytest.mark.unit +@pytest.mark.auth +class TestRequireAllStorePermissions: + """Test require_all_store_permissions factory.""" + + def test_allows_user_with_all_permissions(self, db, auth_manager, test_store_user): + """User with all required permissions passes.""" + mock_store = MagicMock() + mock_store.id = 1 + mock_store.store_code = "TEST" + + checker = require_all_store_permissions("products.view", "products.edit") + request = _make_request("/store/TEST/products/bulk") + user_ctx = UserContext.from_user(test_store_user) + user_ctx.token_store_id = 1 + + with ( + patch("app.api.deps.store_service.get_store_by_id", return_value=mock_store), + patch.object(User, "has_store_permission", return_value=True), + ): + result = checker(request, db, user_ctx) + assert result.id == test_store_user.id + + def test_rejects_user_missing_one_permission(self, db, auth_manager, test_store_user): + """User missing any required permission is rejected.""" + mock_store = MagicMock() + mock_store.id = 1 + mock_store.store_code = "TEST" + + checker = require_all_store_permissions("products.view", "products.delete") + request = _make_request("/store/TEST/products/bulk") + user_ctx = UserContext.from_user(test_store_user) + user_ctx.token_store_id = 1 + + # First permission passes, second fails + with ( + patch("app.api.deps.store_service.get_store_by_id", return_value=mock_store), + patch.object(User, "has_store_permission", side_effect=[True, False]), + ): + with pytest.raises(InsufficientStorePermissionsException): + checker(request, db, user_ctx) + + +# ============================================================================ +# Phase 7 remaining: get_user_store +# ============================================================================ + + +@pytest.mark.unit +@pytest.mark.auth +class TestGetUserStore: + """Test get_user_store — verify store ownership/membership.""" + + def test_returns_store_for_owner(self, db, auth_manager): + """Store owner gets access to their store.""" + from app.modules.tenancy.models import Merchant, Store + + user, merchant = _create_merchant_owner(db, auth_manager) + uid = uuid.uuid4().hex[:8].upper() + store = Store( + merchant_id=merchant.id, + store_code=f"GUS_{uid}", + subdomain=f"gus{uid.lower()}", + name=f"GetUserStore_{uid}", + is_active=True, + is_verified=True, + ) + db.add(store) + db.commit() + db.refresh(store) + + user_ctx = UserContext.from_user(user) + result = get_user_store(store.store_code, user_ctx, db) + assert result.id == store.id + + def test_raises_for_nonexistent_store(self, db, auth_manager, test_store_user): + """Non-existent store raises StoreNotFoundException.""" + user_ctx = UserContext.from_user(test_store_user) + + with pytest.raises(StoreNotFoundException): + get_user_store("NONEXISTENT_STORE_XYZ", user_ctx, db) + + def test_raises_for_unauthorized_user(self, db, auth_manager): + """User without access to store is rejected.""" + from app.modules.tenancy.models import Merchant, Store + + # Create store owned by a different user + owner_user, merchant = _create_merchant_owner(db, auth_manager) + uid = uuid.uuid4().hex[:8].upper() + store = Store( + merchant_id=merchant.id, + store_code=f"GUS2_{uid}", + subdomain=f"gus2{uid.lower()}", + name=f"OtherStore_{uid}", + is_active=True, + is_verified=True, + ) + db.add(store) + db.commit() + db.refresh(store) + + # Different user tries to access + uid2 = uuid.uuid4().hex[:8] + other_user = User( + email=f"other_{uid2}@example.com", + username=f"other_{uid2}", + hashed_password=auth_manager.hash_password("testpass123"), + role="store", + is_active=True, + is_email_verified=True, + ) + db.add(other_user) + db.commit() + db.refresh(other_user) + + other_ctx = UserContext.from_user(other_user) + with pytest.raises(UnauthorizedStoreAccessException): + get_user_store(store.store_code, other_ctx, db)