# app/modules/tenancy/services/store_team_service.py
"""
Store team management service.

Handles:
- Team member invitations
- Invitation acceptance
- Role assignment
- Permission management
"""

import logging
import secrets
from datetime import datetime, timedelta
from typing import Any

from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

from app.modules.tenancy.services.permission_discovery_service import (
    permission_discovery_service,
)


# NOTE(review): this function is defined before the remaining imports, exactly
# as in the original file. It may be positioned here to survive an import
# cycle -- confirm before moving it below the import block.
def get_preset_permissions(preset_name: str) -> set[str]:
    """Get permissions for a preset role."""
    return permission_discovery_service.get_preset_permissions(preset_name)


from app.modules.billing.exceptions import TierLimitExceededException
from app.modules.core.services.audit_aggregator import audit_aggregator
from app.modules.tenancy.exceptions import (
    CannotRemoveOwnerException,
    InvalidInvitationTokenException,
    InvalidRoleException,
    TeamInvitationAlreadyAcceptedException,
    TeamMemberAlreadyExistsException,
    UserNotFoundException,
)
from app.modules.tenancy.models import Role, Store, StorePlatform, StoreUser, User
from middleware.auth import AuthManager

logger = logging.getLogger(__name__)


class StoreTeamService:
    """Service for managing store team members.

    Methods only ``flush()`` the session; none of them commits.
    """

    def __init__(self):
        self.auth_manager = AuthManager()

    def invite_team_member(
        self,
        db: Session,
        store: Store,
        inviter: User,
        email: str,
        role_name: str,
        first_name: str | None = None,
        last_name: str | None = None,
        custom_permissions: list[str] | None = None,
    ) -> dict[str, Any]:
        """
        Invite a new team member to a store.

        Creates:
        1. User account (if doesn't exist)
        2. Role (if it doesn't exist / custom permissions provided)
        3. StoreUser relationship with invitation token

        If the user already has an *inactive* membership in this store, that
        membership is re-used: it receives the requested role and a fresh
        token, and goes through the same email + audit path as a first
        invitation.

        Args:
            db: Database session
            store: Store to invite to
            inviter: User sending the invitation
            email: Email of person to invite
            role_name: Role name (manager, staff, support, etc.)
            first_name: First name, used only when a new user is created
            last_name: Last name, used only when a new user is created
            custom_permissions: Optional custom permissions (overrides preset)

        Returns:
            Dict with ``invitation_token``, ``email``, ``role`` and
            ``existing_user``

        Raises:
            TierLimitExceededException: Team member limit of the plan reached
            TeamMemberAlreadyExistsException: User is already an active member
            SQLAlchemyError: Re-raised after logging
        """
        try:
            self._check_team_size_limit(db, store)

            # Check if user already exists
            user = db.query(User).filter(User.email == email).first()
            membership = None
            if user:
                # Check if already a member
                membership = (
                    db.query(StoreUser)
                    .filter(
                        StoreUser.store_id == store.id,
                        StoreUser.user_id == user.id,
                    )
                    .first()
                )
                if membership and membership.is_active:
                    raise TeamMemberAlreadyExistsException(email, store.store_code)
            else:
                user = self._create_invited_user(db, email, first_name, last_name)

            # Get or create role
            role = self._get_or_create_role(
                db=db,
                store=store,
                role_name=role_name,
                custom_permissions=custom_permissions,
            )

            invitation_token = self._generate_invitation_token()
            is_reinvite = membership is not None

            if is_reinvite:
                # Re-use the old membership; it stays inactive until accepted.
                store_user = membership
                store_user.role_id = role.id
                store_user.is_active = False
                store_user.invitation_token = invitation_token
                store_user.invitation_sent_at = datetime.utcnow()
                store_user.invitation_accepted_at = None
            else:
                # Create store membership with invitation
                store_user = StoreUser(
                    store_id=store.id,
                    user_id=user.id,
                    role_id=role.id,
                    invited_by=inviter.id,
                    invitation_token=invitation_token,
                    invitation_sent_at=datetime.utcnow(),
                    is_active=False,  # Will be activated on acceptance
                )
                db.add(store_user)

            db.flush()

            if is_reinvite:
                logger.info(f"Re-invited user {email} to store {store.store_code}")
            else:
                logger.info(
                    f"Invited {email} to store {store.store_code} "
                    f"as {role_name} by {inviter.username}"
                )

            # Best-effort: a mail failure must not undo the invitation.
            self._try_send_invitation_email(
                db=db,
                email=email,
                store=store,
                token=invitation_token,
                inviter=inviter,
                role_name=role_name,
            )

            audit_aggregator.log(
                db=db,
                admin_user_id=inviter.id,
                action="member.invite",
                target_type="store_user",
                target_id=str(store_user.id),
                details={
                    "email": email,
                    "role": role_name,
                    "store_id": store.id,
                    "store_code": store.store_code,
                },
            )

            return {
                "invitation_token": invitation_token,
                "email": email,
                "role": role_name,
                # A re-invite always concerns an existing account.
                "existing_user": True if is_reinvite else user.is_active,
            }

        except (TeamMemberAlreadyExistsException, TierLimitExceededException):
            raise
        except SQLAlchemyError as e:
            logger.error(f"Error inviting team member: {str(e)}")
            raise

    def accept_invitation(
        self,
        db: Session,
        invitation_token: str,
        password: str,
        first_name: str | None = None,
        last_name: str | None = None,
    ) -> dict[str, Any]:
        """
        Accept a team invitation and activate account.

        Args:
            db: Database session
            invitation_token: Invitation token from email
            password: New password to set
            first_name: Optional first name
            last_name: Optional last name

        Returns:
            Dict with user and store info

        Raises:
            InvalidInvitationTokenException: Unknown token, or older than 7 days
            TeamInvitationAlreadyAcceptedException: Invitation already accepted
            SQLAlchemyError: Re-raised after logging
        """
        try:
            # Find invitation
            store_user = (
                db.query(StoreUser)
                .filter(
                    StoreUser.invitation_token == invitation_token,
                )
                .first()
            )

            if not store_user:
                raise InvalidInvitationTokenException()

            # Check if already accepted
            if store_user.invitation_accepted_at is not None:
                raise TeamInvitationAlreadyAcceptedException()

            # Check token expiration (7 days)
            if store_user.invitation_sent_at:
                expiry_date = store_user.invitation_sent_at + timedelta(days=7)
                if datetime.utcnow() > expiry_date:
                    raise InvalidInvitationTokenException("Invitation has expired")

            user = store_user.user
            store = store_user.store

            # Update user
            user.hashed_password = self.auth_manager.hash_password(password)
            user.is_active = True
            user.is_email_verified = True
            if first_name:
                user.first_name = first_name
            if last_name:
                user.last_name = last_name

            # Activate membership
            store_user.is_active = True
            store_user.invitation_accepted_at = datetime.utcnow()
            store_user.invitation_token = None  # Clear token

            db.flush()

            logger.info(
                f"User {user.email} accepted invitation to store {store.store_code}"
            )

            return {
                "user": user,
                "store": store,
                "role": store_user.role.name if store_user.role else "member",
            }

        except (
            InvalidInvitationTokenException,
            TeamInvitationAlreadyAcceptedException,
        ):
            raise
        except SQLAlchemyError as e:
            logger.error(f"Error accepting invitation: {str(e)}")
            raise

    def remove_team_member(
        self,
        db: Session,
        store: Store,
        user_id: int,
        actor_user_id: int | None = None,
    ) -> bool:
        """
        Remove a team member from a store.

        Cannot remove owner.

        Args:
            db: Database session
            store: Store to remove from
            user_id: User ID to remove
            actor_user_id: User performing the removal; an audit entry is
                written only when this is given

        Returns:
            True if removed

        Raises:
            UserNotFoundException: No membership for this user in the store
            CannotRemoveOwnerException: The membership belongs to the owner
            SQLAlchemyError: Re-raised after logging
        """
        try:
            store_user = (
                db.query(StoreUser)
                .filter(
                    StoreUser.store_id == store.id,
                    StoreUser.user_id == user_id,
                )
                .first()
            )

            if not store_user:
                raise UserNotFoundException(str(user_id))

            # Cannot remove owner
            if store_user.is_owner:
                raise CannotRemoveOwnerException(user_id, store.id)

            from app.core.soft_delete import soft_delete

            soft_delete(db, store_user, deleted_by_id=actor_user_id)

            logger.info(f"Removed user {user_id} from store {store.store_code}")

            if actor_user_id is not None:
                audit_aggregator.log(
                    db=db,
                    admin_user_id=actor_user_id,
                    action="member.remove",
                    target_type="store_user",
                    target_id=str(store_user.id),
                    details={
                        "user_id": user_id,
                        "store_id": store.id,
                        "store_code": store.store_code,
                    },
                )

            return True

        except (UserNotFoundException, CannotRemoveOwnerException):
            raise
        except SQLAlchemyError as e:
            logger.error(f"Error removing team member: {str(e)}")
            raise

    def update_member_role(
        self,
        db: Session,
        store: Store,
        user_id: int,
        new_role_name: str,
        custom_permissions: list[str] | None = None,
        actor_user_id: int | None = None,
    ) -> StoreUser:
        """
        Update a team member's role.

        Args:
            db: Database session
            store: Store
            user_id: User ID
            new_role_name: New role name
            custom_permissions: Optional custom permissions
            actor_user_id: User performing the change; an audit entry is
                written only when this is given

        Returns:
            Updated StoreUser

        Raises:
            UserNotFoundException: No membership for this user in the store
            CannotRemoveOwnerException: The membership belongs to the owner
            SQLAlchemyError: Re-raised after logging
        """
        try:
            store_user = (
                db.query(StoreUser)
                .filter(
                    StoreUser.store_id == store.id,
                    StoreUser.user_id == user_id,
                )
                .first()
            )

            if not store_user:
                raise UserNotFoundException(str(user_id))

            # Cannot change owner's role
            if store_user.is_owner:
                raise CannotRemoveOwnerException(user_id, store.id)

            # Get or create new role
            new_role = self._get_or_create_role(
                db=db,
                store=store,
                role_name=new_role_name,
                custom_permissions=custom_permissions,
            )

            # Capture the old name before the relationship is re-pointed.
            old_role_name = store_user.role.name if store_user.role else "none"
            store_user.role_id = new_role.id
            db.flush()
            db.refresh(store_user)

            logger.info(
                f"Updated role for user {user_id} in store {store.store_code} "
                f"to {new_role_name}"
            )

            if actor_user_id is not None:
                audit_aggregator.log(
                    db=db,
                    admin_user_id=actor_user_id,
                    action="member.role_change",
                    target_type="store_user",
                    target_id=str(store_user.id),
                    details={
                        "user_id": user_id,
                        "store_id": store.id,
                        "old_role": old_role_name,
                        "new_role": new_role_name,
                    },
                )

            return store_user

        except (UserNotFoundException, CannotRemoveOwnerException):
            raise
        except SQLAlchemyError as e:
            logger.error(f"Error updating member role: {str(e)}")
            raise

    def update_member(
        self,
        db: Session,
        store: Store,
        user_id: int,
        role_id: int | None = None,
        is_active: bool | None = None,
        actor_user_id: int | None = None,
    ) -> StoreUser:
        """
        Update a team member's role (by ID) and/or active status.

        Args:
            db: Database session
            store: Store
            user_id: User ID
            role_id: New role ID (must belong to this store)
            is_active: New active status
            actor_user_id: Actor performing the update

        Returns:
            Updated StoreUser

        Raises:
            UserNotFoundException: No membership for this user in the store
            InvalidRoleException: ``role_id`` does not belong to this store
            CannotRemoveOwnerException: Role change requested for the owner
        """
        store_user = (
            db.query(StoreUser)
            .filter(StoreUser.store_id == store.id, StoreUser.user_id == user_id)
            .first()
        )

        if not store_user:
            raise UserNotFoundException(str(user_id))

        if role_id is not None:
            role = (
                db.query(Role)
                .filter(Role.id == role_id, Role.store_id == store.id)
                .first()
            )
            if not role:
                raise InvalidRoleException(f"Role {role_id} not found in store {store.id}")

            # Delegate by name so owner protection and audit logging apply.
            self.update_member_role(
                db=db,
                store=store,
                user_id=user_id,
                new_role_name=role.name,
                actor_user_id=actor_user_id,
            )

        if is_active is not None:
            store_user.is_active = is_active

        db.flush()
        db.refresh(store_user)
        return store_user

    def get_team_members(
        self,
        db: Session,
        store: Store,
        include_inactive: bool = False,
    ) -> list[dict[str, Any]]:
        """
        Get all team members for a store.

        Args:
            db: Database session
            store: Store
            include_inactive: Include inactive members

        Returns:
            List of team member info
        """
        query = db.query(StoreUser).filter(
            StoreUser.store_id == store.id,
        )

        if not include_inactive:
            # `== True` is intentional: it builds a SQL expression.
            query = query.filter(StoreUser.is_active == True)  # noqa: E712

        return [
            {
                "id": membership.user.id,
                "email": membership.user.email,
                "username": membership.user.username,
                "first_name": membership.user.first_name,
                "last_name": membership.user.last_name,
                "full_name": membership.user.full_name,
                # A membership without a role is reported as "owner".
                "role_name": membership.role.name if membership.role else "owner",
                "role_id": membership.role.id if membership.role else None,
                "permissions": membership.get_all_permissions(),
                "is_active": membership.is_active,
                "is_owner": membership.is_owner,
                "invitation_pending": membership.is_invitation_pending,
                "invited_at": membership.invitation_sent_at,
                "accepted_at": membership.invitation_accepted_at,
                "joined_at": membership.invitation_accepted_at
                or membership.created_at
                or membership.user.created_at,
            }
            for membership in query.all()
        ]

    def get_store_roles(self, db: Session, store_id: int) -> list[dict[str, Any]]:
        """
        Get all roles for a store.

        Creates default preset roles if none exist.

        Args:
            db: Database session
            store_id: Store ID

        Returns:
            List of role info dicts
        """
        roles = db.query(Role).filter(Role.store_id == store_id).all()

        # Create default roles if none exist
        if not roles:
            default_role_names = ["manager", "staff", "support", "viewer", "marketing"]
            for role_name in default_role_names:
                permissions = list(get_preset_permissions(role_name))
                role = Role(
                    store_id=store_id,
                    name=role_name,
                    permissions=permissions,
                )
                db.add(role)  # noqa: PERF006
            db.flush()  # Flush to get IDs without committing (endpoint commits)
            roles = db.query(Role).filter(Role.store_id == store_id).all()

        return [
            {
                "id": role.id,
                "name": role.name,
                "permissions": role.permissions or [],
                "store_id": role.store_id,
                "created_at": role.created_at,
                "updated_at": role.updated_at,
            }
            for role in roles
        ]

    # ========================================================================
    # Role CRUD
    # ========================================================================

    # Names reserved for preset roles; compared against the lower-cased name.
    PRESET_ROLE_NAMES = {"manager", "staff", "support", "viewer", "marketing"}

    def create_custom_role(
        self,
        db: Session,
        store_id: int,
        name: str,
        permissions: list[str],
        actor_user_id: int | None = None,
    ) -> Role:
        """
        Create a custom role for a store.

        Args:
            db: Database session
            store_id: Store ID
            name: Role name
            permissions: List of permission IDs
            actor_user_id: ID of user performing the action (for audit)

        Returns:
            Created Role object

        Raises:
            InvalidRoleException: If the name is a preset name, the name is
                already used in this store, or a permission ID is unknown
        """
        if name.lower() in self.PRESET_ROLE_NAMES:
            raise InvalidRoleException(f"Cannot create role with preset name: {name}")

        # Check for duplicate name
        existing = (
            db.query(Role)
            .filter(Role.store_id == store_id, Role.name == name)
            .first()
        )
        if existing:
            raise InvalidRoleException(f"Role '{name}' already exists for this store")

        # Validate permissions exist
        valid_ids = permission_discovery_service.get_all_permission_ids()
        invalid = set(permissions) - valid_ids
        if invalid:
            raise InvalidRoleException(f"Invalid permission IDs: {', '.join(sorted(invalid))}")

        role = Role(
            store_id=store_id,
            name=name,
            permissions=permissions,
        )
        db.add(role)
        db.flush()

        audit_aggregator.log(
            db=db,
            admin_user_id=actor_user_id,
            action="role.create",
            target_type="role",
            target_id=str(role.id),
            details={"name": name, "permissions_count": len(permissions), "store_id": store_id},
        )

        return role

    def update_role(
        self,
        db: Session,
        store_id: int,
        role_id: int,
        name: str | None = None,
        permissions: list[str] | None = None,
        actor_user_id: int | None = None,
    ) -> Role:
        """
        Update a role's name and/or permissions.

        Args:
            db: Database session
            store_id: Store ID (for ownership check)
            role_id: Role ID to update
            name: New role name (optional)
            permissions: New permission list (optional)
            actor_user_id: ID of user performing the action (for audit)

        Returns:
            Updated Role object

        Raises:
            InvalidRoleException: If role not found, the name conflicts, or a
                permission ID is unknown
        """
        role = (
            db.query(Role)
            .filter(Role.id == role_id, Role.store_id == store_id)
            .first()
        )
        if not role:
            raise InvalidRoleException(f"Role {role_id} not found for this store")

        if name is not None:
            # Keeping a preset role's own name (any casing) is allowed.
            if name.lower() in self.PRESET_ROLE_NAMES and role.name.lower() != name.lower():
                raise InvalidRoleException(f"Cannot rename to preset name: {name}")
            # Check duplicate
            duplicate = (
                db.query(Role)
                .filter(
                    Role.store_id == store_id,
                    Role.name == name,
                    Role.id != role_id,
                )
                .first()
            )
            if duplicate:
                raise InvalidRoleException(f"Role '{name}' already exists for this store")
            role.name = name

        if permissions is not None:
            valid_ids = permission_discovery_service.get_all_permission_ids()
            invalid = set(permissions) - valid_ids
            if invalid:
                raise InvalidRoleException(
                    f"Invalid permission IDs: {', '.join(sorted(invalid))}"
                )
            # Remember the previous set so the audit entry can record a diff.
            old_permissions = role.permissions or []
            role.permissions = permissions

        db.flush()

        details = {"role_name": role.name, "store_id": store_id}
        if name is not None:
            details["new_name"] = name
        if permissions is not None:
            added = set(permissions) - set(old_permissions)
            removed = set(old_permissions) - set(permissions)
            if added:
                details["permissions_added"] = sorted(added)
            if removed:
                details["permissions_removed"] = sorted(removed)

        audit_aggregator.log(
            db=db,
            admin_user_id=actor_user_id,
            action="role.update",
            target_type="role",
            target_id=str(role.id),
            details=details,
        )

        return role

    def delete_role(
        self,
        db: Session,
        store_id: int,
        role_id: int,
        actor_user_id: int | None = None,
    ) -> None:
        """
        Delete a custom role.

        Preset roles cannot be deleted.

        Args:
            db: Database session
            store_id: Store ID (for ownership check)
            role_id: Role ID to delete
            actor_user_id: ID of user performing the action (for audit)

        Raises:
            InvalidRoleException: If role not found, is a preset role, or is
                still assigned to team members
        """
        role = (
            db.query(Role)
            .filter(Role.id == role_id, Role.store_id == store_id)
            .first()
        )
        if not role:
            raise InvalidRoleException(f"Role {role_id} not found for this store")

        if role.name.lower() in self.PRESET_ROLE_NAMES:
            raise InvalidRoleException(f"Cannot delete preset role: {role.name}")

        # Check if any team members use this role
        members_with_role = (
            db.query(StoreUser)
            .filter(StoreUser.store_id == store_id, StoreUser.role_id == role_id)
            .count()
        )
        if members_with_role > 0:
            raise InvalidRoleException(
                f"Cannot delete role: {members_with_role} team member(s) still assigned"
            )

        # Keep the name: the audit entry is written after the delete.
        role_name = role.name
        db.delete(role)
        db.flush()

        audit_aggregator.log(
            db=db,
            admin_user_id=actor_user_id,
            action="role.delete",
            target_type="role",
            target_id=str(role_id),
            details={"role_name": role_name, "store_id": store_id},
        )

    # ========================================================================
    # Admin Access Validation
    # ========================================================================

    def validate_admin_store_access(
        self,
        db: Session,
        user_context,
        store_id: int,
    ) -> Store:
        """
        Verify an admin user can access the given store.

        Super admins can access any store. Platform admins can only access
        stores that belong to one of their assigned platforms.

        Args:
            db: Database session
            user_context: UserContext of the admin user
            store_id: Store ID to validate access to

        Returns:
            The Store object if access is granted

        Raises:
            InvalidRoleException: If store not found or admin lacks access
        """
        store = db.query(Store).filter(Store.id == store_id).first()
        if not store:
            raise InvalidRoleException(f"Store {store_id} not found")

        # Super admins (accessible_platform_ids is None) can access all stores
        platform_ids = user_context.get_accessible_platform_ids()
        if platform_ids is None:
            return store

        # Platform admins: store must belong to one of their platforms
        store_in_platform = (
            db.query(StorePlatform)
            .filter(
                StorePlatform.store_id == store_id,
                StorePlatform.platform_id.in_(platform_ids),
                StorePlatform.is_active == True,  # noqa: E712
            )
            .first()
        )
        if not store_in_platform:
            raise InvalidRoleException(
                "You do not have access to this store's roles"
            )

        return store

    # Private helper methods

    def _generate_invitation_token(self) -> str:
        """Generate a secure invitation token."""
        return secrets.token_urlsafe(32)

    def _check_team_size_limit(self, db: Session, store: Store) -> None:
        """Raise TierLimitExceededException when the team member limit is reached.

        Best-effort: any other failure while checking is logged and ignored,
        so the invitation proceeds.
        """
        try:
            from app.modules.billing.services.usage_service import usage_service

            limit_check = usage_service.check_limit(db, store.id, "team_members")
            if limit_check.limit is not None and not limit_check.can_proceed:
                raise TierLimitExceededException(
                    message=limit_check.message or "Team member limit reached",
                    limit_type="team_members",
                    current=limit_check.current,
                    limit=limit_check.limit,
                )
        except TierLimitExceededException:
            raise
        except Exception as e:  # noqa: EXC003
            logger.warning(f"Could not check team limit (proceeding): {e}")

    def _create_invited_user(
        self,
        db: Session,
        email: str,
        first_name: str | None,
        last_name: str | None,
    ) -> User:
        """Create and flush an inactive user account for a pending invitation."""
        # Derive the username from the email local part; add a numeric suffix
        # until it is unique.
        base_username = email.split("@")[0]
        username = base_username
        counter = 1
        while db.query(User).filter(User.username == username).first():
            username = f"{base_username}{counter}"
            counter += 1

        # Generate temporary password (user will set real one on activation)
        temp_password = secrets.token_urlsafe(16)

        user = User(
            email=email,
            username=username,
            hashed_password=self.auth_manager.hash_password(temp_password),
            first_name=first_name,
            last_name=last_name,
            role="store_member",
            is_active=False,  # Will be activated when invitation accepted
            is_email_verified=False,
        )
        db.add(user)
        db.flush()  # Get user.id

        logger.info(f"Created new user account for invitation: {email}")
        return user

    def _try_send_invitation_email(
        self,
        db: Session,
        email: str,
        store: Store,
        token: str,
        inviter: User,
        role_name: str,
        action: str = "send",
    ) -> bool:
        """Send the invitation email; log failures instead of raising.

        Args:
            action: Verb used in the failure log message ("send" / "resend")

        Returns:
            True if the send call returned without raising, else False
        """
        try:
            self._send_invitation_email(
                db=db,
                email=email,
                store=store,
                token=token,
                inviter=inviter,
                role_name=role_name,
            )
        except Exception:  # noqa: EXC003
            logger.exception(f"Failed to {action} invitation email to {email}")
            return False
        return True

    def _get_or_create_role(
        self,
        db: Session,
        store: Store,
        role_name: str,
        custom_permissions: list[str] | None = None,
    ) -> Role:
        """Get existing role or create new one with preset/custom permissions.

        A non-empty ``custom_permissions`` list overwrites the permissions of
        an existing role with the same name. ``None`` and an empty list both
        mean "no customization": an existing role is returned untouched and a
        new role gets the preset permissions for ``role_name``.
        """
        # Try to find existing role with same name
        role = (
            db.query(Role)
            .filter(
                Role.store_id == store.id,
                Role.name == role_name,
            )
            .first()
        )

        # Falsy check on purpose: an empty list must not reset an existing
        # role to the preset permissions.
        if role and not custom_permissions:
            # Use existing role
            return role

        # Determine permissions
        if custom_permissions:
            permissions = custom_permissions
        else:
            # Get preset permissions
            permissions = list(get_preset_permissions(role_name))

        if role:
            # Update existing role with new permissions
            role.permissions = permissions
        else:
            # Create new role
            role = Role(
                store_id=store.id,
                name=role_name,
                permissions=permissions,
            )
            db.add(role)

        db.flush()
        return role

    def resend_invitation(
        self,
        db: Session,
        store: Store,
        user_id: int,
        inviter: User,
    ) -> dict[str, Any]:
        """
        Resend invitation to a pending team member.

        Generates a new token and resends the email.
        Only works for pending invitations (not yet accepted).

        Args:
            db: Database session
            store: Store the membership belongs to
            user_id: User ID of the invited member
            inviter: User triggering the resend

        Returns:
            Dict with ``email``, ``store_code`` and ``invitation_sent``
            (False when sending the email failed; the new token is stored
            either way)

        Raises:
            UserNotFoundException: No membership for this user in the store
            InvalidInvitationTokenException: Invitation was already accepted
        """
        store_user = (
            db.query(StoreUser)
            .filter(
                StoreUser.store_id == store.id,
                StoreUser.user_id == user_id,
            )
            .first()
        )

        if not store_user:
            raise UserNotFoundException(str(user_id))

        if store_user.invitation_accepted_at is not None:
            raise InvalidInvitationTokenException("Invitation already accepted")

        # Generate new token and update sent time
        new_token = self._generate_invitation_token()
        store_user.invitation_token = new_token
        store_user.invitation_sent_at = datetime.utcnow()
        db.flush()

        # Get role name for email
        role_name = store_user.role.name if store_user.role else "member"

        # Send email (best-effort); report the real outcome to the caller.
        email_sent = self._try_send_invitation_email(
            db=db,
            email=store_user.user.email,
            store=store,
            token=new_token,
            inviter=inviter,
            role_name=role_name,
            action="resend",
        )

        if email_sent:
            logger.info(
                f"Resent invitation to {store_user.user.email} for store "
                f"{store.store_code} by {inviter.username}"
            )

        return {
            "email": store_user.user.email,
            "store_code": store.store_code,
            "invitation_sent": email_sent,
        }

    def _send_invitation_email(
        self,
        db: Session,
        email: str,
        store: Store,
        token: str,
        inviter: User,
        role_name: str,
    ) -> None:
        """Send team invitation email."""
        from app.core.config import settings as app_settings
        from app.modules.messaging.services.email_service import EmailService

        base_url = app_settings.app_base_url.rstrip("/")
        acceptance_link = f"{base_url}/store/{store.store_code}/invitation/accept?token={token}"

        email_service = EmailService(db)
        email_service.send_template(
            template_code="team_invitation",
            to_email=email,
            variables={
                "invited_by_name": inviter.username,
                "store_name": store.name or store.store_code,
                "role_name": role_name,
                "acceptance_link": acceptance_link,
                # Mirrors the 7-day expiry check in accept_invitation.
                "expiry_days": "7",
            },
            store_id=store.id,
            user_id=inviter.id,
            related_type="store_user",
        )


# Create service instance
store_team_service = StoreTeamService()