Vendor team member management features

This commit is contained in:
2025-11-14 21:08:57 +01:00
parent af23f5b88f
commit 41439eed09
10 changed files with 1786 additions and 85 deletions

View File

@@ -51,7 +51,11 @@ dev-full: dev-with-docs
# =============================================================================
migrate-create:
@if "$(message)"=="" (echo Error: Please provide a message. Usage: make migrate-create message="your_description") else ($(PYTHON) -m alembic revision --autogenerate -m "$(message)")
@if [ "$(message)" = "" ]; then \
echo "Error: Please provide a message. Usage: make migrate-create message=\"your_description\""; \
else \
$(PYTHON) -m alembic revision --autogenerate -m "$(message)"; \
fi
migrate-create-manual:
@if "$(message)"=="" (echo Error: Please provide a message. Usage: make migrate-create-manual message="your_description") else ($(PYTHON) -m alembic revision -m "$(message)")

View File

@@ -498,3 +498,179 @@ def get_user_vendor(
# User doesn't have access to this vendor
raise UnauthorizedVendorAccessException(vendor_code, current_user.id)
# ============================================================================
# PERMISSIONS CHECKING
# ============================================================================
def require_vendor_permission(permission: str):
"""
Dependency factory to require a specific vendor permission.
Usage:
@router.get("/products")
def list_products(
vendor: Vendor = Depends(get_vendor_from_code),
user: User = Depends(require_vendor_permission(VendorPermissions.PRODUCTS_VIEW.value))
):
...
"""
def permission_checker(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
) -> User:
# Get vendor from request state (set by middleware)
vendor = getattr(request.state, "vendor", None)
if not vendor:
raise VendorAccessDeniedException("No vendor context")
# Check if user has permission
if not current_user.has_vendor_permission(vendor.id, permission):
raise InsufficientVendorPermissionsException(
required_permission=permission,
vendor_code=vendor.vendor_code,
)
return current_user
return permission_checker
def require_vendor_owner(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
) -> User:
"""
Dependency to require vendor owner role.
Usage:
@router.delete("/team/{user_id}")
def remove_team_member(
user: User = Depends(require_vendor_owner)
):
...
"""
vendor = getattr(request.state, "vendor", None)
if not vendor:
raise VendorAccessDeniedException("No vendor context")
if not current_user.is_owner_of(vendor.id):
raise VendorOwnerOnlyException(
operation="team management",
vendor_code=vendor.vendor_code,
)
return current_user
def require_any_vendor_permission(*permissions: str):
"""
Dependency factory to require ANY of the specified permissions.
Usage:
@router.get("/dashboard")
def dashboard(
user: User = Depends(require_any_vendor_permission(
VendorPermissions.DASHBOARD_VIEW.value,
VendorPermissions.REPORTS_VIEW.value
))
):
...
"""
def permission_checker(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
) -> User:
vendor = getattr(request.state, "vendor", None)
if not vendor:
raise VendorAccessDeniedException("No vendor context")
# Check if user has ANY of the required permissions
has_permission = any(
current_user.has_vendor_permission(vendor.id, perm)
for perm in permissions
)
if not has_permission:
raise InsufficientVendorPermissionsException(
required_permission=f"Any of: {', '.join(permissions)}",
vendor_code=vendor.vendor_code,
)
return current_user
return permission_checker
def require_all_vendor_permissions(*permissions: str):
"""
Dependency factory to require ALL of the specified permissions.
Usage:
@router.post("/products/bulk-delete")
def bulk_delete_products(
user: User = Depends(require_all_vendor_permissions(
VendorPermissions.PRODUCTS_VIEW.value,
VendorPermissions.PRODUCTS_DELETE.value
))
):
...
"""
def permission_checker(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
) -> User:
vendor = getattr(request.state, "vendor", None)
if not vendor:
raise VendorAccessDeniedException("No vendor context")
# Check if user has ALL required permissions
missing_permissions = [
perm for perm in permissions
if not current_user.has_vendor_permission(vendor.id, perm)
]
if missing_permissions:
raise InsufficientVendorPermissionsException(
required_permission=f"All of: {', '.join(permissions)}",
vendor_code=vendor.vendor_code,
)
return current_user
return permission_checker
def get_user_permissions(
request: Request,
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
) -> list:
"""
Get all permissions for current user in current vendor.
Returns empty list if no vendor context.
"""
vendor = getattr(request.state, "vendor", None)
if not vendor:
return []
# If owner, return all permissions
if current_user.is_owner_of(vendor.id):
from app.core.permissions import VendorPermissions
return [p.value for p in VendorPermissions]
# Get permissions from vendor membership
for vm in current_user.vendor_memberships:
if vm.vendor_id == vendor.id and vm.is_active:
return vm.get_all_permissions()
return []

View File

@@ -1,73 +1,502 @@
# app/api/v1/vendor/teams.py
"""
Vendor team member management endpoints.
Implements complete team management with:
- Team member listing
- Invitation system
- Role management
- Permission checking
- RBAC integration
"""
import logging
from fastapi import APIRouter, Depends
from typing import List
from fastapi import APIRouter, Depends, Request
from sqlalchemy.orm import Session
from app.api.deps import get_current_user
from app.core.database import get_db
from middleware.vendor_context import require_vendor_context
from app.services.team_service import team_service
from app.core.permissions import VendorPermissions
from app.api.deps import (
get_current_vendor_from_cookie_or_header,
require_vendor_owner,
require_vendor_permission,
get_user_permissions
)
from app.services.vendor_team_service import vendor_team_service
from models.database.user import User
from models.database.vendor import Vendor
from models.schema.team import (
TeamMemberInvite,
TeamMemberUpdate,
TeamMemberResponse,
TeamMemberListResponse,
InvitationAccept,
InvitationResponse,
InvitationAcceptResponse,
RoleResponse,
RoleListResponse,
UserPermissionsResponse,
TeamStatistics,
BulkRemoveRequest,
BulkRemoveResponse,
)
router = APIRouter(prefix="/teams")
router = APIRouter(prefix="/team")
logger = logging.getLogger(__name__)
@router.get("/members")
def get_team_members(
vendor: Vendor = Depends(require_vendor_context()),
current_user: User = Depends(get_current_user),
# ============================================================================
# Team Member Routes
# ============================================================================
@router.get("/members", response_model=TeamMemberListResponse)
def list_team_members(
request: Request,
include_inactive: bool = False,
db: Session = Depends(get_db),
current_user: User = Depends(require_vendor_permission(
VendorPermissions.TEAM_VIEW.value
))
):
"""Get all team members for vendor."""
return team_service.get_team_members(db, vendor.id, current_user)
"""
Get all team members for current vendor.
**Required Permission:** `team.view`
**Query Parameters:**
- `include_inactive`: Include inactive team members (default: False)
**Returns:**
- List of team members with their roles and permissions
- Statistics (total, active, pending)
"""
vendor = request.state.vendor
members = vendor_team_service.get_team_members(
db=db,
vendor=vendor,
include_inactive=include_inactive
)
# Calculate statistics
total = len(members)
active = sum(1 for m in members if m["is_active"])
pending = sum(1 for m in members if m["invitation_pending"])
logger.info(
f"Listed {total} team members for vendor {vendor.vendor_code} "
f"(active: {active}, pending: {pending})"
)
return TeamMemberListResponse(
members=members,
total=total,
active_count=active,
pending_invitations=pending
)
@router.post("/invite")
@router.post("/invite", response_model=InvitationResponse)
def invite_team_member(
invitation_data: dict,
vendor: Vendor = Depends(require_vendor_context()),
current_user: User = Depends(get_current_user),
invitation: TeamMemberInvite,
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(require_vendor_owner) # Owner only
):
"""Invite a new team member."""
return team_service.invite_team_member(db, vendor.id, invitation_data, current_user)
"""
Invite a new team member to the vendor.
**Required:** Vendor owner role
**Process:**
1. Create user account (if doesn't exist)
2. Create VendorUser with invitation token
3. Send invitation email
**Request Body:**
- `email`: Email address of invitee
- `first_name`, `last_name`: Optional names
- `role_name`: Preset role (manager, staff, support, viewer, marketing)
- `role_id`: Use existing role (alternative to role_name)
- `custom_permissions`: Override role permissions (requires role_name)
**Returns:**
- Invitation details
- Confirmation of email sent
"""
vendor = request.state.vendor
# Determine role approach
if invitation.role_id:
# Use existing role by ID
result = vendor_team_service.invite_team_member(
db=db,
vendor=vendor,
inviter=current_user,
email=invitation.email,
role_id=invitation.role_id
)
elif invitation.role_name:
# Use role name with optional custom permissions
result = vendor_team_service.invite_team_member(
db=db,
vendor=vendor,
inviter=current_user,
email=invitation.email,
role_name=invitation.role_name,
custom_permissions=invitation.custom_permissions
)
else:
# Default to Staff role
result = vendor_team_service.invite_team_member(
db=db,
vendor=vendor,
inviter=current_user,
email=invitation.email,
role_name="staff"
)
logger.info(
f"Invitation sent: {invitation.email} to {vendor.vendor_code} "
f"by {current_user.username}"
)
return InvitationResponse(
message="Invitation sent successfully",
email=result["email"],
role=result["role"],
invitation_sent=True
)
@router.put("/members/{user_id}")
@router.post("/accept-invitation", response_model=InvitationAcceptResponse)
def accept_invitation(
acceptance: InvitationAccept,
db: Session = Depends(get_db)
):
"""
Accept a team invitation and activate account.
**No authentication required** - uses invitation token.
**Request Body:**
- `invitation_token`: Token from invitation email
- `password`: New password (min 8 chars, must have upper, lower, digit)
- `first_name`, `last_name`: User's name
**Returns:**
- Confirmation message
- Vendor information
- User information
- Assigned role
"""
result = vendor_team_service.accept_invitation(
db=db,
invitation_token=acceptance.invitation_token,
password=acceptance.password,
first_name=acceptance.first_name,
last_name=acceptance.last_name
)
logger.info(
f"Invitation accepted: {result['user'].email} "
f"for vendor {result['vendor'].vendor_code}"
)
return InvitationAcceptResponse(
message="Invitation accepted successfully. You can now login.",
vendor={
"id": result["vendor"].id,
"vendor_code": result["vendor"].vendor_code,
"name": result["vendor"].name,
"subdomain": result["vendor"].subdomain
},
user={
"id": result["user"].id,
"email": result["user"].email,
"username": result["user"].username,
"full_name": result["user"].full_name
},
role=result["role"]
)
@router.get("/members/{user_id}", response_model=TeamMemberResponse)
def get_team_member(
user_id: int,
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(require_vendor_permission(
VendorPermissions.TEAM_VIEW.value
))
):
"""
Get details of a specific team member.
**Required Permission:** `team.view`
"""
vendor = request.state.vendor
members = vendor_team_service.get_team_members(
db=db,
vendor=vendor,
include_inactive=True
)
member = next((m for m in members if m["id"] == user_id), None)
if not member:
from app.exceptions import UserNotFoundException
raise UserNotFoundException(str(user_id))
return TeamMemberResponse(**member)
@router.put("/members/{user_id}", response_model=TeamMemberResponse)
def update_team_member(
user_id: int,
update_data: dict,
vendor: Vendor = Depends(require_vendor_context()),
current_user: User = Depends(get_current_user),
update_data: TeamMemberUpdate,
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(require_vendor_owner) # Owner only
):
"""Update team member role or status."""
return team_service.update_team_member(db, vendor.id, user_id, update_data, current_user)
"""
Update a team member's role or status.
**Required:** Vendor owner role
**Cannot:**
- Change owner's role
- Remove owner
**Request Body:**
- `role_id`: New role ID (optional)
- `is_active`: Active status (optional)
"""
vendor = request.state.vendor
vendor_user = vendor_team_service.update_member_role(
db=db,
vendor=vendor,
user_id=user_id,
new_role_id=update_data.role_id,
is_active=update_data.is_active
)
logger.info(
f"Team member updated: {user_id} in {vendor.vendor_code} "
f"by {current_user.username}"
)
# Return updated member details
members = vendor_team_service.get_team_members(db, vendor, include_inactive=True)
member = next((m for m in members if m["id"] == user_id), None)
return TeamMemberResponse(**member)
@router.delete("/members/{user_id}")
def remove_team_member(
user_id: int,
vendor: Vendor = Depends(require_vendor_context()),
current_user: User = Depends(get_current_user),
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(require_vendor_owner) # Owner only
):
"""Remove team member from vendor."""
team_service.remove_team_member(db, vendor.id, user_id, current_user)
return {"message": "Team member removed successfully"}
"""
Remove a team member from the vendor.
**Required:** Vendor owner role
**Cannot remove:**
- Vendor owner
**Action:**
- Soft delete (sets is_active = False)
- Member can be re-invited later
"""
vendor = request.state.vendor
vendor_team_service.remove_team_member(
db=db,
vendor=vendor,
user_id=user_id
)
logger.info(
f"Team member removed: {user_id} from {vendor.vendor_code} "
f"by {current_user.username}"
)
return {
"message": "Team member removed successfully",
"user_id": user_id
}
@router.get("/roles")
def get_team_roles(
vendor: Vendor = Depends(require_vendor_context()),
current_user: User = Depends(get_current_user),
@router.post("/members/bulk-remove", response_model=BulkRemoveResponse)
def bulk_remove_team_members(
bulk_remove: BulkRemoveRequest,
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(require_vendor_owner)
):
"""Get available roles for vendor team."""
return team_service.get_vendor_roles(db, vendor.id)
"""
Remove multiple team members at once.
**Required:** Vendor owner role
"""
vendor = request.state.vendor
success_count = 0
failed_count = 0
errors = []
for user_id in bulk_remove.user_ids:
try:
vendor_team_service.remove_team_member(
db=db,
vendor=vendor,
user_id=user_id
)
success_count += 1
except Exception as e:
failed_count += 1
errors.append({
"user_id": user_id,
"error": str(e)
})
logger.info(
f"Bulk remove completed: {success_count} removed, {failed_count} failed "
f"in {vendor.vendor_code}"
)
return BulkRemoveResponse(
success_count=success_count,
failed_count=failed_count,
errors=errors
)
# ============================================================================
# Role Management Routes
# ============================================================================
@router.get("/roles", response_model=RoleListResponse)
def list_roles(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(require_vendor_permission(
VendorPermissions.TEAM_VIEW.value
))
):
"""
Get all available roles for the vendor.
**Required Permission:** `team.view`
**Returns:**
- List of roles with permissions
- Includes both preset and custom roles
"""
vendor = request.state.vendor
roles = vendor_team_service.get_vendor_roles(db=db, vendor_id=vendor.id)
return RoleListResponse(
roles=roles,
total=len(roles)
)
# ============================================================================
# Permission Routes
# ============================================================================
@router.get("/me/permissions", response_model=UserPermissionsResponse)
def get_my_permissions(
request: Request,
permissions: List[str] = Depends(get_user_permissions),
current_user: User = Depends(get_current_vendor_from_cookie_or_header)
):
"""
Get current user's permissions in this vendor.
**Use this endpoint to:**
- Determine what UI elements to show/hide
- Check permissions before making API calls
- Display user's role and capabilities
**Returns:**
- Complete list of permissions
- Whether user is owner
- Role name (if team member)
"""
vendor = request.state.vendor
is_owner = current_user.is_owner_of(vendor.id)
role_name = current_user.get_vendor_role(vendor.id)
return UserPermissionsResponse(
permissions=permissions,
permission_count=len(permissions),
is_owner=is_owner,
role_name=role_name
)
# ============================================================================
# Statistics Routes
# ============================================================================
@router.get("/statistics", response_model=TeamStatistics)
def get_team_statistics(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(require_vendor_permission(
VendorPermissions.TEAM_VIEW.value
))
):
"""
Get team statistics for the vendor.
**Required Permission:** `team.view`
**Returns:**
- Total members
- Active/inactive breakdown
- Pending invitations
- Owner count
- Role distribution
"""
vendor = request.state.vendor
members = vendor_team_service.get_team_members(
db=db,
vendor=vendor,
include_inactive=True
)
# Calculate statistics
total = len(members)
active = sum(1 for m in members if m["is_active"])
inactive = total - active
pending = sum(1 for m in members if m["invitation_pending"])
owners = sum(1 for m in members if m["is_owner"])
team_members = total - owners
# Role breakdown
roles_breakdown = {}
for member in members:
role = member["role_name"]
roles_breakdown[role] = roles_breakdown.get(role, 0) + 1
return TeamStatistics(
total_members=total,
active_members=active,
inactive_members=inactive,
pending_invitations=pending,
owners=owners,
team_members=team_members,
roles_breakdown=roles_breakdown
)

203
app/core/permissions.py Normal file
View File

@@ -0,0 +1,203 @@
# app/core/permissions.py
"""
Permission constants and checking logic for RBAC.
This module defines:
- Vendor-specific permissions
- Permission groups (for easier role creation)
- Permission checking utilities
"""
from enum import Enum
from typing import List, Set
class VendorPermissions(str, Enum):
"""
All available permissions within a vendor context.
Naming convention: RESOURCE_ACTION
"""
# Dashboard
DASHBOARD_VIEW = "dashboard.view"
# Products
PRODUCTS_VIEW = "products.view"
PRODUCTS_CREATE = "products.create"
PRODUCTS_EDIT = "products.edit"
PRODUCTS_DELETE = "products.delete"
PRODUCTS_IMPORT = "products.import"
PRODUCTS_EXPORT = "products.export"
# Stock/Inventory
STOCK_VIEW = "stock.view"
STOCK_EDIT = "stock.edit"
STOCK_TRANSFER = "stock.transfer"
# Orders
ORDERS_VIEW = "orders.view"
ORDERS_EDIT = "orders.edit"
ORDERS_CANCEL = "orders.cancel"
ORDERS_REFUND = "orders.refund"
# Customers
CUSTOMERS_VIEW = "customers.view"
CUSTOMERS_EDIT = "customers.edit"
CUSTOMERS_DELETE = "customers.delete"
CUSTOMERS_EXPORT = "customers.export"
# Marketing
MARKETING_VIEW = "marketing.view"
MARKETING_CREATE = "marketing.create"
MARKETING_SEND = "marketing.send"
# Reports
REPORTS_VIEW = "reports.view"
REPORTS_FINANCIAL = "reports.financial"
REPORTS_EXPORT = "reports.export"
# Settings
SETTINGS_VIEW = "settings.view"
SETTINGS_EDIT = "settings.edit"
SETTINGS_THEME = "settings.theme"
SETTINGS_DOMAINS = "settings.domains"
# Team Management
TEAM_VIEW = "team.view"
TEAM_INVITE = "team.invite"
TEAM_EDIT = "team.edit"
TEAM_REMOVE = "team.remove"
# Marketplace Imports
IMPORTS_VIEW = "imports.view"
IMPORTS_CREATE = "imports.create"
IMPORTS_CANCEL = "imports.cancel"
class PermissionGroups:
"""Pre-defined permission groups for common roles."""
# Full access (for owners)
OWNER: Set[str] = set(p.value for p in VendorPermissions)
# Manager - Can do most things except team management and critical settings
MANAGER: Set[str] = {
VendorPermissions.DASHBOARD_VIEW.value,
VendorPermissions.PRODUCTS_VIEW.value,
VendorPermissions.PRODUCTS_CREATE.value,
VendorPermissions.PRODUCTS_EDIT.value,
VendorPermissions.PRODUCTS_DELETE.value,
VendorPermissions.PRODUCTS_IMPORT.value,
VendorPermissions.PRODUCTS_EXPORT.value,
VendorPermissions.STOCK_VIEW.value,
VendorPermissions.STOCK_EDIT.value,
VendorPermissions.STOCK_TRANSFER.value,
VendorPermissions.ORDERS_VIEW.value,
VendorPermissions.ORDERS_EDIT.value,
VendorPermissions.ORDERS_CANCEL.value,
VendorPermissions.ORDERS_REFUND.value,
VendorPermissions.CUSTOMERS_VIEW.value,
VendorPermissions.CUSTOMERS_EDIT.value,
VendorPermissions.CUSTOMERS_EXPORT.value,
VendorPermissions.MARKETING_VIEW.value,
VendorPermissions.MARKETING_CREATE.value,
VendorPermissions.MARKETING_SEND.value,
VendorPermissions.REPORTS_VIEW.value,
VendorPermissions.REPORTS_FINANCIAL.value,
VendorPermissions.REPORTS_EXPORT.value,
VendorPermissions.SETTINGS_VIEW.value,
VendorPermissions.SETTINGS_THEME.value,
VendorPermissions.IMPORTS_VIEW.value,
VendorPermissions.IMPORTS_CREATE.value,
VendorPermissions.IMPORTS_CANCEL.value,
}
# Staff - Can view and edit products/orders but limited access
STAFF: Set[str] = {
VendorPermissions.DASHBOARD_VIEW.value,
VendorPermissions.PRODUCTS_VIEW.value,
VendorPermissions.PRODUCTS_CREATE.value,
VendorPermissions.PRODUCTS_EDIT.value,
VendorPermissions.STOCK_VIEW.value,
VendorPermissions.STOCK_EDIT.value,
VendorPermissions.ORDERS_VIEW.value,
VendorPermissions.ORDERS_EDIT.value,
VendorPermissions.CUSTOMERS_VIEW.value,
VendorPermissions.CUSTOMERS_EDIT.value,
}
# Support - Can view and assist with orders/customers
SUPPORT: Set[str] = {
VendorPermissions.DASHBOARD_VIEW.value,
VendorPermissions.PRODUCTS_VIEW.value,
VendorPermissions.ORDERS_VIEW.value,
VendorPermissions.ORDERS_EDIT.value,
VendorPermissions.CUSTOMERS_VIEW.value,
VendorPermissions.CUSTOMERS_EDIT.value,
}
# Viewer - Read-only access
VIEWER: Set[str] = {
VendorPermissions.DASHBOARD_VIEW.value,
VendorPermissions.PRODUCTS_VIEW.value,
VendorPermissions.STOCK_VIEW.value,
VendorPermissions.ORDERS_VIEW.value,
VendorPermissions.CUSTOMERS_VIEW.value,
VendorPermissions.REPORTS_VIEW.value,
}
# Marketing - Focused on marketing and customer communication
MARKETING: Set[str] = {
VendorPermissions.DASHBOARD_VIEW.value,
VendorPermissions.CUSTOMERS_VIEW.value,
VendorPermissions.CUSTOMERS_EXPORT.value,
VendorPermissions.MARKETING_VIEW.value,
VendorPermissions.MARKETING_CREATE.value,
VendorPermissions.MARKETING_SEND.value,
VendorPermissions.REPORTS_VIEW.value,
}
class PermissionChecker:
"""Utility class for permission checking."""
@staticmethod
def has_permission(permissions: List[str], required_permission: str) -> bool:
"""Check if a permission list contains a required permission."""
return required_permission in permissions
@staticmethod
def has_any_permission(permissions: List[str], required_permissions: List[str]) -> bool:
"""Check if a permission list contains ANY of the required permissions."""
return any(perm in permissions for perm in required_permissions)
@staticmethod
def has_all_permissions(permissions: List[str], required_permissions: List[str]) -> bool:
"""Check if a permission list contains ALL of the required permissions."""
return all(perm in permissions for perm in required_permissions)
@staticmethod
def get_missing_permissions(permissions: List[str], required_permissions: List[str]) -> List[str]:
"""Get list of missing permissions."""
return [perm for perm in required_permissions if perm not in permissions]
# Helper function to get permissions for a role preset
def get_preset_permissions(preset_name: str) -> Set[str]:
"""
Get permissions for a preset role.
Args:
preset_name: Name of the preset (manager, staff, support, viewer, marketing)
Returns:
Set of permission strings
"""
presets = {
"owner": PermissionGroups.OWNER,
"manager": PermissionGroups.MANAGER,
"staff": PermissionGroups.STAFF,
"support": PermissionGroups.SUPPORT,
"viewer": PermissionGroups.VIEWER,
"marketing": PermissionGroups.MARKETING,
}
return presets.get(preset_name.lower(), set())

View File

@@ -146,6 +146,7 @@ from .team import (
MaxTeamMembersReachedException,
TeamValidationException,
InvalidInvitationDataException,
InvalidInvitationTokenException,
)
# Product exceptions
@@ -215,6 +216,7 @@ __all__ = [
"MaxTeamMembersReachedException",
"TeamValidationException",
"InvalidInvitationDataException",
"InvalidInvitationTokenException",
# Inventory exceptions
"InventoryNotFoundException",
@@ -306,4 +308,4 @@ __all__ = [
"InvalidAdminActionException",
"BulkOperationException",
"ConfirmationRequiredException",
]
]

View File

@@ -147,10 +147,10 @@ class InvalidRoleException(ValidationException):
"""Raised when role data is invalid."""
def __init__(
self,
message: str = "Invalid role data",
field: Optional[str] = None,
details: Optional[Dict[str, Any]] = None,
self,
message: str = "Invalid role data",
field: Optional[str] = None,
details: Optional[Dict[str, Any]] = None,
):
super().__init__(
message=message,
@@ -164,10 +164,10 @@ class InsufficientPermissionsException(AuthorizationException):
"""Raised when user lacks required permissions for an action."""
def __init__(
self,
required_permission: str,
user_id: Optional[int] = None,
action: Optional[str] = None,
self,
required_permission: str,
user_id: Optional[int] = None,
action: Optional[str] = None,
):
details = {"required_permission": required_permission}
if user_id:
@@ -202,10 +202,10 @@ class TeamValidationException(ValidationException):
"""Raised when team operation validation fails."""
def __init__(
self,
message: str = "Team operation validation failed",
field: Optional[str] = None,
validation_errors: Optional[Dict[str, str]] = None,
self,
message: str = "Team operation validation failed",
field: Optional[str] = None,
validation_errors: Optional[Dict[str, str]] = None,
):
details = {}
if validation_errors:
@@ -223,10 +223,10 @@ class InvalidInvitationDataException(ValidationException):
"""Raised when team invitation data is invalid."""
def __init__(
self,
message: str = "Invalid invitation data",
field: Optional[str] = None,
details: Optional[Dict[str, Any]] = None,
self,
message: str = "Invalid invitation data",
field: Optional[str] = None,
details: Optional[Dict[str, Any]] = None,
):
super().__init__(
message=message,
@@ -234,3 +234,31 @@ class InvalidInvitationDataException(ValidationException):
details=details,
)
self.error_code = "INVALID_INVITATION_DATA"
# ============================================================================
# NEW: Add InvalidInvitationTokenException
# ============================================================================
class InvalidInvitationTokenException(ValidationException):
"""Raised when invitation token is invalid, expired, or already used.
This is a general exception for any invitation token validation failure.
Use this when checking invitation tokens during the acceptance flow.
"""
def __init__(
self,
message: str = "Invalid or expired invitation token",
invitation_token: Optional[str] = None
):
details = {}
if invitation_token:
details["invitation_token"] = invitation_token
super().__init__(
message=message,
field="invitation_token",
details=details,
)
self.error_code = "INVALID_INVITATION_TOKEN"

View File

@@ -0,0 +1,465 @@
# app/services/vendor_team_service.py
"""
Vendor team management service.
Handles:
- Team member invitations
- Invitation acceptance
- Role assignment
- Permission management
"""
import logging
import secrets
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
from sqlalchemy.orm import Session
from app.core.permissions import get_preset_permissions
from app.exceptions import (
TeamMemberAlreadyExistsException,
InvalidInvitationTokenException,
TeamInvitationAlreadyAcceptedException,
MaxTeamMembersReachedException,
UserNotFoundException,
VendorNotFoundException,
CannotRemoveOwnerException,
)
from models.database.user import User
from models.database.vendor import Vendor, VendorUser, VendorUserType, Role
from middleware.auth import AuthManager
logger = logging.getLogger(__name__)
class VendorTeamService:
"""Service for managing vendor team members."""
def __init__(self):
self.auth_manager = AuthManager()
self.max_team_members = 50 # Configure as needed
def invite_team_member(
self,
db: Session,
vendor: Vendor,
inviter: User,
email: str,
role_name: str,
custom_permissions: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""
Invite a new team member to a vendor.
Creates:
1. User account (if doesn't exist)
2. Role (if custom permissions provided)
3. VendorUser relationship with invitation token
Args:
db: Database session
vendor: Vendor to invite to
inviter: User sending the invitation
email: Email of person to invite
role_name: Role name (manager, staff, support, etc.)
custom_permissions: Optional custom permissions (overrides preset)
Returns:
Dict with invitation details
"""
try:
# Check team size limit
current_team_size = db.query(VendorUser).filter(
VendorUser.vendor_id == vendor.id,
VendorUser.is_active == True,
).count()
if current_team_size >= self.max_team_members:
raise MaxTeamMembersReachedException(
self.max_team_members,
vendor.vendor_code,
)
# Check if user already exists
user = db.query(User).filter(User.email == email).first()
if user:
# Check if already a member
existing_membership = db.query(VendorUser).filter(
VendorUser.vendor_id == vendor.id,
VendorUser.user_id == user.id,
).first()
if existing_membership:
if existing_membership.is_active:
raise TeamMemberAlreadyExistsException(email, vendor.vendor_code)
# Reactivate old membership
existing_membership.is_active = False # Will be activated on acceptance
existing_membership.invitation_token = self._generate_invitation_token()
existing_membership.invitation_sent_at = datetime.utcnow()
existing_membership.invitation_accepted_at = None
db.commit()
logger.info(f"Re-invited user {email} to vendor {vendor.vendor_code}")
return {
"invitation_token": existing_membership.invitation_token,
"email": email,
"existing_user": True,
}
else:
# Create new user account (inactive until invitation accepted)
username = email.split('@')[0]
# Ensure unique username
base_username = username
counter = 1
while db.query(User).filter(User.username == username).first():
username = f"{base_username}{counter}"
counter += 1
# Generate temporary password (user will set real one on activation)
temp_password = secrets.token_urlsafe(16)
user = User(
email=email,
username=username,
hashed_password=self.auth_manager.hash_password(temp_password),
role="vendor", # Platform role
is_active=False, # Will be activated when invitation accepted
is_email_verified=False,
)
db.add(user)
db.flush() # Get user.id
logger.info(f"Created new user account for invitation: {email}")
# Get or create role
role = self._get_or_create_role(
db=db,
vendor=vendor,
role_name=role_name,
custom_permissions=custom_permissions,
)
# Create vendor membership with invitation
invitation_token = self._generate_invitation_token()
vendor_user = VendorUser(
vendor_id=vendor.id,
user_id=user.id,
user_type=VendorUserType.TEAM_MEMBER.value,
role_id=role.id,
invited_by=inviter.id,
invitation_token=invitation_token,
invitation_sent_at=datetime.utcnow(),
is_active=False, # Will be activated on acceptance
)
db.add(vendor_user)
db.commit()
logger.info(
f"Invited {email} to vendor {vendor.vendor_code} "
f"as {role_name} by {inviter.username}"
)
# TODO: Send invitation email
# self._send_invitation_email(email, vendor, invitation_token)
return {
"invitation_token": invitation_token,
"email": email,
"role": role_name,
"existing_user": user.is_active,
}
except (TeamMemberAlreadyExistsException, MaxTeamMembersReachedException):
raise
except Exception as e:
db.rollback()
logger.error(f"Error inviting team member: {str(e)}")
raise
def accept_invitation(
self,
db: Session,
invitation_token: str,
password: str,
first_name: Optional[str] = None,
last_name: Optional[str] = None,
) -> Dict[str, Any]:
"""
Accept a team invitation and activate account.
Args:
db: Database session
invitation_token: Invitation token from email
password: New password to set
first_name: Optional first name
last_name: Optional last name
Returns:
Dict with user and vendor info
"""
try:
# Find invitation
vendor_user = db.query(VendorUser).filter(
VendorUser.invitation_token == invitation_token,
).first()
if not vendor_user:
raise InvalidInvitationTokenException()
# Check if already accepted
if vendor_user.invitation_accepted_at is not None:
raise TeamInvitationAlreadyAcceptedException()
# Check token expiration (7 days)
if vendor_user.invitation_sent_at:
expiry_date = vendor_user.invitation_sent_at + timedelta(days=7)
if datetime.utcnow() > expiry_date:
raise InvalidInvitationTokenException("Invitation has expired")
user = vendor_user.user
vendor = vendor_user.vendor
# Update user
user.hashed_password = self.auth_manager.hash_password(password)
user.is_active = True
user.is_email_verified = True
if first_name:
user.first_name = first_name
if last_name:
user.last_name = last_name
# Activate membership
vendor_user.is_active = True
vendor_user.invitation_accepted_at = datetime.utcnow()
vendor_user.invitation_token = None # Clear token
db.commit()
logger.info(
f"User {user.email} accepted invitation to vendor {vendor.vendor_code}"
)
return {
"user": user,
"vendor": vendor,
"role": vendor_user.role.name if vendor_user.role else "member",
}
except (InvalidInvitationTokenException, TeamInvitationAlreadyAcceptedException):
raise
except Exception as e:
db.rollback()
logger.error(f"Error accepting invitation: {str(e)}")
raise
def remove_team_member(
self,
db: Session,
vendor: Vendor,
user_id: int,
) -> bool:
"""
Remove a team member from a vendor.
Cannot remove owner.
Args:
db: Database session
vendor: Vendor to remove from
user_id: User ID to remove
Returns:
True if removed
"""
try:
vendor_user = db.query(VendorUser).filter(
VendorUser.vendor_id == vendor.id,
VendorUser.user_id == user_id,
).first()
if not vendor_user:
raise UserNotFoundException(str(user_id))
# Cannot remove owner
if vendor_user.is_owner:
raise CannotRemoveOwnerException(vendor.vendor_code)
# Soft delete - just deactivate
vendor_user.is_active = False
db.commit()
logger.info(f"Removed user {user_id} from vendor {vendor.vendor_code}")
return True
except (UserNotFoundException, CannotRemoveOwnerException):
raise
except Exception as e:
db.rollback()
logger.error(f"Error removing team member: {str(e)}")
raise
def update_member_role(
self,
db: Session,
vendor: Vendor,
user_id: int,
new_role_name: str,
custom_permissions: Optional[List[str]] = None,
) -> VendorUser:
"""
Update a team member's role.
Args:
db: Database session
vendor: Vendor
user_id: User ID
new_role_name: New role name
custom_permissions: Optional custom permissions
Returns:
Updated VendorUser
"""
try:
vendor_user = db.query(VendorUser).filter(
VendorUser.vendor_id == vendor.id,
VendorUser.user_id == user_id,
).first()
if not vendor_user:
raise UserNotFoundException(str(user_id))
# Cannot change owner's role
if vendor_user.is_owner:
raise CannotRemoveOwnerException(vendor.vendor_code)
# Get or create new role
new_role = self._get_or_create_role(
db=db,
vendor=vendor,
role_name=new_role_name,
custom_permissions=custom_permissions,
)
vendor_user.role_id = new_role.id
db.commit()
logger.info(
f"Updated role for user {user_id} in vendor {vendor.vendor_code} "
f"to {new_role_name}"
)
return vendor_user
except (UserNotFoundException, CannotRemoveOwnerException):
raise
except Exception as e:
db.rollback()
logger.error(f"Error updating member role: {str(e)}")
raise
def get_team_members(
self,
db: Session,
vendor: Vendor,
include_inactive: bool = False,
) -> List[Dict[str, Any]]:
"""
Get all team members for a vendor.
Args:
db: Database session
vendor: Vendor
include_inactive: Include inactive members
Returns:
List of team member info
"""
query = db.query(VendorUser).filter(
VendorUser.vendor_id == vendor.id,
)
if not include_inactive:
query = query.filter(VendorUser.is_active == True)
vendor_users = query.all()
members = []
for vu in vendor_users:
members.append({
"id": vu.user.id,
"email": vu.user.email,
"username": vu.user.username,
"full_name": vu.user.full_name,
"user_type": vu.user_type,
"role": vu.role.name if vu.role else "owner",
"permissions": vu.get_all_permissions(),
"is_active": vu.is_active,
"is_owner": vu.is_owner,
"invitation_pending": vu.is_invitation_pending,
"invited_at": vu.invitation_sent_at,
"accepted_at": vu.invitation_accepted_at,
})
return members
# Private helper methods
def _generate_invitation_token(self) -> str:
"""Generate a secure invitation token."""
return secrets.token_urlsafe(32)
def _get_or_create_role(
self,
db: Session,
vendor: Vendor,
role_name: str,
custom_permissions: Optional[List[str]] = None,
) -> Role:
"""Get existing role or create new one with preset/custom permissions."""
# Try to find existing role with same name
role = db.query(Role).filter(
Role.vendor_id == vendor.id,
Role.name == role_name,
).first()
if role and custom_permissions is None:
# Use existing role
return role
# Determine permissions
if custom_permissions:
permissions = custom_permissions
else:
# Get preset permissions
permissions = list(get_preset_permissions(role_name))
if role:
# Update existing role with new permissions
role.permissions = permissions
else:
# Create new role
role = Role(
vendor_id=vendor.id,
name=role_name,
permissions=permissions,
)
db.add(role)
db.flush()
return role
def _send_invitation_email(self, email: str, vendor: Vendor, token: str):
"""Send invitation email (TODO: implement)."""
# TODO: Implement email sending
# Should include:
# - Link to accept invitation: /vendor/invitation/accept?token={token}
# - Vendor name
# - Inviter name
# - Expiry date
pass
# Create service instance
vendor_team_service = VendorTeamService()

View File

@@ -1,39 +1,56 @@
# models/database/user.py
# models/database/user.py - IMPROVED VERSION
"""
User model with authentication support.
This module defines the User model which includes fields for user details,
authentication information, and relationships to other models such as Vendor and Customer.
"""
from sqlalchemy import Boolean, Column, DateTime, Integer, String
from sqlalchemy.orm import relationship
ROLE CLARIFICATION:
- User.role should ONLY contain platform-level roles:
* "admin" - Platform administrator (full system access)
* "vendor" - Any user who owns or is part of a vendor team
- Vendor-specific roles (manager, staff, etc.) are stored in VendorUser.role
- Customers are NOT in the User table - they use the Customer model
"""
from sqlalchemy import Boolean, Column, DateTime, Integer, String, Enum
from sqlalchemy.orm import relationship
import enum
# Import Base from the central database module instead of creating a new one
from app.core.database import Base
from models.database.base import TimestampMixin
class UserRole(str, enum.Enum):
"""Platform-level user roles."""
ADMIN = "admin" # Platform administrator
VENDOR = "vendor" # Vendor owner or team member
class User(Base, TimestampMixin):
"""Represents a user in the system."""
"""Represents a platform user (admins and vendors only)."""
__tablename__ = "users" # Name of the table in the database
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True) # Primary key and indexed column for user ID
email = Column(String, unique=True, index=True, nullable=False) # Unique, indexed, non-nullable email column
username = Column(String, unique=True, index=True, nullable=False) # Unique, indexed, non-nullable username column
first_name = Column(String) # Optional first name column
last_name = Column(String) # Optional last name column
hashed_password = Column(String, nullable=False) # Non-nullable hashed password column
role = Column(String, nullable=False, default="user") # Role of the user (default is 'user') //TODO: Change to customer, vendor, admin
is_active = Column(Boolean, default=True, nullable=False) # Active status of the user (default is True)
last_login = Column(DateTime, nullable=True) # Optional last login timestamp column
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
username = Column(String, unique=True, index=True, nullable=False)
first_name = Column(String)
last_name = Column(String)
hashed_password = Column(String, nullable=False)
# Platform-level role only (admin or vendor)
role = Column(String, nullable=False, default=UserRole.VENDOR.value)
is_active = Column(Boolean, default=True, nullable=False)
is_email_verified = Column(Boolean, default=False, nullable=False)
last_login = Column(DateTime, nullable=True)
# Relationships
marketplace_import_jobs = relationship("MarketplaceImportJob",
back_populates="user") # Relationship with import jobs
owned_vendors = relationship("Vendor", back_populates="owner") # Relationship with vendors owned by this user
vendor_memberships = relationship("VendorUser", foreign_keys="[VendorUser.user_id]",
back_populates="user") # Relationship with vendor memberships
marketplace_import_jobs = relationship("MarketplaceImportJob", back_populates="user")
owned_vendors = relationship("Vendor", back_populates="owner")
vendor_memberships = relationship(
"VendorUser",
foreign_keys="[VendorUser.user_id]",
back_populates="user"
)
def __repr__(self):
"""String representation of the User object."""
@@ -45,3 +62,55 @@ class User(Base, TimestampMixin):
if self.first_name and self.last_name:
return f"{self.first_name} {self.last_name}"
return self.username
@property
def is_admin(self) -> bool:
"""Check if user is a platform admin."""
return self.role == UserRole.ADMIN.value
@property
def is_vendor(self) -> bool:
"""Check if user is a vendor (owner or team member)."""
return self.role == UserRole.VENDOR.value
def is_owner_of(self, vendor_id: int) -> bool:
"""Check if user is the owner of a specific vendor."""
return any(v.id == vendor_id for v in self.owned_vendors)
def is_member_of(self, vendor_id: int) -> bool:
"""Check if user is a member of a specific vendor (owner or team)."""
# Check if owner
if self.is_owner_of(vendor_id):
return True
# Check if team member
return any(
vm.vendor_id == vendor_id and vm.is_active
for vm in self.vendor_memberships
)
def get_vendor_role(self, vendor_id: int) -> str:
"""Get user's role within a specific vendor."""
# Check if owner
if self.is_owner_of(vendor_id):
return "owner"
# Check team membership
for vm in self.vendor_memberships:
if vm.vendor_id == vendor_id and vm.is_active:
return vm.role.name if vm.role else "member"
return None
def has_vendor_permission(self, vendor_id: int, permission: str) -> bool:
"""Check if user has a specific permission in a vendor."""
# Owners have all permissions
if self.is_owner_of(vendor_id):
return True
# Check team member permissions
for vm in self.vendor_memberships:
if vm.vendor_id == vendor_id and vm.is_active:
if vm.role and permission in vm.role.permissions:
return True
return False

View File

@@ -5,14 +5,14 @@ Vendor model representing entities that sell products or services.
This module defines the Vendor model along with its relationships to
other models such as User (owner), Product, Customer, Order, and MarketplaceImportJob.
"""
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, Text, JSON
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, Text, JSON, DateTime
from sqlalchemy.orm import relationship
# Import Base from the central database module instead of creating a new one
from app.core.database import Base
from models.database.base import TimestampMixin
from app.core.config import settings
import enum
class Vendor(Base, TimestampMixin):
"""Represents a vendor in the system."""
@@ -177,10 +177,21 @@ class Vendor(Base, TimestampMixin):
return domains
class VendorUser(Base, TimestampMixin):
"""Represents a user's role within a specific vendor."""
class VendorUserType(str, enum.Enum):
"""Types of vendor users."""
OWNER = "owner" # Vendor owner (full access to vendor area)
TEAM_MEMBER = "member" # Team member (role-based access to vendor area)
__tablename__ = "vendor_users" # Name of the table in the database
class VendorUser(Base, TimestampMixin):
"""
Represents a user's membership in a vendor.
- Owner: Created automatically when vendor is created
- Team Member: Invited by owner via email
"""
__tablename__ = "vendor_users"
id = Column(Integer, primary_key=True, index=True)
"""Unique identifier for each VendorUser entry."""
@@ -191,15 +202,23 @@ class VendorUser(Base, TimestampMixin):
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
"""Foreign key linking to the associated User."""
role_id = Column(Integer, ForeignKey("roles.id"), nullable=False)
# Distinguish between owner and team member
user_type = Column(String, nullable=False, default=VendorUserType.TEAM_MEMBER.value)
# Role for team members (NULL for owners - they have all permissions)
role_id = Column(Integer, ForeignKey("roles.id"), nullable=True)
"""Foreign key linking to the associated Role."""
invited_by = Column(Integer, ForeignKey("users.id"))
"""Foreign key linking to the user who invited this VendorUser."""
invitation_token = Column(String, nullable=True, index=True) # For email activation
invitation_sent_at = Column(DateTime, nullable=True)
invitation_accepted_at = Column(DateTime, nullable=True)
is_active = Column(Boolean, default=True, nullable=False)
is_active = Column(Boolean, default=False, nullable=False) # False until invitation accepted
"""Indicates whether the VendorUser role is active."""
# Relationships
vendor = relationship("Vendor", back_populates="vendor_users")
"""Relationship to the Vendor model, representing the associated vendor."""
@@ -216,10 +235,57 @@ class VendorUser(Base, TimestampMixin):
"""Return a string representation of the VendorUser instance.
Returns:
str: A string that includes the vendor_id and user_id of the VendorUser instance.
str: A string that includes the vendor_id, the user_id and the user_type of the VendorUser instance.
"""
return f"<VendorUser(vendor_id={self.vendor_id}, user_id={self.user_id})>"
return f"<VendorUser(vendor_id={self.vendor_id}, user_id={self.user_id}, type={self.user_type})>"
@property
def is_owner(self) -> bool:
"""Check if this is an owner membership."""
return self.user_type == VendorUserType.OWNER.value
@property
def is_team_member(self) -> bool:
"""Check if this is a team member (not owner)."""
return self.user_type == VendorUserType.TEAM_MEMBER.value
@property
def is_invitation_pending(self) -> bool:
"""Check if invitation is still pending."""
return self.invitation_token is not None and self.invitation_accepted_at is None
def has_permission(self, permission: str) -> bool:
"""
Check if user has a specific permission.
Owners always have all permissions.
Team members check their role's permissions.
"""
# Owners have all permissions
if self.is_owner:
return True
# Inactive users have no permissions
if not self.is_active:
return False
# Check role permissions
if self.role and self.role.permissions:
return permission in self.role.permissions
return False
def get_all_permissions(self) -> list:
"""Get all permissions this user has."""
if self.is_owner:
# Return all possible permissions
from app.core.permissions import VendorPermissions
return list(VendorPermissions.__members__.values())
if self.role and self.role.permissions:
return self.role.permissions
return []
class Role(Base, TimestampMixin):
"""Represents a role within a vendor's system."""

View File

@@ -1 +1,260 @@
# Team management models
# models/schema/team.py
"""
Pydantic schemas for vendor team management.
This module defines request/response schemas for:
- Team member listing
- Team member invitation
- Team member updates
- Role management
"""
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel, EmailStr, Field, validator
# ============================================================================
# Role Schemas
# ============================================================================
class RoleBase(BaseModel):
"""Base role schema."""
name: str = Field(..., min_length=1, max_length=100, description="Role name")
permissions: List[str] = Field(default_factory=list, description="List of permission strings")
class RoleCreate(RoleBase):
"""Schema for creating a role."""
pass
class RoleUpdate(BaseModel):
"""Schema for updating a role."""
name: Optional[str] = Field(None, min_length=1, max_length=100)
permissions: Optional[List[str]] = None
class RoleResponse(RoleBase):
"""Schema for role response."""
id: int
vendor_id: int
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True # Pydantic v2 (use orm_mode = True for v1)
class RoleListResponse(BaseModel):
"""Schema for role list response."""
roles: List[RoleResponse]
total: int
# ============================================================================
# Team Member Schemas
# ============================================================================
class TeamMemberBase(BaseModel):
"""Base team member schema."""
email: EmailStr = Field(..., description="Team member email address")
first_name: Optional[str] = Field(None, max_length=100)
last_name: Optional[str] = Field(None, max_length=100)
class TeamMemberInvite(TeamMemberBase):
"""Schema for inviting a team member."""
role_id: Optional[int] = Field(None, description="Role ID to assign (for preset roles)")
role_name: Optional[str] = Field(None, description="Role name (manager, staff, support, etc.)")
custom_permissions: Optional[List[str]] = Field(
None,
description="Custom permissions (overrides role preset)"
)
@validator('role_name')
def validate_role_name(cls, v):
"""Validate role name is in allowed presets."""
if v is not None:
allowed_roles = ['manager', 'staff', 'support', 'viewer', 'marketing']
if v.lower() not in allowed_roles:
raise ValueError(
f"Role name must be one of: {', '.join(allowed_roles)}"
)
return v.lower() if v else v
@validator('custom_permissions')
def validate_custom_permissions(cls, v, values):
"""Ensure either role_id/role_name OR custom_permissions is provided."""
if v is not None and len(v) > 0:
# If custom permissions provided, role_name should be provided too
if 'role_name' not in values or not values['role_name']:
raise ValueError(
"role_name is required when providing custom_permissions"
)
return v
class TeamMemberUpdate(BaseModel):
"""Schema for updating a team member."""
role_id: Optional[int] = Field(None, description="New role ID")
is_active: Optional[bool] = Field(None, description="Active status")
class TeamMemberResponse(BaseModel):
"""Schema for team member response."""
id: int = Field(..., description="User ID")
email: EmailStr
username: str
first_name: Optional[str]
last_name: Optional[str]
full_name: str
user_type: str = Field(..., description="'owner' or 'member'")
role_name: str = Field(..., description="Role name")
role_id: Optional[int]
permissions: List[str] = Field(default_factory=list, description="User's permissions")
is_active: bool
is_owner: bool
invitation_pending: bool = Field(
default=False,
description="True if invitation not yet accepted"
)
invited_at: Optional[datetime] = Field(None, description="When invitation was sent")
accepted_at: Optional[datetime] = Field(None, description="When invitation was accepted")
joined_at: datetime = Field(..., description="When user joined vendor")
class Config:
from_attributes = True
class TeamMemberListResponse(BaseModel):
"""Schema for team member list response."""
members: List[TeamMemberResponse]
total: int
active_count: int
pending_invitations: int
# ============================================================================
# Invitation Schemas
# ============================================================================
class InvitationAccept(BaseModel):
"""Schema for accepting a team invitation."""
invitation_token: str = Field(..., min_length=32, description="Invitation token from email")
password: str = Field(
...,
min_length=8,
max_length=128,
description="Password for new account"
)
first_name: str = Field(..., min_length=1, max_length=100)
last_name: str = Field(..., min_length=1, max_length=100)
@validator('password')
def validate_password_strength(cls, v):
"""Validate password meets minimum requirements."""
if len(v) < 8:
raise ValueError("Password must be at least 8 characters long")
has_upper = any(c.isupper() for c in v)
has_lower = any(c.islower() for c in v)
has_digit = any(c.isdigit() for c in v)
if not (has_upper and has_lower and has_digit):
raise ValueError(
"Password must contain at least one uppercase letter, "
"one lowercase letter, and one digit"
)
return v
class InvitationResponse(BaseModel):
"""Schema for invitation response."""
message: str
email: EmailStr
role: str
invitation_token: Optional[str] = Field(
None,
description="Token (only returned in dev/test environments)"
)
invitation_sent: bool = Field(default=True)
class InvitationAcceptResponse(BaseModel):
"""Schema for invitation acceptance response."""
message: str
vendor: dict = Field(..., description="Vendor information")
user: dict = Field(..., description="User information")
role: str
# ============================================================================
# Team Statistics Schema
# ============================================================================
class TeamStatistics(BaseModel):
"""Schema for team statistics."""
total_members: int
active_members: int
inactive_members: int
pending_invitations: int
owners: int
team_members: int
roles_breakdown: dict = Field(
default_factory=dict,
description="Count of members per role"
)
# ============================================================================
# Bulk Operations Schemas
# ============================================================================
class BulkRemoveRequest(BaseModel):
"""Schema for bulk removing team members."""
user_ids: List[int] = Field(..., min_items=1, description="List of user IDs to remove")
class BulkRemoveResponse(BaseModel):
"""Schema for bulk remove response."""
success_count: int
failed_count: int
errors: List[dict] = Field(default_factory=list)
# ============================================================================
# Permission Check Schemas
# ============================================================================
class PermissionCheckRequest(BaseModel):
"""Schema for checking permissions."""
permissions: List[str] = Field(..., min_items=1, description="Permissions to check")
class PermissionCheckResponse(BaseModel):
"""Schema for permission check response."""
has_all: bool = Field(..., description="True if user has all permissions")
has_any: bool = Field(..., description="True if user has any permission")
granted: List[str] = Field(default_factory=list, description="Permissions user has")
denied: List[str] = Field(default_factory=list, description="Permissions user lacks")
class UserPermissionsResponse(BaseModel):
"""Schema for user's permissions response."""
permissions: List[str] = Field(default_factory=list)
permission_count: int
is_owner: bool
role_name: Optional[str] = None
# ============================================================================
# Error Response Schema
# ============================================================================
class TeamErrorResponse(BaseModel):
"""Schema for team operation errors."""
error_code: str
message: str
details: Optional[dict] = None