feat: implement complete RBAC access control with tests
Some checks failed
CI / pytest (push) Failing after 45m29s
CI / validate (push) Successful in 24s
CI / dependency-scanning (push) Successful in 28s
CI / docs (push) Has been skipped
CI / deploy (push) Has been skipped
CI / ruff (push) Successful in 9s

Add 4-layer access control stack (subscription → module → menu → permissions):
- P1: Wire requires_permission into menu sidebar filtering
- P2: Expose window.USER_PERMISSIONS for Alpine.js client-side gating
- P3: Add page-level permission guards on store routes
- P4: Role CRUD API endpoints and role editor UI
- P5: Audit trail for all role/permission changes

Includes unit tests (menu permission filtering, role CRUD service) and
integration tests (role API endpoints). All 404 core+tenancy tests pass.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-26 18:26:59 +01:00
parent 962862ccc1
commit cb3bc3c118
29 changed files with 1850 additions and 17 deletions

View File

@@ -0,0 +1,355 @@
# app/modules/tenancy/tests/integration/test_store_team_roles_api.py
"""
Integration tests for store team role CRUD API endpoints.
Tests the role management endpoints at:
/api/v1/store/team/roles
Authentication: Overrides get_current_store_from_cookie_or_header to return
a UserContext with the correct token_store_id. The test user is the merchant
owner, so all permission checks pass (owner bypass).
"""
import uuid
import pytest
from app.api.deps import get_current_store_from_cookie_or_header
from app.modules.tenancy.models import Merchant, Role, Store, StoreUser, User
from main import app
from models.schema.auth import UserContext
# ============================================================================
# Fixtures
# ============================================================================
BASE = "/api/v1/store/team"
@pytest.fixture
def role_owner(db):
"""Create a store owner user for role tests."""
from middleware.auth import AuthManager
auth = AuthManager()
uid = uuid.uuid4().hex[:8]
user = User(
email=f"roleowner_{uid}@test.com",
username=f"roleowner_{uid}",
hashed_password=auth.hash_password("rolepass123"),
role="merchant_owner",
is_active=True,
)
db.add(user)
db.commit()
db.refresh(user)
return user
@pytest.fixture
def role_merchant(db, role_owner):
"""Create a merchant owned by role_owner."""
merchant = Merchant(
name="Role Test Merchant",
owner_user_id=role_owner.id,
contact_email=role_owner.email,
is_active=True,
is_verified=True,
)
db.add(merchant)
db.commit()
db.refresh(merchant)
return merchant
@pytest.fixture
def role_store(db, role_merchant):
"""Create a store for role tests."""
uid = uuid.uuid4().hex[:8]
store = Store(
merchant_id=role_merchant.id,
store_code=f"ROLETEST_{uid.upper()}",
subdomain=f"roletest{uid}",
name=f"Role Test Store {uid}",
is_active=True,
is_verified=True,
)
db.add(store)
db.commit()
db.refresh(store)
return store
@pytest.fixture
def role_store_user(db, role_store, role_owner):
"""Create a StoreUser association for the owner."""
store_user = StoreUser(
store_id=role_store.id,
user_id=role_owner.id,
is_active=True,
)
db.add(store_user)
db.commit()
db.refresh(store_user)
return store_user
@pytest.fixture
def role_auth(role_owner, role_store, role_store_user):
"""Override auth dependency to simulate authenticated store owner.
Overrides get_current_store_from_cookie_or_header so that both
require_store_owner and require_store_permission(...) inner functions
receive the correct UserContext. The owner bypass ensures all
permission checks pass.
"""
user_context = UserContext(
id=role_owner.id,
email=role_owner.email,
username=role_owner.username,
role="merchant_owner",
is_active=True,
token_store_id=role_store.id,
)
def _override():
return user_context
app.dependency_overrides[get_current_store_from_cookie_or_header] = _override
yield {"Authorization": "Bearer fake-token"}
app.dependency_overrides.pop(get_current_store_from_cookie_or_header, None)
@pytest.fixture
def existing_custom_role(db, role_store):
"""Create an existing custom role for update/delete tests."""
role = Role(
store_id=role_store.id,
name="test_custom_role",
permissions=["products.view", "orders.view"],
)
db.add(role)
db.commit()
db.refresh(role)
return role
# ============================================================================
# GET /team/roles
# ============================================================================
@pytest.mark.integration
@pytest.mark.tenancy
class TestListRoles:
"""Tests for GET /api/v1/store/team/roles."""
def test_list_roles_creates_defaults(self, client, role_auth, role_store):
"""GET /roles returns default preset roles when none exist."""
response = client.get(f"{BASE}/roles", headers=role_auth)
assert response.status_code == 200
data = response.json()
assert "roles" in data
assert data["total"] >= 5
role_names = {r["name"] for r in data["roles"]}
assert "manager" in role_names
assert "staff" in role_names
assert "support" in role_names
assert "viewer" in role_names
assert "marketing" in role_names
def test_list_roles_includes_custom(self, client, role_auth, existing_custom_role):
"""GET /roles includes custom roles alongside presets."""
response = client.get(f"{BASE}/roles", headers=role_auth)
assert response.status_code == 200
data = response.json()
role_names = {r["name"] for r in data["roles"]}
assert "test_custom_role" in role_names
def test_list_roles_response_shape(self, client, role_auth, existing_custom_role):
"""Each role in the response has expected fields."""
response = client.get(f"{BASE}/roles", headers=role_auth)
assert response.status_code == 200
role = response.json()["roles"][0]
assert "id" in role
assert "name" in role
assert "permissions" in role
assert isinstance(role["permissions"], list)
# ============================================================================
# POST /team/roles
# ============================================================================
@pytest.mark.integration
@pytest.mark.tenancy
class TestCreateRole:
"""Tests for POST /api/v1/store/team/roles."""
def test_create_custom_role_success(self, client, role_auth, role_store):
"""POST /roles creates a new custom role."""
response = client.post(
f"{BASE}/roles",
headers=role_auth,
json={
"name": "api_test_role",
"permissions": ["products.view", "orders.view"],
},
)
assert response.status_code == 201
data = response.json()
assert data["name"] == "api_test_role"
assert "products.view" in data["permissions"]
assert "orders.view" in data["permissions"]
def test_create_role_preset_name_rejected(self, client, role_auth):
"""POST /roles rejects preset role names."""
response = client.post(
f"{BASE}/roles",
headers=role_auth,
json={
"name": "manager",
"permissions": ["products.view"],
},
)
assert response.status_code == 422
assert "preset" in response.json()["message"].lower()
def test_create_role_duplicate_name_rejected(
self, client, role_auth, existing_custom_role
):
"""POST /roles rejects duplicate role names."""
response = client.post(
f"{BASE}/roles",
headers=role_auth,
json={
"name": "test_custom_role",
"permissions": ["products.view"],
},
)
assert response.status_code == 422
assert "already exists" in response.json()["message"].lower()
def test_create_role_invalid_permissions(self, client, role_auth):
"""POST /roles rejects invalid permission IDs."""
response = client.post(
f"{BASE}/roles",
headers=role_auth,
json={
"name": "bad_perms_role",
"permissions": ["totally.fake.permission"],
},
)
assert response.status_code == 422
assert "invalid" in response.json()["message"].lower()
# ============================================================================
# PUT /team/roles/{role_id}
# ============================================================================
@pytest.mark.integration
@pytest.mark.tenancy
class TestUpdateRole:
"""Tests for PUT /api/v1/store/team/roles/{role_id}."""
def test_update_role_name(self, client, role_auth, existing_custom_role):
"""PUT /roles/{id} updates the role name."""
response = client.put(
f"{BASE}/roles/{existing_custom_role.id}",
headers=role_auth,
json={
"name": "renamed_role",
"permissions": existing_custom_role.permissions,
},
)
assert response.status_code == 200
assert response.json()["name"] == "renamed_role"
def test_update_role_permissions(self, client, role_auth, existing_custom_role):
"""PUT /roles/{id} updates permissions."""
response = client.put(
f"{BASE}/roles/{existing_custom_role.id}",
headers=role_auth,
json={
"name": existing_custom_role.name,
"permissions": ["products.view", "products.edit", "orders.view", "orders.edit"],
},
)
assert response.status_code == 200
data = response.json()
assert "products.edit" in data["permissions"]
assert "orders.edit" in data["permissions"]
def test_update_nonexistent_role(self, client, role_auth):
"""PUT /roles/{id} returns 400 for non-existent role."""
response = client.put(
f"{BASE}/roles/99999",
headers=role_auth,
json={"name": "whatever", "permissions": []},
)
assert response.status_code == 422
def test_update_role_rename_to_preset_rejected(
self, client, role_auth, existing_custom_role
):
"""PUT /roles/{id} rejects renaming to a preset name."""
response = client.put(
f"{BASE}/roles/{existing_custom_role.id}",
headers=role_auth,
json={"name": "staff", "permissions": existing_custom_role.permissions},
)
assert response.status_code == 422
assert "preset" in response.json()["message"].lower()
# ============================================================================
# DELETE /team/roles/{role_id}
# ============================================================================
@pytest.mark.integration
@pytest.mark.tenancy
class TestDeleteRole:
"""Tests for DELETE /api/v1/store/team/roles/{role_id}."""
def test_delete_custom_role_success(self, client, role_auth, existing_custom_role, db):
"""DELETE /roles/{id} removes a custom role."""
response = client.delete(
f"{BASE}/roles/{existing_custom_role.id}",
headers=role_auth,
)
assert response.status_code == 204
# Verify role is deleted
db.expire_all()
deleted = db.query(Role).filter(Role.id == existing_custom_role.id).first()
assert deleted is None
def test_delete_nonexistent_role(self, client, role_auth):
"""DELETE /roles/{id} returns 400 for non-existent role."""
response = client.delete(
f"{BASE}/roles/99999",
headers=role_auth,
)
assert response.status_code == 422
def test_delete_preset_role_rejected(self, client, role_auth, role_store, db):
"""DELETE /roles/{id} rejects deleting preset roles."""
preset_role = Role(
store_id=role_store.id,
name="staff",
permissions=["orders.view"],
)
db.add(preset_role)
db.commit()
db.refresh(preset_role)
response = client.delete(
f"{BASE}/roles/{preset_role.id}",
headers=role_auth,
)
assert response.status_code == 422
assert "preset" in response.json()["message"].lower()

