fix(tenancy): fix team CRUD bugs + add member integration tests

Store team page:
- Fix undefined user_id (API returns `id`, JS used `user_id`)
- Fix wrong URL path in updateMember (remove redundant storeCode)
- Fix update_member_role route passing wrong kwarg (new_role_id → new_role_name)
- Add update_member() service method for role_id + is_active updates
- Add :selected binding for role pre-selection in edit modal

Merchant team page:
- Add missing db.commit() on invite, update, and remove endpoints
- Fix forward-reference string type annotation on MerchantTeamInvite
- Add :selected binding for role pre-selection in edit modal

Shared fixes:
- Replace removed subscription_service.check_team_limit with usage_service
- Replace removed subscription_service.get_current_tier in email service
- Fix email config bool settings crashing on .lower() (value_type=boolean)

Tests: 15 new integration tests for store team member API endpoints.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-28 21:06:21 +01:00
parent 0455e63a2e
commit 332960de30
10 changed files with 541 additions and 22 deletions

View File

@@ -404,10 +404,10 @@ def get_platform_email_config(db: Session) -> dict:
config["smtp_password"] = db_smtp_password if db_smtp_password else settings.smtp_password
db_smtp_use_tls = get_db_setting("smtp_use_tls")
config["smtp_use_tls"] = db_smtp_use_tls.lower() in ("true", "1", "yes") if db_smtp_use_tls else settings.smtp_use_tls
config["smtp_use_tls"] = bool(db_smtp_use_tls) if db_smtp_use_tls is not None else settings.smtp_use_tls
db_smtp_use_ssl = get_db_setting("smtp_use_ssl")
config["smtp_use_ssl"] = db_smtp_use_ssl.lower() in ("true", "1", "yes") if db_smtp_use_ssl else settings.smtp_use_ssl
config["smtp_use_ssl"] = bool(db_smtp_use_ssl) if db_smtp_use_ssl is not None else settings.smtp_use_ssl
# SendGrid
db_sendgrid_key = get_db_setting("sendgrid_api_key")
@@ -432,10 +432,10 @@ def get_platform_email_config(db: Session) -> dict:
# Behavior
db_enabled = get_db_setting("email_enabled")
config["enabled"] = db_enabled.lower() in ("true", "1", "yes") if db_enabled else settings.email_enabled
config["enabled"] = bool(db_enabled) if db_enabled is not None else settings.email_enabled
db_debug = get_db_setting("email_debug")
config["debug"] = db_debug.lower() in ("true", "1", "yes") if db_debug else settings.email_debug
config["debug"] = bool(db_debug) if db_debug is not None else settings.email_debug
return config
@@ -1038,8 +1038,8 @@ class EmailService:
subscription_service,
)
tier = subscription_service.get_current_tier(self.db, store_id)
self._store_tier_cache[store_id] = tier.value if tier else None
sub = subscription_service.get_subscription_for_store(self.db, store_id)
self._store_tier_cache[store_id] = sub.tier.code if sub and sub.tier else None
return self._store_tier_cache[store_id]
def _should_add_powered_by_footer(self, store_id: int | None) -> bool:

View File

