Files
orion/app/modules/tenancy/services/store_team_service.py
Samir Boulahtit 823935c016
Some checks failed
CI / validate (push) Has been cancelled
CI / dependency-scanning (push) Has been cancelled
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / pytest (push) Has been cancelled
CI / ruff (push) Successful in 14s
feat(tenancy): add resend invitation for pending team members
New resend_invitation() service method regenerates the token and
resends the invitation email for pending members.

Available on all frontends:
- Merchant: POST /merchants/account/team/stores/{sid}/members/{uid}/resend
- Store: POST /store/team/members/{uid}/resend

UI: paper-airplane icon appears on pending members in both merchant
and store team pages.

i18n: resend_invitation + invitation_resent keys in 4 locales.
Also translated previously untranslated invitation_sent_successfully
in fr/de/lb.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 16:48:12 +02:00

1006 lines
31 KiB
Python

# app/modules/tenancy/services/store_team_service.py
"""
Store team management service.
Handles:
- Team member invitations
- Invitation acceptance
- Role assignment
- Permission management
"""
import logging
import secrets
from datetime import datetime, timedelta
from typing import Any
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from app.modules.tenancy.services.permission_discovery_service import (
permission_discovery_service,
)
def get_preset_permissions(preset_name: str) -> set[str]:
"""Get permissions for a preset role."""
return permission_discovery_service.get_preset_permissions(preset_name)
from app.modules.billing.exceptions import TierLimitExceededException
from app.modules.core.services.audit_aggregator import audit_aggregator
from app.modules.tenancy.exceptions import (
CannotRemoveOwnerException,
InvalidInvitationTokenException,
InvalidRoleException,
TeamInvitationAlreadyAcceptedException,
TeamMemberAlreadyExistsException,
UserNotFoundException,
)
from app.modules.tenancy.models import Role, Store, StorePlatform, StoreUser, User
from middleware.auth import AuthManager
logger = logging.getLogger(__name__)
class StoreTeamService:
"""Service for managing store team members."""
def __init__(self):
self.auth_manager = AuthManager()
def invite_team_member(
self,
db: Session,
store: Store,
inviter: User,
email: str,
role_name: str,
first_name: str | None = None,
last_name: str | None = None,
custom_permissions: list[str] | None = None,
) -> dict[str, Any]:
"""
Invite a new team member to a store.
Creates:
1. User account (if doesn't exist)
2. Role (if custom permissions provided)
3. StoreUser relationship with invitation token
Args:
db: Database session
store: Store to invite to
inviter: User sending the invitation
email: Email of person to invite
role_name: Role name (manager, staff, support, etc.)
custom_permissions: Optional custom permissions (overrides preset)
Returns:
Dict with invitation details
"""
try:
# Check team size limit from subscription (skip if no subscription)
try:
from app.modules.billing.services.usage_service import usage_service
limit_check = usage_service.check_limit(db, store.id, "team_members")
if limit_check.limit is not None and not limit_check.can_proceed:
raise TierLimitExceededException(
message=limit_check.message or "Team member limit reached",
limit_type="team_members",
current=limit_check.current,
limit=limit_check.limit,
)
except TierLimitExceededException:
raise
except Exception as e: # noqa: EXC003
logger.warning(f"Could not check team limit (proceeding): {e}")
# Check if user already exists
user = db.query(User).filter(User.email == email).first()
if user:
# Check if already a member
existing_membership = (
db.query(StoreUser)
.filter(
StoreUser.store_id == store.id,
StoreUser.user_id == user.id,
)
.first()
)
if existing_membership:
if existing_membership.is_active:
raise TeamMemberAlreadyExistsException(
email, store.store_code
)
# Reactivate old membership
existing_membership.is_active = (
False # Will be activated on acceptance
)
existing_membership.invitation_token = (
self._generate_invitation_token()
)
existing_membership.invitation_sent_at = datetime.utcnow()
existing_membership.invitation_accepted_at = None
db.flush()
logger.info(
f"Re-invited user {email} to store {store.store_code}"
)
return {
"invitation_token": existing_membership.invitation_token,
"email": email,
"existing_user": True,
}
else:
# Create new user account (inactive until invitation accepted)
username = email.split("@")[0]
# Ensure unique username
base_username = username
counter = 1
while db.query(User).filter(User.username == username).first():
username = f"{base_username}{counter}"
counter += 1
# Generate temporary password (user will set real one on activation)
temp_password = secrets.token_urlsafe(16)
user = User(
email=email,
username=username,
hashed_password=self.auth_manager.hash_password(temp_password),
first_name=first_name,
last_name=last_name,
role="store_member",
is_active=False, # Will be activated when invitation accepted
is_email_verified=False,
)
db.add(user)
db.flush() # Get user.id
logger.info(f"Created new user account for invitation: {email}")
# Get or create role
role = self._get_or_create_role(
db=db,
store=store,
role_name=role_name,
custom_permissions=custom_permissions,
)
# Create store membership with invitation
invitation_token = self._generate_invitation_token()
store_user = StoreUser(
store_id=store.id,
user_id=user.id,
role_id=role.id,
invited_by=inviter.id,
invitation_token=invitation_token,
invitation_sent_at=datetime.utcnow(),
is_active=False, # Will be activated on acceptance
)
db.add(store_user)
db.flush()
logger.info(
f"Invited {email} to store {store.store_code} "
f"as {role_name} by {inviter.username}"
)
try:
self._send_invitation_email(
db=db,
email=email,
store=store,
token=invitation_token,
inviter=inviter,
role_name=role_name,
)
except Exception: # noqa: EXC003
logger.exception(f"Failed to send invitation email to {email}")
audit_aggregator.log(
db=db,
admin_user_id=inviter.id,
action="member.invite",
target_type="store_user",
target_id=str(store_user.id),
details={
"email": email,
"role": role_name,
"store_id": store.id,
"store_code": store.store_code,
},
)
return {
"invitation_token": invitation_token,
"email": email,
"role": role_name,
"existing_user": user.is_active,
}
except (TeamMemberAlreadyExistsException, TierLimitExceededException):
raise
except SQLAlchemyError as e:
logger.error(f"Error inviting team member: {str(e)}")
raise
def accept_invitation(
self,
db: Session,
invitation_token: str,
password: str,
first_name: str | None = None,
last_name: str | None = None,
) -> dict[str, Any]:
"""
Accept a team invitation and activate account.
Args:
db: Database session
invitation_token: Invitation token from email
password: New password to set
first_name: Optional first name
last_name: Optional last name
Returns:
Dict with user and store info
"""
try:
# Find invitation
store_user = (
db.query(StoreUser)
.filter(
StoreUser.invitation_token == invitation_token,
)
.first()
)
if not store_user:
raise InvalidInvitationTokenException()
# Check if already accepted
if store_user.invitation_accepted_at is not None:
raise TeamInvitationAlreadyAcceptedException()
# Check token expiration (7 days)
if store_user.invitation_sent_at:
expiry_date = store_user.invitation_sent_at + timedelta(days=7)
if datetime.utcnow() > expiry_date:
raise InvalidInvitationTokenException("Invitation has expired")
user = store_user.user
store = store_user.store
# Update user
user.hashed_password = self.auth_manager.hash_password(password)
user.is_active = True
user.is_email_verified = True
if first_name:
user.first_name = first_name
if last_name:
user.last_name = last_name
# Activate membership
store_user.is_active = True
store_user.invitation_accepted_at = datetime.utcnow()
store_user.invitation_token = None # Clear token
db.flush()
logger.info(
f"User {user.email} accepted invitation to store {store.store_code}"
)
return {
"user": user,
"store": store,
"role": store_user.role.name if store_user.role else "member",
}
except (
InvalidInvitationTokenException,
TeamInvitationAlreadyAcceptedException,
):
raise
except SQLAlchemyError as e:
logger.error(f"Error accepting invitation: {str(e)}")
raise
def remove_team_member(
self,
db: Session,
store: Store,
user_id: int,
actor_user_id: int | None = None,
) -> bool:
"""
Remove a team member from a store.
Cannot remove owner.
Args:
db: Database session
store: Store to remove from
user_id: User ID to remove
Returns:
True if removed
"""
try:
store_user = (
db.query(StoreUser)
.filter(
StoreUser.store_id == store.id,
StoreUser.user_id == user_id,
)
.first()
)
if not store_user:
raise UserNotFoundException(str(user_id))
# Cannot remove owner
if store_user.is_owner:
raise CannotRemoveOwnerException(user_id, store.id)
from app.core.soft_delete import soft_delete
soft_delete(db, store_user, deleted_by_id=actor_user_id)
logger.info(f"Removed user {user_id} from store {store.store_code}")
if actor_user_id is not None:
audit_aggregator.log(
db=db,
admin_user_id=actor_user_id,
action="member.remove",
target_type="store_user",
target_id=str(store_user.id),
details={
"user_id": user_id,
"store_id": store.id,
"store_code": store.store_code,
},
)
return True
except (UserNotFoundException, CannotRemoveOwnerException):
raise
except SQLAlchemyError as e:
logger.error(f"Error removing team member: {str(e)}")
raise
def update_member_role(
self,
db: Session,
store: Store,
user_id: int,
new_role_name: str,
custom_permissions: list[str] | None = None,
actor_user_id: int | None = None,
) -> StoreUser:
"""
Update a team member's role.
Args:
db: Database session
store: Store
user_id: User ID
new_role_name: New role name
custom_permissions: Optional custom permissions
Returns:
Updated StoreUser
"""
try:
store_user = (
db.query(StoreUser)
.filter(
StoreUser.store_id == store.id,
StoreUser.user_id == user_id,
)
.first()
)
if not store_user:
raise UserNotFoundException(str(user_id))
# Cannot change owner's role
if store_user.is_owner:
raise CannotRemoveOwnerException(user_id, store.id)
# Get or create new role
new_role = self._get_or_create_role(
db=db,
store=store,
role_name=new_role_name,
custom_permissions=custom_permissions,
)
old_role_name = store_user.role.name if store_user.role else "none"
store_user.role_id = new_role.id
db.flush()
db.refresh(store_user)
logger.info(
f"Updated role for user {user_id} in store {store.store_code} "
f"to {new_role_name}"
)
if actor_user_id is not None:
audit_aggregator.log(
db=db,
admin_user_id=actor_user_id,
action="member.role_change",
target_type="store_user",
target_id=str(store_user.id),
details={
"user_id": user_id,
"store_id": store.id,
"old_role": old_role_name,
"new_role": new_role_name,
},
)
return store_user
except (UserNotFoundException, CannotRemoveOwnerException):
raise
except SQLAlchemyError as e:
logger.error(f"Error updating member role: {str(e)}")
raise
def update_member(
self,
db: Session,
store: Store,
user_id: int,
role_id: int | None = None,
is_active: bool | None = None,
actor_user_id: int | None = None,
) -> StoreUser:
"""
Update a team member's role (by ID) and/or active status.
Args:
db: Database session
store: Store
user_id: User ID
role_id: New role ID (must belong to this store)
is_active: New active status
actor_user_id: Actor performing the update
Returns:
Updated StoreUser
"""
store_user = (
db.query(StoreUser)
.filter(StoreUser.store_id == store.id, StoreUser.user_id == user_id)
.first()
)
if not store_user:
raise UserNotFoundException(str(user_id))
if role_id is not None:
role = (
db.query(Role)
.filter(Role.id == role_id, Role.store_id == store.id)
.first()
)
if not role:
raise InvalidRoleException(f"Role {role_id} not found in store {store.id}")
self.update_member_role(
db=db,
store=store,
user_id=user_id,
new_role_name=role.name,
actor_user_id=actor_user_id,
)
if is_active is not None:
store_user.is_active = is_active
db.flush()
db.refresh(store_user)
return store_user
def get_team_members(
self,
db: Session,
store: Store,
include_inactive: bool = False,
) -> list[dict[str, Any]]:
"""
Get all team members for a store.
Args:
db: Database session
store: Store
include_inactive: Include inactive members
Returns:
List of team member info
"""
query = db.query(StoreUser).filter(
StoreUser.store_id == store.id,
)
if not include_inactive:
query = query.filter(StoreUser.is_active == True)
store_users = query.all()
members = []
for vu in store_users:
members.append(
{
"id": vu.user.id,
"email": vu.user.email,
"username": vu.user.username,
"first_name": vu.user.first_name,
"last_name": vu.user.last_name,
"full_name": vu.user.full_name,
"role_name": vu.role.name if vu.role else "owner",
"role_id": vu.role.id if vu.role else None,
"permissions": vu.get_all_permissions(),
"is_active": vu.is_active,
"is_owner": vu.is_owner,
"invitation_pending": vu.is_invitation_pending,
"invited_at": vu.invitation_sent_at,
"accepted_at": vu.invitation_accepted_at,
"joined_at": vu.invitation_accepted_at or vu.created_at or vu.user.created_at,
}
)
return members
def get_store_roles(self, db: Session, store_id: int) -> list[dict[str, Any]]:
"""
Get all roles for a store.
Creates default preset roles if none exist.
Args:
db: Database session
store_id: Store ID
Returns:
List of role info dicts
"""
roles = db.query(Role).filter(Role.store_id == store_id).all()
# Create default roles if none exist
if not roles:
default_role_names = ["manager", "staff", "support", "viewer", "marketing"]
for role_name in default_role_names:
permissions = list(get_preset_permissions(role_name))
role = Role(
store_id=store_id,
name=role_name,
permissions=permissions,
)
db.add(role) # noqa: PERF006
db.flush() # Flush to get IDs without committing (endpoint commits)
roles = db.query(Role).filter(Role.store_id == store_id).all()
return [
{
"id": role.id,
"name": role.name,
"permissions": role.permissions or [],
"store_id": role.store_id,
"created_at": role.created_at,
"updated_at": role.updated_at,
}
for role in roles
]
# ========================================================================
# Role CRUD
# ========================================================================
PRESET_ROLE_NAMES = {"manager", "staff", "support", "viewer", "marketing"}
def create_custom_role(
self,
db: Session,
store_id: int,
name: str,
permissions: list[str],
actor_user_id: int | None = None,
) -> Role:
"""
Create a custom role for a store.
Args:
db: Database session
store_id: Store ID
name: Role name
permissions: List of permission IDs
actor_user_id: ID of user performing the action (for audit)
Returns:
Created Role object
Raises:
InvalidRoleException: If role name conflicts with a preset
"""
if name.lower() in self.PRESET_ROLE_NAMES:
raise InvalidRoleException(f"Cannot create role with preset name: {name}")
# Check for duplicate name
existing = (
db.query(Role)
.filter(Role.store_id == store_id, Role.name == name)
.first()
)
if existing:
raise InvalidRoleException(f"Role '{name}' already exists for this store")
# Validate permissions exist
valid_ids = permission_discovery_service.get_all_permission_ids()
invalid = set(permissions) - valid_ids
if invalid:
raise InvalidRoleException(f"Invalid permission IDs: {', '.join(sorted(invalid))}")
role = Role(
store_id=store_id,
name=name,
permissions=permissions,
)
db.add(role)
db.flush()
audit_aggregator.log(
db=db,
admin_user_id=actor_user_id,
action="role.create",
target_type="role",
target_id=str(role.id),
details={"name": name, "permissions_count": len(permissions), "store_id": store_id},
)
return role
def update_role(
self,
db: Session,
store_id: int,
role_id: int,
name: str | None = None,
permissions: list[str] | None = None,
actor_user_id: int | None = None,
) -> Role:
"""
Update a role's name and/or permissions.
Args:
db: Database session
store_id: Store ID (for ownership check)
role_id: Role ID to update
name: New role name (optional)
permissions: New permission list (optional)
Returns:
Updated Role object
Raises:
InvalidRoleException: If role not found or name conflicts
"""
role = (
db.query(Role)
.filter(Role.id == role_id, Role.store_id == store_id)
.first()
)
if not role:
raise InvalidRoleException(f"Role {role_id} not found for this store")
if name is not None:
if name.lower() in self.PRESET_ROLE_NAMES and role.name.lower() != name.lower():
raise InvalidRoleException(f"Cannot rename to preset name: {name}")
# Check duplicate
duplicate = (
db.query(Role)
.filter(
Role.store_id == store_id,
Role.name == name,
Role.id != role_id,
)
.first()
)
if duplicate:
raise InvalidRoleException(f"Role '{name}' already exists for this store")
role.name = name
if permissions is not None:
valid_ids = permission_discovery_service.get_all_permission_ids()
invalid = set(permissions) - valid_ids
if invalid:
raise InvalidRoleException(
f"Invalid permission IDs: {', '.join(sorted(invalid))}"
)
old_permissions = role.permissions or []
role.permissions = permissions
db.flush()
details = {"role_name": role.name, "store_id": store_id}
if name is not None:
details["new_name"] = name
if permissions is not None:
added = set(permissions) - set(old_permissions)
removed = set(old_permissions) - set(permissions)
if added:
details["permissions_added"] = sorted(added)
if removed:
details["permissions_removed"] = sorted(removed)
audit_aggregator.log(
db=db,
admin_user_id=actor_user_id,
action="role.update",
target_type="role",
target_id=str(role.id),
details=details,
)
return role
def delete_role(
self,
db: Session,
store_id: int,
role_id: int,
actor_user_id: int | None = None,
) -> None:
"""
Delete a custom role. Preset roles cannot be deleted.
Args:
db: Database session
store_id: Store ID (for ownership check)
role_id: Role ID to delete
Raises:
InvalidRoleException: If role not found or is a preset role
"""
role = (
db.query(Role)
.filter(Role.id == role_id, Role.store_id == store_id)
.first()
)
if not role:
raise InvalidRoleException(f"Role {role_id} not found for this store")
if role.name.lower() in self.PRESET_ROLE_NAMES:
raise InvalidRoleException(f"Cannot delete preset role: {role.name}")
# Check if any team members use this role
members_with_role = (
db.query(StoreUser)
.filter(StoreUser.store_id == store_id, StoreUser.role_id == role_id)
.count()
)
if members_with_role > 0:
raise InvalidRoleException(
f"Cannot delete role: {members_with_role} team member(s) still assigned"
)
role_name = role.name
db.delete(role)
db.flush()
audit_aggregator.log(
db=db,
admin_user_id=actor_user_id,
action="role.delete",
target_type="role",
target_id=str(role_id),
details={"role_name": role_name, "store_id": store_id},
)
# ========================================================================
# Admin Access Validation
# ========================================================================
def validate_admin_store_access(
self,
db: Session,
user_context,
store_id: int,
) -> Store:
"""
Verify an admin user can access the given store.
Super admins can access any store. Platform admins can only access
stores that belong to one of their assigned platforms.
Args:
db: Database session
user_context: UserContext of the admin user
store_id: Store ID to validate access to
Returns:
The Store object if access is granted
Raises:
InvalidRoleException: If store not found or admin lacks access
"""
store = db.query(Store).filter(Store.id == store_id).first()
if not store:
raise InvalidRoleException(f"Store {store_id} not found")
# Super admins (accessible_platform_ids is None) can access all stores
platform_ids = user_context.get_accessible_platform_ids()
if platform_ids is None:
return store
# Platform admins: store must belong to one of their platforms
store_in_platform = (
db.query(StorePlatform)
.filter(
StorePlatform.store_id == store_id,
StorePlatform.platform_id.in_(platform_ids),
StorePlatform.is_active == True,
)
.first()
)
if not store_in_platform:
raise InvalidRoleException(
"You do not have access to this store's roles"
)
return store
# Private helper methods
def _generate_invitation_token(self) -> str:
"""Generate a secure invitation token."""
return secrets.token_urlsafe(32)
def _get_or_create_role(
self,
db: Session,
store: Store,
role_name: str,
custom_permissions: list[str] | None = None,
) -> Role:
"""Get existing role or create new one with preset/custom permissions."""
# Try to find existing role with same name
role = (
db.query(Role)
.filter(
Role.store_id == store.id,
Role.name == role_name,
)
.first()
)
if role and custom_permissions is None:
# Use existing role
return role
# Determine permissions
if custom_permissions:
permissions = custom_permissions
else:
# Get preset permissions
permissions = list(get_preset_permissions(role_name))
if role:
# Update existing role with new permissions
role.permissions = permissions
else:
# Create new role
role = Role(
store_id=store.id,
name=role_name,
permissions=permissions,
)
db.add(role)
db.flush()
return role
def resend_invitation(
self,
db: Session,
store: Store,
user_id: int,
inviter: User,
) -> dict[str, Any]:
"""
Resend invitation to a pending team member.
Generates a new token and resends the email.
Only works for pending invitations (not yet accepted).
"""
store_user = (
db.query(StoreUser)
.filter(
StoreUser.store_id == store.id,
StoreUser.user_id == user_id,
)
.first()
)
if not store_user:
raise UserNotFoundException(str(user_id))
if store_user.invitation_accepted_at is not None:
raise InvalidInvitationTokenException("Invitation already accepted")
# Generate new token and update sent time
new_token = self._generate_invitation_token()
store_user.invitation_token = new_token
store_user.invitation_sent_at = datetime.utcnow()
db.flush()
# Get role name for email
role_name = store_user.role.name if store_user.role else "member"
# Send email
try:
self._send_invitation_email(
db=db,
email=store_user.user.email,
store=store,
token=new_token,
inviter=inviter,
role_name=role_name,
)
except Exception: # noqa: EXC003
logger.exception(
f"Failed to resend invitation email to {store_user.user.email}"
)
logger.info(
f"Resent invitation to {store_user.user.email} for store "
f"{store.store_code} by {inviter.username}"
)
return {
"email": store_user.user.email,
"store_code": store.store_code,
"invitation_sent": True,
}
def _send_invitation_email(
self,
db: Session,
email: str,
store: Store,
token: str,
inviter: User,
role_name: str,
):
"""Send team invitation email."""
from app.modules.messaging.services.email_service import EmailService
acceptance_link = f"/store/invitation/accept?token={token}"
email_service = EmailService(db)
email_service.send_template(
template_code="team_invitation",
to_email=email,
variables={
"invited_by_name": inviter.username,
"store_name": store.name or store.store_code,
"role_name": role_name,
"acceptance_link": acceptance_link,
"expiry_days": "7",
},
store_id=store.id,
user_id=inviter.id,
related_type="store_user",
)
# Create service instance
store_team_service = StoreTeamService()