View File

@@ -9,6 +9,7 @@ import pytest
from app.modules.tenancy.exceptions import (
CannotRemoveOwnerException,
InvalidInvitationTokenException,
InvalidRoleException,
UserNotFoundException,
)
from app.modules.tenancy.models import Role, Store, StoreUser, User
@@ -417,3 +418,313 @@ class TestStoreTeamServiceGetRoles:
for role in roles:
assert "permissions" in role
assert isinstance(role["permissions"], list)
# =============================================================================
# CUSTOM ROLE CRUD TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestStoreTeamServiceCreateCustomRole:
"""Test suite for creating custom roles."""
def test_create_custom_role_success(self, db, team_store, test_user):
"""Test creating a custom role with valid permissions."""
role = store_team_service.create_custom_role(
db=db,
store_id=team_store.id,
name="content_editor",
permissions=["products.view", "products.edit"],
actor_user_id=test_user.id,
)
db.commit()
assert role is not None
assert role.name == "content_editor"
assert role.store_id == team_store.id
assert "products.view" in role.permissions
assert "products.edit" in role.permissions
def test_create_role_with_preset_name_raises_error(self, db, team_store, test_user):
"""Test creating a role with a preset name raises ValueError."""
with pytest.raises(InvalidRoleException, match="preset name"):
store_team_service.create_custom_role(
db=db,
store_id=team_store.id,
name="manager",
permissions=["products.view"],
actor_user_id=test_user.id,
)
def test_create_role_with_preset_name_case_insensitive(self, db, team_store, test_user):
"""Test preset name check is case-insensitive."""
with pytest.raises(InvalidRoleException, match="preset name"):
store_team_service.create_custom_role(
db=db,
store_id=team_store.id,
name="Manager",
permissions=["products.view"],
actor_user_id=test_user.id,
)
def test_create_duplicate_role_name_raises_error(self, db, team_store, test_user):
"""Test creating a role with duplicate name raises ValueError."""
store_team_service.create_custom_role(
db=db,
store_id=team_store.id,
name="editor",
permissions=["products.view"],
actor_user_id=test_user.id,
)
db.flush()
with pytest.raises(InvalidRoleException, match="already exists"):
store_team_service.create_custom_role(
db=db,
store_id=team_store.id,
name="editor",
permissions=["orders.view"],
actor_user_id=test_user.id,
)
def test_create_role_with_invalid_permissions_raises_error(self, db, team_store, test_user):
"""Test creating a role with invalid permission IDs raises ValueError."""
with pytest.raises(InvalidRoleException, match="Invalid permission IDs"):
store_team_service.create_custom_role(
db=db,
store_id=team_store.id,
name="invalid_role",
permissions=["completely.fake.permission"],
actor_user_id=test_user.id,
)
def test_create_role_with_empty_permissions(self, db, team_store, test_user):
"""Test creating a role with empty permissions list."""
role = store_team_service.create_custom_role(
db=db,
store_id=team_store.id,
name="observer",
permissions=[],
actor_user_id=test_user.id,
)
db.commit()
assert role is not None
assert role.name == "observer"
assert role.permissions == []
@pytest.mark.unit
@pytest.mark.tenancy
class TestStoreTeamServiceUpdateCustomRole:
"""Test suite for updating roles."""
def test_update_role_name(self, db, team_store, test_user):
"""Test updating a role's name."""
role = store_team_service.create_custom_role(
db=db,
store_id=team_store.id,
name="old_name",
permissions=["products.view"],
actor_user_id=test_user.id,
)
db.flush()
updated = store_team_service.update_role(
db=db,
store_id=team_store.id,
role_id=role.id,
name="new_name",
actor_user_id=test_user.id,
)
db.commit()
assert updated.name == "new_name"
def test_update_role_permissions(self, db, team_store, test_user):
"""Test updating a role's permissions."""
role = store_team_service.create_custom_role(
db=db,
store_id=team_store.id,
name="perm_test",
permissions=["products.view"],
actor_user_id=test_user.id,
)
db.flush()
updated = store_team_service.update_role(
db=db,
store_id=team_store.id,
role_id=role.id,
permissions=["products.view", "products.edit", "orders.view"],
actor_user_id=test_user.id,
)
db.commit()
assert "products.edit" in updated.permissions
assert "orders.view" in updated.permissions
def test_update_role_not_found_raises_error(self, db, team_store):
"""Test updating a non-existent role raises ValueError."""
with pytest.raises(InvalidRoleException, match="not found"):
store_team_service.update_role(
db=db,
store_id=team_store.id,
role_id=99999,
name="new_name",
)
def test_update_role_rename_to_preset_raises_error(self, db, team_store, test_user):
"""Test renaming to a preset name raises ValueError."""
role = store_team_service.create_custom_role(
db=db,
store_id=team_store.id,
name="custom_role",
permissions=["products.view"],
actor_user_id=test_user.id,
)
db.flush()
with pytest.raises(InvalidRoleException, match="preset name"):
store_team_service.update_role(
db=db,
store_id=team_store.id,
role_id=role.id,
name="staff",
)
def test_update_role_with_invalid_permissions(self, db, team_store, test_user):
"""Test updating with invalid permissions raises ValueError."""
role = store_team_service.create_custom_role(
db=db,
store_id=team_store.id,
name="to_update",
permissions=["products.view"],
actor_user_id=test_user.id,
)
db.flush()
with pytest.raises(InvalidRoleException, match="Invalid permission IDs"):
store_team_service.update_role(
db=db,
store_id=team_store.id,
role_id=role.id,
permissions=["nonexistent.perm"],
)
def test_update_role_duplicate_name_raises_error(self, db, team_store, test_user):
"""Test renaming to an existing role name raises ValueError."""
store_team_service.create_custom_role(
db=db,
store_id=team_store.id,
name="role_a",
permissions=["products.view"],
actor_user_id=test_user.id,
)
role_b = store_team_service.create_custom_role(
db=db,
store_id=team_store.id,
name="role_b",
permissions=["orders.view"],
actor_user_id=test_user.id,
)
db.flush()
with pytest.raises(InvalidRoleException, match="already exists"):
store_team_service.update_role(
db=db,
store_id=team_store.id,
role_id=role_b.id,
name="role_a",
)
@pytest.mark.unit
@pytest.mark.tenancy
class TestStoreTeamServiceDeleteRole:
"""Test suite for deleting roles."""
def test_delete_custom_role_success(self, db, team_store, test_user):
"""Test deleting a custom role."""
role = store_team_service.create_custom_role(
db=db,
store_id=team_store.id,
name="to_delete",
permissions=["products.view"],
actor_user_id=test_user.id,
)
db.flush()
role_id = role.id
store_team_service.delete_role(
db=db,
store_id=team_store.id,
role_id=role_id,
actor_user_id=test_user.id,
)
db.commit()
# Verify role is deleted
deleted = db.query(Role).filter(Role.id == role_id).first()
assert deleted is None
def test_delete_preset_role_raises_error(self, db, team_store):
"""Test deleting a preset role raises ValueError."""
# Ensure default roles exist
store_team_service.get_store_roles(db, team_store.id)
db.flush()
# Find the "staff" preset role
staff_role = (
db.query(Role)
.filter(Role.store_id == team_store.id, Role.name == "staff")
.first()
)
assert staff_role is not None
with pytest.raises(InvalidRoleException, match="preset role"):
store_team_service.delete_role(
db=db,
store_id=team_store.id,
role_id=staff_role.id,
)
def test_delete_role_not_found_raises_error(self, db, team_store):
"""Test deleting non-existent role raises ValueError."""
with pytest.raises(InvalidRoleException, match="not found"):
store_team_service.delete_role(
db=db,
store_id=team_store.id,
role_id=99999,
)
def test_delete_role_with_members_raises_error(self, db, team_store, test_user, other_user, auth_manager):
"""Test deleting a role that has assigned members raises ValueError."""
role = store_team_service.create_custom_role(
db=db,
store_id=team_store.id,
name="in_use_role",
permissions=["products.view"],
actor_user_id=test_user.id,
)
db.flush()
# Assign the role to a store user
store_user = StoreUser(
store_id=team_store.id,
user_id=other_user.id,
role_id=role.id,
is_active=True,
invitation_accepted_at=datetime.utcnow(),
)
db.add(store_user)
db.flush()
with pytest.raises(InvalidRoleException, match="team member"):
store_team_service.delete_role(
db=db,
store_id=team_store.id,
role_id=role.id,
)