@@ -25,6 +25,7 @@ from app.modules.tenancy.schemas import (
MerchantStoreUpdate,
)
from app.modules.tenancy.schemas.auth import UserContext
from app.modules.tenancy.schemas.team import MerchantTeamInvite
from app.modules.tenancy.services.merchant_service import merchant_service
from app.modules.tenancy.services.merchant_store_service import merchant_store_service
@@ -192,7 +193,7 @@ async def merchant_team_store_roles(
@_account_router.post("/team/invite")
async def merchant_team_invite(
data: "MerchantTeamInvite",
data: MerchantTeamInvite,
current_user: UserContext = Depends(get_current_merchant_api),
merchant=Depends(get_merchant_for_current_user),
db: Session = Depends(get_db),
@@ -233,6 +234,7 @@ async def merchant_team_invite(
error=str(e),
))
db.commit()
success_count = sum(1 for r in results if r.success)
if success_count == len(results):
message = f"Invitation sent to {data.email} for {success_count} store(s)"
@@ -268,6 +270,7 @@ async def merchant_team_update_role(
new_role_name=role_name,
actor_user_id=current_user.id,
)
db.commit()
return {"message": "Role updated successfully"}
@@ -289,6 +292,7 @@ async def merchant_team_remove_member(
user_id=user_id,
actor_user_id=current_user.id,
)
db.commit()
@_account_router.get("/profile", response_model=MerchantPortalProfileResponse)

View File

@@ -277,12 +277,13 @@ def update_team_member(
"""
store = request.state.store
store_team_service.update_member_role(
store_team_service.update_member(
db=db,
store=store,
user_id=user_id,
new_role_id=update_data.role_id,
role_id=update_data.role_id,
is_active=update_data.is_active,
actor_user_id=current_user.id,
)
db.commit()

View File

@@ -94,6 +94,7 @@ async def store_login_page(
"request": request,
"store_code": store_code,
"platform_code": platform_code,
"frontend_type": "store",
**get_jinja2_globals(language),
},
)

View File

@@ -76,10 +76,22 @@ class StoreTeamService:
Dict with invitation details
"""
try:
# Check team size limit from subscription
from app.modules.billing.services import subscription_service
# Check team size limit from subscription (skip if no subscription)
try:
from app.modules.billing.services.usage_service import usage_service
subscription_service.check_team_limit(db, store.id)
limit_check = usage_service.check_limit(db, store.id, "team_members")
if limit_check.limit is not None and not limit_check.can_proceed:
raise TierLimitExceededException(
message=limit_check.message or "Team member limit reached",
limit_type="team_members",
current=limit_check.current,
limit=limit_check.limit,
)
except TierLimitExceededException:
raise
except Exception as e: # noqa: EXC003
logger.warning(f"Could not check team limit (proceeding): {e}")
# Check if user already exists
user = db.query(User).filter(User.email == email).first()
@@ -331,8 +343,9 @@ class StoreTeamService:
if store_user.is_owner:
raise CannotRemoveOwnerException(user_id, store.id)
# Soft delete - just deactivate
store_user.is_active = False
from app.core.soft_delete import soft_delete
soft_delete(db, store_user, deleted_by_id=actor_user_id)
logger.info(f"Removed user {user_id} from store {store.store_code}")
@@ -438,6 +451,60 @@ class StoreTeamService:
logger.error(f"Error updating member role: {str(e)}")
raise
def update_member(
self,
db: Session,
store: Store,
user_id: int,
role_id: int | None = None,
is_active: bool | None = None,
actor_user_id: int | None = None,
) -> StoreUser:
"""
Update a team member's role (by ID) and/or active status.
Args:
db: Database session
store: Store
user_id: User ID
role_id: New role ID (must belong to this store)
is_active: New active status
actor_user_id: Actor performing the update
Returns:
Updated StoreUser
"""
store_user = (
db.query(StoreUser)
.filter(StoreUser.store_id == store.id, StoreUser.user_id == user_id)
.first()
)
if not store_user:
raise UserNotFoundException(str(user_id))
if role_id is not None:
role = (
db.query(Role)
.filter(Role.id == role_id, Role.store_id == store.id)
.first()
)
if not role:
raise InvalidRoleException(f"Role {role_id} not found in store {store.id}")
self.update_member_role(
db=db,
store=store,
user_id=user_id,
new_role_name=role.name,
actor_user_id=actor_user_id,
)
if is_active is not None:
store_user.is_active = is_active
db.flush()
db.refresh(store_user)
return store_user
def get_team_members(
self,
db: Session,

View File

@@ -194,7 +194,7 @@ function storeTeam() {
this.saving = true;
try {
await apiClient.put(
`/store/${this.storeCode}/team/members/${this.selectedMember.user_id}`,
`/store/team/members/${this.selectedMember.id}`,
this.editForm
);
@@ -228,7 +228,7 @@ function storeTeam() {
this.saving = true;
try {
await apiClient.delete(`/store/team/members/${this.selectedMember.user_id}`);
await apiClient.delete(`/store/team/members/${this.selectedMember.id}`);
Utils.showToast(I18n.t('tenancy.messages.team_member_removed'), 'success');
storeTeamLog.info('Removed team member:', this.selectedMember.user_id);

View File

@@ -299,7 +299,7 @@
<select x-model="store.role_name"
class="px-2 py-1 text-sm text-gray-700 dark:text-gray-300 bg-white dark:bg-gray-700 border border-gray-300 dark:border-gray-600 rounded-md focus:border-purple-400 focus:outline-none">
<template x-for="role in roleOptions" :key="role.value">
<option :value="role.value" x-text="role.label"></option>
<option :value="role.value" :selected="role.value === store.role_name" x-text="role.label"></option>
</template>
</select>
<button @click="updateMemberRole(store.store_id, selectedMember.user_id, store.role_name)"

View File

@@ -77,7 +77,7 @@
</tr>
</thead>
<tbody class="bg-white divide-y dark:divide-gray-700 dark:bg-gray-800">
<template x-for="member in members" :key="member.user_id">
<template x-for="member in members" :key="member.id">
<tr class="text-gray-700 dark:text-gray-400">
<!-- Member Info -->
<td class="px-4 py-3">
@@ -240,7 +240,7 @@
class="w-full px-4 py-2 text-sm text-gray-700 bg-gray-50 border border-gray-200 rounded-lg dark:text-gray-300 dark:bg-gray-700 dark:border-gray-600 focus:border-purple-400 focus:outline-none focus:ring-1 focus:ring-purple-400"
>
<template x-for="role in roles" :key="role.id">
<option :value="role.id" x-text="role.name"></option>
<option :value="role.id" :selected="role.id === editForm.role_id" x-text="role.name"></option>
</template>
</select>
</div>

View File

@@ -0,0 +1,434 @@
# app/modules/tenancy/tests/integration/test_store_team_members_api.py
"""
Integration tests for store team member CRUD API endpoints.
Tests the member management endpoints at:
/api/v1/store/team/members
/api/v1/store/team/invite
Authentication: Overrides get_current_store_from_cookie_or_header to return
a UserContext with the correct token_store_id. The test user is the merchant
owner, so all permission checks pass (owner bypass).
"""
import uuid
import pytest
from app.api.deps import get_current_store_from_cookie_or_header
from app.modules.tenancy.models import Merchant, Role, Store, StoreUser, User
from app.modules.tenancy.schemas.auth import UserContext
from main import app
# ============================================================================
# Fixtures
# ============================================================================
BASE = "/api/v1/store/team"
@pytest.fixture
def member_owner(db):
"""Create a store owner user for member tests."""
from middleware.auth import AuthManager
auth = AuthManager()
uid = uuid.uuid4().hex[:8]
user = User(
email=f"memberowner_{uid}@test.com",
username=f"memberowner_{uid}",
hashed_password=auth.hash_password("memberpass123"),
role="merchant_owner",
is_active=True,
first_name="Owner",
last_name="User",
)
db.add(user)
db.commit()
db.refresh(user)
return user
@pytest.fixture
def member_merchant(db, member_owner):
"""Create a merchant owned by member_owner."""
merchant = Merchant(
name="Member Test Merchant",
owner_user_id=member_owner.id,
contact_email=member_owner.email,
is_active=True,
is_verified=True,
)
db.add(merchant)
db.commit()
db.refresh(merchant)
return merchant
@pytest.fixture
def member_store(db, member_merchant):
"""Create a store for member tests."""
uid = uuid.uuid4().hex[:8]
store = Store(
merchant_id=member_merchant.id,
store_code=f"MEMTEST_{uid.upper()}",
subdomain=f"memtest{uid}",
name=f"Member Test Store {uid}",
is_active=True,
is_verified=True,
)
db.add(store)
db.commit()
db.refresh(store)
return store
@pytest.fixture
def member_store_user(db, member_store, member_owner):
"""Create a StoreUser association for the owner."""
store_user = StoreUser(
store_id=member_store.id,
user_id=member_owner.id,
is_active=True,
)
db.add(store_user)
db.commit()
db.refresh(store_user)
return store_user
@pytest.fixture
def member_auth(member_owner, member_store, member_store_user):
"""Override auth dependency to simulate authenticated store owner.
Overrides get_current_store_from_cookie_or_header so that both
require_store_owner and require_store_permission(...) inner functions
receive the correct UserContext. The owner bypass ensures all
permission checks pass.
"""
user_context = UserContext(
id=member_owner.id,
email=member_owner.email,
username=member_owner.username,
role="merchant_owner",
is_active=True,
token_store_id=member_store.id,
)
def _override():
return user_context
app.dependency_overrides[get_current_store_from_cookie_or_header] = _override
yield {"Authorization": "Bearer fake-token"}
app.dependency_overrides.pop(get_current_store_from_cookie_or_header, None)
@pytest.fixture
def staff_role(db, member_store):
"""Create a 'staff' role for the store."""
role = Role(
store_id=member_store.id,
name="staff",
permissions=["orders.view", "products.view"],
)
db.add(role)
db.commit()
db.refresh(role)
return role
@pytest.fixture
def manager_role(db, member_store):
"""Create a 'manager' role for the store."""
role = Role(
store_id=member_store.id,
name="manager",
permissions=["orders.view", "orders.edit", "products.view", "products.edit", "team.view"],
)
db.add(role)
db.commit()
db.refresh(role)
return role
@pytest.fixture
def team_member_user(db):
"""Create another user to serve as a team member."""
from middleware.auth import AuthManager
auth = AuthManager()
uid = uuid.uuid4().hex[:8]
user = User(
email=f"teammember_{uid}@test.com",
username=f"teammember_{uid}",
hashed_password=auth.hash_password("memberpass123"),
role="store_member",
is_active=True,
first_name="Team",
last_name="Member",
)
db.add(user)
db.commit()
db.refresh(user)
return user
@pytest.fixture
def team_member(db, member_store, team_member_user, staff_role):
"""Create a StoreUser for team_member_user with staff role."""
store_user = StoreUser(
store_id=member_store.id,
user_id=team_member_user.id,
role_id=staff_role.id,
is_active=True,
)
db.add(store_user)
db.commit()
db.refresh(store_user)
return store_user
# ============================================================================
# GET /team/members
# ============================================================================
@pytest.mark.integration
@pytest.mark.tenancy
class TestListMembers:
"""Tests for GET /api/v1/store/team/members."""
def test_list_members_returns_owner_and_member(
self, client, member_auth, team_member, member_owner, team_member_user
):
"""GET /members returns both owner and team member."""
response = client.get(f"{BASE}/members", headers=member_auth)
assert response.status_code == 200
data = response.json()
member_ids = {m["id"] for m in data["members"]}
assert member_owner.id in member_ids
assert team_member_user.id in member_ids
def test_list_members_response_shape(self, client, member_auth, team_member):
"""Each member in the response has expected fields."""
response = client.get(f"{BASE}/members", headers=member_auth)
assert response.status_code == 200
data = response.json()
assert "members" in data
assert "total" in data
assert "active_count" in data
assert "pending_invitations" in data
member = data["members"][0]
assert "id" in member
assert "email" in member
assert "username" in member
assert "first_name" in member
assert "last_name" in member
assert "full_name" in member
assert "role_name" in member
assert "role_id" in member
assert "permissions" in member
assert "is_active" in member
assert "is_owner" in member
assert "invitation_pending" in member
def test_list_members_stats(
self, client, member_auth, team_member, member_owner
):
"""GET /members returns correct statistics."""
response = client.get(f"{BASE}/members", headers=member_auth)
assert response.status_code == 200
data = response.json()
assert data["total"] >= 2
assert data["active_count"] >= 2
assert data["pending_invitations"] >= 0
# ============================================================================
# GET /team/members/{user_id}
# ============================================================================
@pytest.mark.integration
@pytest.mark.tenancy
class TestGetMember:
"""Tests for GET /api/v1/store/team/members/{user_id}."""
def test_get_member_success(
self, client, member_auth, team_member, team_member_user
):
"""GET /members/{user_id} returns the specific member."""
response = client.get(
f"{BASE}/members/{team_member_user.id}", headers=member_auth
)
assert response.status_code == 200
data = response.json()
assert data["id"] == team_member_user.id
assert data["email"] == team_member_user.email
assert data["role_name"] == "staff"
def test_get_nonexistent_member(self, client, member_auth):
"""GET /members/{user_id} returns 404 for non-existent user."""
response = client.get(f"{BASE}/members/99999", headers=member_auth)
assert response.status_code == 404
# ============================================================================
# PUT /team/members/{user_id}
# ============================================================================
@pytest.mark.integration
@pytest.mark.tenancy
class TestUpdateMember:
"""Tests for PUT /api/v1/store/team/members/{user_id}."""
def test_update_member_role_success(
self, client, member_auth, team_member, team_member_user, manager_role
):
"""PUT /members/{user_id} updates the member's role."""
response = client.put(
f"{BASE}/members/{team_member_user.id}",
headers=member_auth,
json={"role_id": manager_role.id},
)
assert response.status_code == 200
data = response.json()
assert data["role_name"] == "manager"
assert data["role_id"] == manager_role.id
def test_update_member_active_status(
self, client, member_auth, team_member, team_member_user
):
"""PUT /members/{user_id} can deactivate a member."""
response = client.put(
f"{BASE}/members/{team_member_user.id}",
headers=member_auth,
json={"is_active": False},
)
assert response.status_code == 200
data = response.json()
assert data["is_active"] is False
def test_update_owner_rejected(
self, client, member_auth, member_owner, staff_role
):
"""PUT /members/{user_id} rejects changing owner's role."""
response = client.put(
f"{BASE}/members/{member_owner.id}",
headers=member_auth,
json={"role_id": staff_role.id},
)
assert response.status_code in (400, 422)
def test_update_nonexistent_member(self, client, member_auth, staff_role):
"""PUT /members/{user_id} returns 404 for non-existent user."""
response = client.put(
f"{BASE}/members/99999",
headers=member_auth,
json={"role_id": staff_role.id},
)
assert response.status_code == 404
def test_update_with_invalid_role_id(
self, client, member_auth, team_member, team_member_user
):
"""PUT /members/{user_id} returns 422 for non-existent role."""
response = client.put(
f"{BASE}/members/{team_member_user.id}",
headers=member_auth,
json={"role_id": 99999},
)
assert response.status_code == 422
# ============================================================================
# DELETE /team/members/{user_id}
# ============================================================================
@pytest.mark.integration
@pytest.mark.tenancy
class TestRemoveMember:
"""Tests for DELETE /api/v1/store/team/members/{user_id}."""
def test_remove_member_success(
self, client, member_auth, team_member, team_member_user, db
):
"""DELETE /members/{user_id} removes a team member."""
response = client.delete(
f"{BASE}/members/{team_member_user.id}", headers=member_auth
)
assert response.status_code == 200
data = response.json()
assert data["user_id"] == team_member_user.id
# Verify member is soft-deleted (deleted_at set, record hidden from normal queries)
db.expire_all()
store_user = (
db.query(StoreUser)
.execution_options(include_deleted=True)
.filter(StoreUser.user_id == team_member_user.id)
.first()
)
assert store_user is not None
assert store_user.deleted_at is not None
def test_remove_owner_rejected(self, client, member_auth, member_owner):
"""DELETE /members/{user_id} rejects removing the owner."""
response = client.delete(
f"{BASE}/members/{member_owner.id}", headers=member_auth
)
assert response.status_code == 400
def test_remove_nonexistent_member(self, client, member_auth):
"""DELETE /members/{user_id} returns 404 for non-existent user."""
response = client.delete(
f"{BASE}/members/99999", headers=member_auth
)
assert response.status_code == 404
# ============================================================================
# POST /team/invite
# ============================================================================
@pytest.mark.integration
@pytest.mark.tenancy
class TestInviteMember:
"""Tests for POST /api/v1/store/team/invite."""
def test_invite_member_success(self, client, member_auth, staff_role):
"""POST /invite creates an invitation for a new email."""
uid = uuid.uuid4().hex[:8]
response = client.post(
f"{BASE}/invite",
headers=member_auth,
json={
"email": f"newinvite_{uid}@test.com",
"role_name": "staff",
"first_name": "New",
"last_name": "Invitee",
},
)
assert response.status_code == 200
data = response.json()
assert data["email"] == f"newinvite_{uid}@test.com"
assert data["invitation_sent"] is True
assert data["role"] == "staff"
def test_invite_duplicate_email(
self, client, member_auth, team_member, team_member_user, staff_role
):
"""POST /invite with existing member email returns error or reactivation."""
response = client.post(
f"{BASE}/invite",
headers=member_auth,
json={
"email": team_member_user.email,
"role_name": "staff",
},
)
# May succeed as reactivation or fail as duplicate
assert response.status_code in (200, 400, 409, 422)

View File

@@ -243,17 +243,29 @@ class TestStoreTeamServiceRemove:
"""Test suite for removing team members."""
def test_remove_team_member_success(self, db, team_store, team_member):
"""Test removing a team member."""
"""Test removing a team member (soft delete)."""
result = store_team_service.remove_team_member(
db=db,
store=team_store,
user_id=team_member.user_id,
)
db.commit()
db.refresh(team_member)
# Verify soft-deleted (hidden from normal queries, visible with include_deleted)
from app.modules.tenancy.models import StoreUser
hidden = db.query(StoreUser).filter(StoreUser.id == team_member.id).first()
assert hidden is None # Filtered out by soft-delete
visible = (
db.query(StoreUser)
.execution_options(include_deleted=True)
.filter(StoreUser.id == team_member.id)
.first()
)
assert visible is not None
assert visible.deleted_at is not None
assert result is True
assert team_member.is_active is False
def test_remove_owner_raises_error(self, db, team_store, store_owner):
"""Test removing owner raises exception."""