From 41439eed09b20758ed87f495f1eb1e9ab4a27f9f Mon Sep 17 00:00:00 2001 From: Samir Boulahtit Date: Fri, 14 Nov 2025 21:08:57 +0100 Subject: [PATCH] Vendor team member management features --- Makefile | 6 +- app/api/deps.py | 176 ++++++++++ app/api/v1/vendor/teams.py | 497 ++++++++++++++++++++++++++-- app/core/permissions.py | 203 ++++++++++++ app/exceptions/__init__.py | 4 +- app/exceptions/team.py | 60 +++- app/services/vendor_team_service.py | 465 ++++++++++++++++++++++++++ models/database/user.py | 115 +++++-- models/database/vendor.py | 84 ++++- models/schema/team.py | 261 ++++++++++++++- 10 files changed, 1786 insertions(+), 85 deletions(-) create mode 100644 app/core/permissions.py create mode 100644 app/services/vendor_team_service.py diff --git a/Makefile b/Makefile index 21ab040f..ddb9de71 100644 --- a/Makefile +++ b/Makefile @@ -51,7 +51,11 @@ dev-full: dev-with-docs # ============================================================================= migrate-create: - @if "$(message)"=="" (echo Error: Please provide a message. Usage: make migrate-create message="your_description") else ($(PYTHON) -m alembic revision --autogenerate -m "$(message)") + @if [ "$(message)" = "" ]; then \ + echo "Error: Please provide a message. Usage: make migrate-create message=\"your_description\""; \ + else \ + $(PYTHON) -m alembic revision --autogenerate -m "$(message)"; \ + fi migrate-create-manual: @if "$(message)"=="" (echo Error: Please provide a message. Usage: make migrate-create-manual message="your_description") else ($(PYTHON) -m alembic revision -m "$(message)") diff --git a/app/api/deps.py b/app/api/deps.py index 6b9f40ba..8a254531 100644 --- a/app/api/deps.py +++ b/app/api/deps.py @@ -498,3 +498,179 @@ def get_user_vendor( # User doesn't have access to this vendor raise UnauthorizedVendorAccessException(vendor_code, current_user.id) + +# ============================================================================ +# PERMISSIONS CHECKING +# ============================================================================ + +def require_vendor_permission(permission: str): + """ + Dependency factory to require a specific vendor permission. + + Usage: + @router.get("/products") + def list_products( + vendor: Vendor = Depends(get_vendor_from_code), + user: User = Depends(require_vendor_permission(VendorPermissions.PRODUCTS_VIEW.value)) + ): + ... + """ + + def permission_checker( + request: Request, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_vendor_from_cookie_or_header), + ) -> User: + # Get vendor from request state (set by middleware) + vendor = getattr(request.state, "vendor", None) + if not vendor: + raise VendorAccessDeniedException("No vendor context") + + # Check if user has permission + if not current_user.has_vendor_permission(vendor.id, permission): + raise InsufficientVendorPermissionsException( + required_permission=permission, + vendor_code=vendor.vendor_code, + ) + + return current_user + + return permission_checker + + +def require_vendor_owner( + request: Request, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_vendor_from_cookie_or_header), +) -> User: + """ + Dependency to require vendor owner role. + + Usage: + @router.delete("/team/{user_id}") + def remove_team_member( + user: User = Depends(require_vendor_owner) + ): + ... + """ + vendor = getattr(request.state, "vendor", None) + if not vendor: + raise VendorAccessDeniedException("No vendor context") + + if not current_user.is_owner_of(vendor.id): + raise VendorOwnerOnlyException( + operation="team management", + vendor_code=vendor.vendor_code, + ) + + return current_user + + +def require_any_vendor_permission(*permissions: str): + """ + Dependency factory to require ANY of the specified permissions. + + Usage: + @router.get("/dashboard") + def dashboard( + user: User = Depends(require_any_vendor_permission( + VendorPermissions.DASHBOARD_VIEW.value, + VendorPermissions.REPORTS_VIEW.value + )) + ): + ... + """ + + def permission_checker( + request: Request, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_vendor_from_cookie_or_header), + ) -> User: + vendor = getattr(request.state, "vendor", None) + if not vendor: + raise VendorAccessDeniedException("No vendor context") + + # Check if user has ANY of the required permissions + has_permission = any( + current_user.has_vendor_permission(vendor.id, perm) + for perm in permissions + ) + + if not has_permission: + raise InsufficientVendorPermissionsException( + required_permission=f"Any of: {', '.join(permissions)}", + vendor_code=vendor.vendor_code, + ) + + return current_user + + return permission_checker + + +def require_all_vendor_permissions(*permissions: str): + """ + Dependency factory to require ALL of the specified permissions. + + Usage: + @router.post("/products/bulk-delete") + def bulk_delete_products( + user: User = Depends(require_all_vendor_permissions( + VendorPermissions.PRODUCTS_VIEW.value, + VendorPermissions.PRODUCTS_DELETE.value + )) + ): + ... + """ + + def permission_checker( + request: Request, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_vendor_from_cookie_or_header), + ) -> User: + vendor = getattr(request.state, "vendor", None) + if not vendor: + raise VendorAccessDeniedException("No vendor context") + + # Check if user has ALL required permissions + missing_permissions = [ + perm for perm in permissions + if not current_user.has_vendor_permission(vendor.id, perm) + ] + + if missing_permissions: + raise InsufficientVendorPermissionsException( + required_permission=f"All of: {', '.join(permissions)}", + vendor_code=vendor.vendor_code, + ) + + return current_user + + return permission_checker + + +def get_user_permissions( + request: Request, + current_user: User = Depends(get_current_vendor_from_cookie_or_header), +) -> list: + """ + Get all permissions for current user in current vendor. + + Returns empty list if no vendor context. + """ + vendor = getattr(request.state, "vendor", None) + if not vendor: + return [] + + # If owner, return all permissions + if current_user.is_owner_of(vendor.id): + from app.core.permissions import VendorPermissions + return [p.value for p in VendorPermissions] + + # Get permissions from vendor membership + for vm in current_user.vendor_memberships: + if vm.vendor_id == vendor.id and vm.is_active: + return vm.get_all_permissions() + + return [] + + diff --git a/app/api/v1/vendor/teams.py b/app/api/v1/vendor/teams.py index 819999e2..16d582c9 100644 --- a/app/api/v1/vendor/teams.py +++ b/app/api/v1/vendor/teams.py @@ -1,73 +1,502 @@ # app/api/v1/vendor/teams.py """ Vendor team member management endpoints. + +Implements complete team management with: +- Team member listing +- Invitation system +- Role management +- Permission checking +- RBAC integration """ import logging -from fastapi import APIRouter, Depends +from typing import List +from fastapi import APIRouter, Depends, Request from sqlalchemy.orm import Session -from app.api.deps import get_current_user from app.core.database import get_db -from middleware.vendor_context import require_vendor_context -from app.services.team_service import team_service +from app.core.permissions import VendorPermissions +from app.api.deps import ( + get_current_vendor_from_cookie_or_header, + require_vendor_owner, + require_vendor_permission, + get_user_permissions +) +from app.services.vendor_team_service import vendor_team_service from models.database.user import User from models.database.vendor import Vendor +from models.schema.team import ( + TeamMemberInvite, + TeamMemberUpdate, + TeamMemberResponse, + TeamMemberListResponse, + InvitationAccept, + InvitationResponse, + InvitationAcceptResponse, + RoleResponse, + RoleListResponse, + UserPermissionsResponse, + TeamStatistics, + BulkRemoveRequest, + BulkRemoveResponse, +) -router = APIRouter(prefix="/teams") +router = APIRouter(prefix="/team") logger = logging.getLogger(__name__) -@router.get("/members") -def get_team_members( - vendor: Vendor = Depends(require_vendor_context()), - current_user: User = Depends(get_current_user), +# ============================================================================ +# Team Member Routes +# ============================================================================ + +@router.get("/members", response_model=TeamMemberListResponse) +def list_team_members( + request: Request, + include_inactive: bool = False, db: Session = Depends(get_db), + current_user: User = Depends(require_vendor_permission( + VendorPermissions.TEAM_VIEW.value + )) ): - """Get all team members for vendor.""" - return team_service.get_team_members(db, vendor.id, current_user) + """ + Get all team members for current vendor. + + **Required Permission:** `team.view` + + **Query Parameters:** + - `include_inactive`: Include inactive team members (default: False) + + **Returns:** + - List of team members with their roles and permissions + - Statistics (total, active, pending) + """ + vendor = request.state.vendor + + members = vendor_team_service.get_team_members( + db=db, + vendor=vendor, + include_inactive=include_inactive + ) + + # Calculate statistics + total = len(members) + active = sum(1 for m in members if m["is_active"]) + pending = sum(1 for m in members if m["invitation_pending"]) + + logger.info( + f"Listed {total} team members for vendor {vendor.vendor_code} " + f"(active: {active}, pending: {pending})" + ) + + return TeamMemberListResponse( + members=members, + total=total, + active_count=active, + pending_invitations=pending + ) -@router.post("/invite") +@router.post("/invite", response_model=InvitationResponse) def invite_team_member( - invitation_data: dict, - vendor: Vendor = Depends(require_vendor_context()), - current_user: User = Depends(get_current_user), + invitation: TeamMemberInvite, + request: Request, db: Session = Depends(get_db), + current_user: User = Depends(require_vendor_owner) # Owner only ): - """Invite a new team member.""" - return team_service.invite_team_member(db, vendor.id, invitation_data, current_user) + """ + Invite a new team member to the vendor. + + **Required:** Vendor owner role + + **Process:** + 1. Create user account (if doesn't exist) + 2. Create VendorUser with invitation token + 3. Send invitation email + + **Request Body:** + - `email`: Email address of invitee + - `first_name`, `last_name`: Optional names + - `role_name`: Preset role (manager, staff, support, viewer, marketing) + - `role_id`: Use existing role (alternative to role_name) + - `custom_permissions`: Override role permissions (requires role_name) + + **Returns:** + - Invitation details + - Confirmation of email sent + """ + vendor = request.state.vendor + + # Determine role approach + if invitation.role_id: + # Use existing role by ID + result = vendor_team_service.invite_team_member( + db=db, + vendor=vendor, + inviter=current_user, + email=invitation.email, + role_id=invitation.role_id + ) + elif invitation.role_name: + # Use role name with optional custom permissions + result = vendor_team_service.invite_team_member( + db=db, + vendor=vendor, + inviter=current_user, + email=invitation.email, + role_name=invitation.role_name, + custom_permissions=invitation.custom_permissions + ) + else: + # Default to Staff role + result = vendor_team_service.invite_team_member( + db=db, + vendor=vendor, + inviter=current_user, + email=invitation.email, + role_name="staff" + ) + + logger.info( + f"Invitation sent: {invitation.email} to {vendor.vendor_code} " + f"by {current_user.username}" + ) + + return InvitationResponse( + message="Invitation sent successfully", + email=result["email"], + role=result["role"], + invitation_sent=True + ) -@router.put("/members/{user_id}") +@router.post("/accept-invitation", response_model=InvitationAcceptResponse) +def accept_invitation( + acceptance: InvitationAccept, + db: Session = Depends(get_db) +): + """ + Accept a team invitation and activate account. + + **No authentication required** - uses invitation token. + + **Request Body:** + - `invitation_token`: Token from invitation email + - `password`: New password (min 8 chars, must have upper, lower, digit) + - `first_name`, `last_name`: User's name + + **Returns:** + - Confirmation message + - Vendor information + - User information + - Assigned role + """ + result = vendor_team_service.accept_invitation( + db=db, + invitation_token=acceptance.invitation_token, + password=acceptance.password, + first_name=acceptance.first_name, + last_name=acceptance.last_name + ) + + logger.info( + f"Invitation accepted: {result['user'].email} " + f"for vendor {result['vendor'].vendor_code}" + ) + + return InvitationAcceptResponse( + message="Invitation accepted successfully. You can now login.", + vendor={ + "id": result["vendor"].id, + "vendor_code": result["vendor"].vendor_code, + "name": result["vendor"].name, + "subdomain": result["vendor"].subdomain + }, + user={ + "id": result["user"].id, + "email": result["user"].email, + "username": result["user"].username, + "full_name": result["user"].full_name + }, + role=result["role"] + ) + + +@router.get("/members/{user_id}", response_model=TeamMemberResponse) +def get_team_member( + user_id: int, + request: Request, + db: Session = Depends(get_db), + current_user: User = Depends(require_vendor_permission( + VendorPermissions.TEAM_VIEW.value + )) +): + """ + Get details of a specific team member. + + **Required Permission:** `team.view` + """ + vendor = request.state.vendor + + members = vendor_team_service.get_team_members( + db=db, + vendor=vendor, + include_inactive=True + ) + + member = next((m for m in members if m["id"] == user_id), None) + if not member: + from app.exceptions import UserNotFoundException + raise UserNotFoundException(str(user_id)) + + return TeamMemberResponse(**member) + + +@router.put("/members/{user_id}", response_model=TeamMemberResponse) def update_team_member( user_id: int, - update_data: dict, - vendor: Vendor = Depends(require_vendor_context()), - current_user: User = Depends(get_current_user), + update_data: TeamMemberUpdate, + request: Request, db: Session = Depends(get_db), + current_user: User = Depends(require_vendor_owner) # Owner only ): - """Update team member role or status.""" - return team_service.update_team_member(db, vendor.id, user_id, update_data, current_user) + """ + Update a team member's role or status. + + **Required:** Vendor owner role + + **Cannot:** + - Change owner's role + - Remove owner + + **Request Body:** + - `role_id`: New role ID (optional) + - `is_active`: Active status (optional) + """ + vendor = request.state.vendor + + vendor_user = vendor_team_service.update_member_role( + db=db, + vendor=vendor, + user_id=user_id, + new_role_id=update_data.role_id, + is_active=update_data.is_active + ) + + logger.info( + f"Team member updated: {user_id} in {vendor.vendor_code} " + f"by {current_user.username}" + ) + + # Return updated member details + members = vendor_team_service.get_team_members(db, vendor, include_inactive=True) + member = next((m for m in members if m["id"] == user_id), None) + + return TeamMemberResponse(**member) @router.delete("/members/{user_id}") def remove_team_member( user_id: int, - vendor: Vendor = Depends(require_vendor_context()), - current_user: User = Depends(get_current_user), + request: Request, db: Session = Depends(get_db), + current_user: User = Depends(require_vendor_owner) # Owner only ): - """Remove team member from vendor.""" - team_service.remove_team_member(db, vendor.id, user_id, current_user) - return {"message": "Team member removed successfully"} + """ + Remove a team member from the vendor. + + **Required:** Vendor owner role + + **Cannot remove:** + - Vendor owner + + **Action:** + - Soft delete (sets is_active = False) + - Member can be re-invited later + """ + vendor = request.state.vendor + + vendor_team_service.remove_team_member( + db=db, + vendor=vendor, + user_id=user_id + ) + + logger.info( + f"Team member removed: {user_id} from {vendor.vendor_code} " + f"by {current_user.username}" + ) + + return { + "message": "Team member removed successfully", + "user_id": user_id + } -@router.get("/roles") -def get_team_roles( - vendor: Vendor = Depends(require_vendor_context()), - current_user: User = Depends(get_current_user), +@router.post("/members/bulk-remove", response_model=BulkRemoveResponse) +def bulk_remove_team_members( + bulk_remove: BulkRemoveRequest, + request: Request, db: Session = Depends(get_db), + current_user: User = Depends(require_vendor_owner) ): - """Get available roles for vendor team.""" - return team_service.get_vendor_roles(db, vendor.id) + """ + Remove multiple team members at once. + + **Required:** Vendor owner role + """ + vendor = request.state.vendor + + success_count = 0 + failed_count = 0 + errors = [] + + for user_id in bulk_remove.user_ids: + try: + vendor_team_service.remove_team_member( + db=db, + vendor=vendor, + user_id=user_id + ) + success_count += 1 + except Exception as e: + failed_count += 1 + errors.append({ + "user_id": user_id, + "error": str(e) + }) + + logger.info( + f"Bulk remove completed: {success_count} removed, {failed_count} failed " + f"in {vendor.vendor_code}" + ) + + return BulkRemoveResponse( + success_count=success_count, + failed_count=failed_count, + errors=errors + ) + + +# ============================================================================ +# Role Management Routes +# ============================================================================ + +@router.get("/roles", response_model=RoleListResponse) +def list_roles( + request: Request, + db: Session = Depends(get_db), + current_user: User = Depends(require_vendor_permission( + VendorPermissions.TEAM_VIEW.value + )) +): + """ + Get all available roles for the vendor. + + **Required Permission:** `team.view` + + **Returns:** + - List of roles with permissions + - Includes both preset and custom roles + """ + vendor = request.state.vendor + + roles = vendor_team_service.get_vendor_roles(db=db, vendor_id=vendor.id) + + return RoleListResponse( + roles=roles, + total=len(roles) + ) + + +# ============================================================================ +# Permission Routes +# ============================================================================ + +@router.get("/me/permissions", response_model=UserPermissionsResponse) +def get_my_permissions( + request: Request, + permissions: List[str] = Depends(get_user_permissions), + current_user: User = Depends(get_current_vendor_from_cookie_or_header) +): + """ + Get current user's permissions in this vendor. + + **Use this endpoint to:** + - Determine what UI elements to show/hide + - Check permissions before making API calls + - Display user's role and capabilities + + **Returns:** + - Complete list of permissions + - Whether user is owner + - Role name (if team member) + """ + vendor = request.state.vendor + + is_owner = current_user.is_owner_of(vendor.id) + role_name = current_user.get_vendor_role(vendor.id) + + return UserPermissionsResponse( + permissions=permissions, + permission_count=len(permissions), + is_owner=is_owner, + role_name=role_name + ) + + +# ============================================================================ +# Statistics Routes +# ============================================================================ + +@router.get("/statistics", response_model=TeamStatistics) +def get_team_statistics( + request: Request, + db: Session = Depends(get_db), + current_user: User = Depends(require_vendor_permission( + VendorPermissions.TEAM_VIEW.value + )) +): + """ + Get team statistics for the vendor. + + **Required Permission:** `team.view` + + **Returns:** + - Total members + - Active/inactive breakdown + - Pending invitations + - Owner count + - Role distribution + """ + vendor = request.state.vendor + + members = vendor_team_service.get_team_members( + db=db, + vendor=vendor, + include_inactive=True + ) + + # Calculate statistics + total = len(members) + active = sum(1 for m in members if m["is_active"]) + inactive = total - active + pending = sum(1 for m in members if m["invitation_pending"]) + owners = sum(1 for m in members if m["is_owner"]) + team_members = total - owners + + # Role breakdown + roles_breakdown = {} + for member in members: + role = member["role_name"] + roles_breakdown[role] = roles_breakdown.get(role, 0) + 1 + + return TeamStatistics( + total_members=total, + active_members=active, + inactive_members=inactive, + pending_invitations=pending, + owners=owners, + team_members=team_members, + roles_breakdown=roles_breakdown + ) diff --git a/app/core/permissions.py b/app/core/permissions.py new file mode 100644 index 00000000..5651abf9 --- /dev/null +++ b/app/core/permissions.py @@ -0,0 +1,203 @@ +# app/core/permissions.py +""" +Permission constants and checking logic for RBAC. + +This module defines: +- Vendor-specific permissions +- Permission groups (for easier role creation) +- Permission checking utilities +""" +from enum import Enum +from typing import List, Set + + +class VendorPermissions(str, Enum): + """ + All available permissions within a vendor context. + + Naming convention: RESOURCE_ACTION + """ + # Dashboard + DASHBOARD_VIEW = "dashboard.view" + + # Products + PRODUCTS_VIEW = "products.view" + PRODUCTS_CREATE = "products.create" + PRODUCTS_EDIT = "products.edit" + PRODUCTS_DELETE = "products.delete" + PRODUCTS_IMPORT = "products.import" + PRODUCTS_EXPORT = "products.export" + + # Stock/Inventory + STOCK_VIEW = "stock.view" + STOCK_EDIT = "stock.edit" + STOCK_TRANSFER = "stock.transfer" + + # Orders + ORDERS_VIEW = "orders.view" + ORDERS_EDIT = "orders.edit" + ORDERS_CANCEL = "orders.cancel" + ORDERS_REFUND = "orders.refund" + + # Customers + CUSTOMERS_VIEW = "customers.view" + CUSTOMERS_EDIT = "customers.edit" + CUSTOMERS_DELETE = "customers.delete" + CUSTOMERS_EXPORT = "customers.export" + + # Marketing + MARKETING_VIEW = "marketing.view" + MARKETING_CREATE = "marketing.create" + MARKETING_SEND = "marketing.send" + + # Reports + REPORTS_VIEW = "reports.view" + REPORTS_FINANCIAL = "reports.financial" + REPORTS_EXPORT = "reports.export" + + # Settings + SETTINGS_VIEW = "settings.view" + SETTINGS_EDIT = "settings.edit" + SETTINGS_THEME = "settings.theme" + SETTINGS_DOMAINS = "settings.domains" + + # Team Management + TEAM_VIEW = "team.view" + TEAM_INVITE = "team.invite" + TEAM_EDIT = "team.edit" + TEAM_REMOVE = "team.remove" + + # Marketplace Imports + IMPORTS_VIEW = "imports.view" + IMPORTS_CREATE = "imports.create" + IMPORTS_CANCEL = "imports.cancel" + + +class PermissionGroups: + """Pre-defined permission groups for common roles.""" + + # Full access (for owners) + OWNER: Set[str] = set(p.value for p in VendorPermissions) + + # Manager - Can do most things except team management and critical settings + MANAGER: Set[str] = { + VendorPermissions.DASHBOARD_VIEW.value, + VendorPermissions.PRODUCTS_VIEW.value, + VendorPermissions.PRODUCTS_CREATE.value, + VendorPermissions.PRODUCTS_EDIT.value, + VendorPermissions.PRODUCTS_DELETE.value, + VendorPermissions.PRODUCTS_IMPORT.value, + VendorPermissions.PRODUCTS_EXPORT.value, + VendorPermissions.STOCK_VIEW.value, + VendorPermissions.STOCK_EDIT.value, + VendorPermissions.STOCK_TRANSFER.value, + VendorPermissions.ORDERS_VIEW.value, + VendorPermissions.ORDERS_EDIT.value, + VendorPermissions.ORDERS_CANCEL.value, + VendorPermissions.ORDERS_REFUND.value, + VendorPermissions.CUSTOMERS_VIEW.value, + VendorPermissions.CUSTOMERS_EDIT.value, + VendorPermissions.CUSTOMERS_EXPORT.value, + VendorPermissions.MARKETING_VIEW.value, + VendorPermissions.MARKETING_CREATE.value, + VendorPermissions.MARKETING_SEND.value, + VendorPermissions.REPORTS_VIEW.value, + VendorPermissions.REPORTS_FINANCIAL.value, + VendorPermissions.REPORTS_EXPORT.value, + VendorPermissions.SETTINGS_VIEW.value, + VendorPermissions.SETTINGS_THEME.value, + VendorPermissions.IMPORTS_VIEW.value, + VendorPermissions.IMPORTS_CREATE.value, + VendorPermissions.IMPORTS_CANCEL.value, + } + + # Staff - Can view and edit products/orders but limited access + STAFF: Set[str] = { + VendorPermissions.DASHBOARD_VIEW.value, + VendorPermissions.PRODUCTS_VIEW.value, + VendorPermissions.PRODUCTS_CREATE.value, + VendorPermissions.PRODUCTS_EDIT.value, + VendorPermissions.STOCK_VIEW.value, + VendorPermissions.STOCK_EDIT.value, + VendorPermissions.ORDERS_VIEW.value, + VendorPermissions.ORDERS_EDIT.value, + VendorPermissions.CUSTOMERS_VIEW.value, + VendorPermissions.CUSTOMERS_EDIT.value, + } + + # Support - Can view and assist with orders/customers + SUPPORT: Set[str] = { + VendorPermissions.DASHBOARD_VIEW.value, + VendorPermissions.PRODUCTS_VIEW.value, + VendorPermissions.ORDERS_VIEW.value, + VendorPermissions.ORDERS_EDIT.value, + VendorPermissions.CUSTOMERS_VIEW.value, + VendorPermissions.CUSTOMERS_EDIT.value, + } + + # Viewer - Read-only access + VIEWER: Set[str] = { + VendorPermissions.DASHBOARD_VIEW.value, + VendorPermissions.PRODUCTS_VIEW.value, + VendorPermissions.STOCK_VIEW.value, + VendorPermissions.ORDERS_VIEW.value, + VendorPermissions.CUSTOMERS_VIEW.value, + VendorPermissions.REPORTS_VIEW.value, + } + + # Marketing - Focused on marketing and customer communication + MARKETING: Set[str] = { + VendorPermissions.DASHBOARD_VIEW.value, + VendorPermissions.CUSTOMERS_VIEW.value, + VendorPermissions.CUSTOMERS_EXPORT.value, + VendorPermissions.MARKETING_VIEW.value, + VendorPermissions.MARKETING_CREATE.value, + VendorPermissions.MARKETING_SEND.value, + VendorPermissions.REPORTS_VIEW.value, + } + + +class PermissionChecker: + """Utility class for permission checking.""" + + @staticmethod + def has_permission(permissions: List[str], required_permission: str) -> bool: + """Check if a permission list contains a required permission.""" + return required_permission in permissions + + @staticmethod + def has_any_permission(permissions: List[str], required_permissions: List[str]) -> bool: + """Check if a permission list contains ANY of the required permissions.""" + return any(perm in permissions for perm in required_permissions) + + @staticmethod + def has_all_permissions(permissions: List[str], required_permissions: List[str]) -> bool: + """Check if a permission list contains ALL of the required permissions.""" + return all(perm in permissions for perm in required_permissions) + + @staticmethod + def get_missing_permissions(permissions: List[str], required_permissions: List[str]) -> List[str]: + """Get list of missing permissions.""" + return [perm for perm in required_permissions if perm not in permissions] + + +# Helper function to get permissions for a role preset +def get_preset_permissions(preset_name: str) -> Set[str]: + """ + Get permissions for a preset role. + + Args: + preset_name: Name of the preset (manager, staff, support, viewer, marketing) + + Returns: + Set of permission strings + """ + presets = { + "owner": PermissionGroups.OWNER, + "manager": PermissionGroups.MANAGER, + "staff": PermissionGroups.STAFF, + "support": PermissionGroups.SUPPORT, + "viewer": PermissionGroups.VIEWER, + "marketing": PermissionGroups.MARKETING, + } + return presets.get(preset_name.lower(), set()) diff --git a/app/exceptions/__init__.py b/app/exceptions/__init__.py index bfa4a3b1..0816ac7a 100644 --- a/app/exceptions/__init__.py +++ b/app/exceptions/__init__.py @@ -146,6 +146,7 @@ from .team import ( MaxTeamMembersReachedException, TeamValidationException, InvalidInvitationDataException, + InvalidInvitationTokenException, ) # Product exceptions @@ -215,6 +216,7 @@ __all__ = [ "MaxTeamMembersReachedException", "TeamValidationException", "InvalidInvitationDataException", + "InvalidInvitationTokenException", # Inventory exceptions "InventoryNotFoundException", @@ -306,4 +308,4 @@ __all__ = [ "InvalidAdminActionException", "BulkOperationException", "ConfirmationRequiredException", -] \ No newline at end of file +] diff --git a/app/exceptions/team.py b/app/exceptions/team.py index 5a976023..dd3e6306 100644 --- a/app/exceptions/team.py +++ b/app/exceptions/team.py @@ -147,10 +147,10 @@ class InvalidRoleException(ValidationException): """Raised when role data is invalid.""" def __init__( - self, - message: str = "Invalid role data", - field: Optional[str] = None, - details: Optional[Dict[str, Any]] = None, + self, + message: str = "Invalid role data", + field: Optional[str] = None, + details: Optional[Dict[str, Any]] = None, ): super().__init__( message=message, @@ -164,10 +164,10 @@ class InsufficientPermissionsException(AuthorizationException): """Raised when user lacks required permissions for an action.""" def __init__( - self, - required_permission: str, - user_id: Optional[int] = None, - action: Optional[str] = None, + self, + required_permission: str, + user_id: Optional[int] = None, + action: Optional[str] = None, ): details = {"required_permission": required_permission} if user_id: @@ -202,10 +202,10 @@ class TeamValidationException(ValidationException): """Raised when team operation validation fails.""" def __init__( - self, - message: str = "Team operation validation failed", - field: Optional[str] = None, - validation_errors: Optional[Dict[str, str]] = None, + self, + message: str = "Team operation validation failed", + field: Optional[str] = None, + validation_errors: Optional[Dict[str, str]] = None, ): details = {} if validation_errors: @@ -223,10 +223,10 @@ class InvalidInvitationDataException(ValidationException): """Raised when team invitation data is invalid.""" def __init__( - self, - message: str = "Invalid invitation data", - field: Optional[str] = None, - details: Optional[Dict[str, Any]] = None, + self, + message: str = "Invalid invitation data", + field: Optional[str] = None, + details: Optional[Dict[str, Any]] = None, ): super().__init__( message=message, @@ -234,3 +234,31 @@ class InvalidInvitationDataException(ValidationException): details=details, ) self.error_code = "INVALID_INVITATION_DATA" + + +# ============================================================================ +# NEW: Add InvalidInvitationTokenException +# ============================================================================ + +class InvalidInvitationTokenException(ValidationException): + """Raised when invitation token is invalid, expired, or already used. + + This is a general exception for any invitation token validation failure. + Use this when checking invitation tokens during the acceptance flow. + """ + + def __init__( + self, + message: str = "Invalid or expired invitation token", + invitation_token: Optional[str] = None + ): + details = {} + if invitation_token: + details["invitation_token"] = invitation_token + + super().__init__( + message=message, + field="invitation_token", + details=details, + ) + self.error_code = "INVALID_INVITATION_TOKEN" diff --git a/app/services/vendor_team_service.py b/app/services/vendor_team_service.py new file mode 100644 index 00000000..763ea8f1 --- /dev/null +++ b/app/services/vendor_team_service.py @@ -0,0 +1,465 @@ +# app/services/vendor_team_service.py +""" +Vendor team management service. + +Handles: +- Team member invitations +- Invitation acceptance +- Role assignment +- Permission management +""" +import logging +import secrets +from datetime import datetime, timedelta +from typing import Optional, List, Dict, Any + +from sqlalchemy.orm import Session + +from app.core.permissions import get_preset_permissions +from app.exceptions import ( + TeamMemberAlreadyExistsException, + InvalidInvitationTokenException, + TeamInvitationAlreadyAcceptedException, + MaxTeamMembersReachedException, + UserNotFoundException, + VendorNotFoundException, + CannotRemoveOwnerException, +) +from models.database.user import User +from models.database.vendor import Vendor, VendorUser, VendorUserType, Role +from middleware.auth import AuthManager + +logger = logging.getLogger(__name__) + + +class VendorTeamService: + """Service for managing vendor team members.""" + + def __init__(self): + self.auth_manager = AuthManager() + self.max_team_members = 50 # Configure as needed + + def invite_team_member( + self, + db: Session, + vendor: Vendor, + inviter: User, + email: str, + role_name: str, + custom_permissions: Optional[List[str]] = None, + ) -> Dict[str, Any]: + """ + Invite a new team member to a vendor. + + Creates: + 1. User account (if doesn't exist) + 2. Role (if custom permissions provided) + 3. VendorUser relationship with invitation token + + Args: + db: Database session + vendor: Vendor to invite to + inviter: User sending the invitation + email: Email of person to invite + role_name: Role name (manager, staff, support, etc.) + custom_permissions: Optional custom permissions (overrides preset) + + Returns: + Dict with invitation details + """ + try: + # Check team size limit + current_team_size = db.query(VendorUser).filter( + VendorUser.vendor_id == vendor.id, + VendorUser.is_active == True, + ).count() + + if current_team_size >= self.max_team_members: + raise MaxTeamMembersReachedException( + self.max_team_members, + vendor.vendor_code, + ) + + # Check if user already exists + user = db.query(User).filter(User.email == email).first() + + if user: + # Check if already a member + existing_membership = db.query(VendorUser).filter( + VendorUser.vendor_id == vendor.id, + VendorUser.user_id == user.id, + ).first() + + if existing_membership: + if existing_membership.is_active: + raise TeamMemberAlreadyExistsException(email, vendor.vendor_code) + # Reactivate old membership + existing_membership.is_active = False # Will be activated on acceptance + existing_membership.invitation_token = self._generate_invitation_token() + existing_membership.invitation_sent_at = datetime.utcnow() + existing_membership.invitation_accepted_at = None + db.commit() + + logger.info(f"Re-invited user {email} to vendor {vendor.vendor_code}") + return { + "invitation_token": existing_membership.invitation_token, + "email": email, + "existing_user": True, + } + else: + # Create new user account (inactive until invitation accepted) + username = email.split('@')[0] + # Ensure unique username + base_username = username + counter = 1 + while db.query(User).filter(User.username == username).first(): + username = f"{base_username}{counter}" + counter += 1 + + # Generate temporary password (user will set real one on activation) + temp_password = secrets.token_urlsafe(16) + + user = User( + email=email, + username=username, + hashed_password=self.auth_manager.hash_password(temp_password), + role="vendor", # Platform role + is_active=False, # Will be activated when invitation accepted + is_email_verified=False, + ) + db.add(user) + db.flush() # Get user.id + + logger.info(f"Created new user account for invitation: {email}") + + # Get or create role + role = self._get_or_create_role( + db=db, + vendor=vendor, + role_name=role_name, + custom_permissions=custom_permissions, + ) + + # Create vendor membership with invitation + invitation_token = self._generate_invitation_token() + + vendor_user = VendorUser( + vendor_id=vendor.id, + user_id=user.id, + user_type=VendorUserType.TEAM_MEMBER.value, + role_id=role.id, + invited_by=inviter.id, + invitation_token=invitation_token, + invitation_sent_at=datetime.utcnow(), + is_active=False, # Will be activated on acceptance + ) + db.add(vendor_user) + db.commit() + + logger.info( + f"Invited {email} to vendor {vendor.vendor_code} " + f"as {role_name} by {inviter.username}" + ) + + # TODO: Send invitation email + # self._send_invitation_email(email, vendor, invitation_token) + + return { + "invitation_token": invitation_token, + "email": email, + "role": role_name, + "existing_user": user.is_active, + } + + except (TeamMemberAlreadyExistsException, MaxTeamMembersReachedException): + raise + except Exception as e: + db.rollback() + logger.error(f"Error inviting team member: {str(e)}") + raise + + def accept_invitation( + self, + db: Session, + invitation_token: str, + password: str, + first_name: Optional[str] = None, + last_name: Optional[str] = None, + ) -> Dict[str, Any]: + """ + Accept a team invitation and activate account. + + Args: + db: Database session + invitation_token: Invitation token from email + password: New password to set + first_name: Optional first name + last_name: Optional last name + + Returns: + Dict with user and vendor info + """ + try: + # Find invitation + vendor_user = db.query(VendorUser).filter( + VendorUser.invitation_token == invitation_token, + ).first() + + if not vendor_user: + raise InvalidInvitationTokenException() + + # Check if already accepted + if vendor_user.invitation_accepted_at is not None: + raise TeamInvitationAlreadyAcceptedException() + + # Check token expiration (7 days) + if vendor_user.invitation_sent_at: + expiry_date = vendor_user.invitation_sent_at + timedelta(days=7) + if datetime.utcnow() > expiry_date: + raise InvalidInvitationTokenException("Invitation has expired") + + user = vendor_user.user + vendor = vendor_user.vendor + + # Update user + user.hashed_password = self.auth_manager.hash_password(password) + user.is_active = True + user.is_email_verified = True + if first_name: + user.first_name = first_name + if last_name: + user.last_name = last_name + + # Activate membership + vendor_user.is_active = True + vendor_user.invitation_accepted_at = datetime.utcnow() + vendor_user.invitation_token = None # Clear token + + db.commit() + + logger.info( + f"User {user.email} accepted invitation to vendor {vendor.vendor_code}" + ) + + return { + "user": user, + "vendor": vendor, + "role": vendor_user.role.name if vendor_user.role else "member", + } + + except (InvalidInvitationTokenException, TeamInvitationAlreadyAcceptedException): + raise + except Exception as e: + db.rollback() + logger.error(f"Error accepting invitation: {str(e)}") + raise + + def remove_team_member( + self, + db: Session, + vendor: Vendor, + user_id: int, + ) -> bool: + """ + Remove a team member from a vendor. + + Cannot remove owner. + + Args: + db: Database session + vendor: Vendor to remove from + user_id: User ID to remove + + Returns: + True if removed + """ + try: + vendor_user = db.query(VendorUser).filter( + VendorUser.vendor_id == vendor.id, + VendorUser.user_id == user_id, + ).first() + + if not vendor_user: + raise UserNotFoundException(str(user_id)) + + # Cannot remove owner + if vendor_user.is_owner: + raise CannotRemoveOwnerException(vendor.vendor_code) + + # Soft delete - just deactivate + vendor_user.is_active = False + db.commit() + + logger.info(f"Removed user {user_id} from vendor {vendor.vendor_code}") + return True + + except (UserNotFoundException, CannotRemoveOwnerException): + raise + except Exception as e: + db.rollback() + logger.error(f"Error removing team member: {str(e)}") + raise + + def update_member_role( + self, + db: Session, + vendor: Vendor, + user_id: int, + new_role_name: str, + custom_permissions: Optional[List[str]] = None, + ) -> VendorUser: + """ + Update a team member's role. + + Args: + db: Database session + vendor: Vendor + user_id: User ID + new_role_name: New role name + custom_permissions: Optional custom permissions + + Returns: + Updated VendorUser + """ + try: + vendor_user = db.query(VendorUser).filter( + VendorUser.vendor_id == vendor.id, + VendorUser.user_id == user_id, + ).first() + + if not vendor_user: + raise UserNotFoundException(str(user_id)) + + # Cannot change owner's role + if vendor_user.is_owner: + raise CannotRemoveOwnerException(vendor.vendor_code) + + # Get or create new role + new_role = self._get_or_create_role( + db=db, + vendor=vendor, + role_name=new_role_name, + custom_permissions=custom_permissions, + ) + + vendor_user.role_id = new_role.id + db.commit() + + logger.info( + f"Updated role for user {user_id} in vendor {vendor.vendor_code} " + f"to {new_role_name}" + ) + + return vendor_user + + except (UserNotFoundException, CannotRemoveOwnerException): + raise + except Exception as e: + db.rollback() + logger.error(f"Error updating member role: {str(e)}") + raise + + def get_team_members( + self, + db: Session, + vendor: Vendor, + include_inactive: bool = False, + ) -> List[Dict[str, Any]]: + """ + Get all team members for a vendor. + + Args: + db: Database session + vendor: Vendor + include_inactive: Include inactive members + + Returns: + List of team member info + """ + query = db.query(VendorUser).filter( + VendorUser.vendor_id == vendor.id, + ) + + if not include_inactive: + query = query.filter(VendorUser.is_active == True) + + vendor_users = query.all() + + members = [] + for vu in vendor_users: + members.append({ + "id": vu.user.id, + "email": vu.user.email, + "username": vu.user.username, + "full_name": vu.user.full_name, + "user_type": vu.user_type, + "role": vu.role.name if vu.role else "owner", + "permissions": vu.get_all_permissions(), + "is_active": vu.is_active, + "is_owner": vu.is_owner, + "invitation_pending": vu.is_invitation_pending, + "invited_at": vu.invitation_sent_at, + "accepted_at": vu.invitation_accepted_at, + }) + + return members + + # Private helper methods + + def _generate_invitation_token(self) -> str: + """Generate a secure invitation token.""" + return secrets.token_urlsafe(32) + + def _get_or_create_role( + self, + db: Session, + vendor: Vendor, + role_name: str, + custom_permissions: Optional[List[str]] = None, + ) -> Role: + """Get existing role or create new one with preset/custom permissions.""" + # Try to find existing role with same name + role = db.query(Role).filter( + Role.vendor_id == vendor.id, + Role.name == role_name, + ).first() + + if role and custom_permissions is None: + # Use existing role + return role + + # Determine permissions + if custom_permissions: + permissions = custom_permissions + else: + # Get preset permissions + permissions = list(get_preset_permissions(role_name)) + + if role: + # Update existing role with new permissions + role.permissions = permissions + else: + # Create new role + role = Role( + vendor_id=vendor.id, + name=role_name, + permissions=permissions, + ) + db.add(role) + + db.flush() + return role + + def _send_invitation_email(self, email: str, vendor: Vendor, token: str): + """Send invitation email (TODO: implement).""" + # TODO: Implement email sending + # Should include: + # - Link to accept invitation: /vendor/invitation/accept?token={token} + # - Vendor name + # - Inviter name + # - Expiry date + pass + + +# Create service instance +vendor_team_service = VendorTeamService() diff --git a/models/database/user.py b/models/database/user.py index 303eed1c..48650405 100644 --- a/models/database/user.py +++ b/models/database/user.py @@ -1,39 +1,56 @@ -# models/database/user.py +# models/database/user.py - IMPROVED VERSION """ User model with authentication support. -This module defines the User model which includes fields for user details, -authentication information, and relationships to other models such as Vendor and Customer. -""" -from sqlalchemy import Boolean, Column, DateTime, Integer, String -from sqlalchemy.orm import relationship +ROLE CLARIFICATION: +- User.role should ONLY contain platform-level roles: + * "admin" - Platform administrator (full system access) + * "vendor" - Any user who owns or is part of a vendor team + +- Vendor-specific roles (manager, staff, etc.) are stored in VendorUser.role +- Customers are NOT in the User table - they use the Customer model +""" +from sqlalchemy import Boolean, Column, DateTime, Integer, String, Enum +from sqlalchemy.orm import relationship +import enum -# Import Base from the central database module instead of creating a new one from app.core.database import Base from models.database.base import TimestampMixin +class UserRole(str, enum.Enum): + """Platform-level user roles.""" + ADMIN = "admin" # Platform administrator + VENDOR = "vendor" # Vendor owner or team member + + class User(Base, TimestampMixin): - """Represents a user in the system.""" + """Represents a platform user (admins and vendors only).""" - __tablename__ = "users" # Name of the table in the database + __tablename__ = "users" - id = Column(Integer, primary_key=True, index=True) # Primary key and indexed column for user ID - email = Column(String, unique=True, index=True, nullable=False) # Unique, indexed, non-nullable email column - username = Column(String, unique=True, index=True, nullable=False) # Unique, indexed, non-nullable username column - first_name = Column(String) # Optional first name column - last_name = Column(String) # Optional last name column - hashed_password = Column(String, nullable=False) # Non-nullable hashed password column - role = Column(String, nullable=False, default="user") # Role of the user (default is 'user') //TODO: Change to customer, vendor, admin - is_active = Column(Boolean, default=True, nullable=False) # Active status of the user (default is True) - last_login = Column(DateTime, nullable=True) # Optional last login timestamp column + id = Column(Integer, primary_key=True, index=True) + email = Column(String, unique=True, index=True, nullable=False) + username = Column(String, unique=True, index=True, nullable=False) + first_name = Column(String) + last_name = Column(String) + hashed_password = Column(String, nullable=False) + + # Platform-level role only (admin or vendor) + role = Column(String, nullable=False, default=UserRole.VENDOR.value) + + is_active = Column(Boolean, default=True, nullable=False) + is_email_verified = Column(Boolean, default=False, nullable=False) + last_login = Column(DateTime, nullable=True) # Relationships - marketplace_import_jobs = relationship("MarketplaceImportJob", - back_populates="user") # Relationship with import jobs - owned_vendors = relationship("Vendor", back_populates="owner") # Relationship with vendors owned by this user - vendor_memberships = relationship("VendorUser", foreign_keys="[VendorUser.user_id]", - back_populates="user") # Relationship with vendor memberships + marketplace_import_jobs = relationship("MarketplaceImportJob", back_populates="user") + owned_vendors = relationship("Vendor", back_populates="owner") + vendor_memberships = relationship( + "VendorUser", + foreign_keys="[VendorUser.user_id]", + back_populates="user" + ) def __repr__(self): """String representation of the User object.""" @@ -45,3 +62,55 @@ class User(Base, TimestampMixin): if self.first_name and self.last_name: return f"{self.first_name} {self.last_name}" return self.username + + @property + def is_admin(self) -> bool: + """Check if user is a platform admin.""" + return self.role == UserRole.ADMIN.value + + @property + def is_vendor(self) -> bool: + """Check if user is a vendor (owner or team member).""" + return self.role == UserRole.VENDOR.value + + def is_owner_of(self, vendor_id: int) -> bool: + """Check if user is the owner of a specific vendor.""" + return any(v.id == vendor_id for v in self.owned_vendors) + + def is_member_of(self, vendor_id: int) -> bool: + """Check if user is a member of a specific vendor (owner or team).""" + # Check if owner + if self.is_owner_of(vendor_id): + return True + # Check if team member + return any( + vm.vendor_id == vendor_id and vm.is_active + for vm in self.vendor_memberships + ) + + def get_vendor_role(self, vendor_id: int) -> str: + """Get user's role within a specific vendor.""" + # Check if owner + if self.is_owner_of(vendor_id): + return "owner" + + # Check team membership + for vm in self.vendor_memberships: + if vm.vendor_id == vendor_id and vm.is_active: + return vm.role.name if vm.role else "member" + + return None + + def has_vendor_permission(self, vendor_id: int, permission: str) -> bool: + """Check if user has a specific permission in a vendor.""" + # Owners have all permissions + if self.is_owner_of(vendor_id): + return True + + # Check team member permissions + for vm in self.vendor_memberships: + if vm.vendor_id == vendor_id and vm.is_active: + if vm.role and permission in vm.role.permissions: + return True + + return False diff --git a/models/database/vendor.py b/models/database/vendor.py index 8e8a89e9..f1c997f2 100644 --- a/models/database/vendor.py +++ b/models/database/vendor.py @@ -5,14 +5,14 @@ Vendor model representing entities that sell products or services. This module defines the Vendor model along with its relationships to other models such as User (owner), Product, Customer, Order, and MarketplaceImportJob. """ -from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, Text, JSON +from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, Text, JSON, DateTime from sqlalchemy.orm import relationship # Import Base from the central database module instead of creating a new one from app.core.database import Base from models.database.base import TimestampMixin from app.core.config import settings - +import enum class Vendor(Base, TimestampMixin): """Represents a vendor in the system.""" @@ -177,10 +177,21 @@ class Vendor(Base, TimestampMixin): return domains -class VendorUser(Base, TimestampMixin): - """Represents a user's role within a specific vendor.""" +class VendorUserType(str, enum.Enum): + """Types of vendor users.""" + OWNER = "owner" # Vendor owner (full access to vendor area) + TEAM_MEMBER = "member" # Team member (role-based access to vendor area) - __tablename__ = "vendor_users" # Name of the table in the database + +class VendorUser(Base, TimestampMixin): + """ + Represents a user's membership in a vendor. + + - Owner: Created automatically when vendor is created + - Team Member: Invited by owner via email + """ + + __tablename__ = "vendor_users" id = Column(Integer, primary_key=True, index=True) """Unique identifier for each VendorUser entry.""" @@ -191,15 +202,23 @@ class VendorUser(Base, TimestampMixin): user_id = Column(Integer, ForeignKey("users.id"), nullable=False) """Foreign key linking to the associated User.""" - role_id = Column(Integer, ForeignKey("roles.id"), nullable=False) + # Distinguish between owner and team member + user_type = Column(String, nullable=False, default=VendorUserType.TEAM_MEMBER.value) + + # Role for team members (NULL for owners - they have all permissions) + role_id = Column(Integer, ForeignKey("roles.id"), nullable=True) """Foreign key linking to the associated Role.""" invited_by = Column(Integer, ForeignKey("users.id")) """Foreign key linking to the user who invited this VendorUser.""" + invitation_token = Column(String, nullable=True, index=True) # For email activation + invitation_sent_at = Column(DateTime, nullable=True) + invitation_accepted_at = Column(DateTime, nullable=True) - is_active = Column(Boolean, default=True, nullable=False) + is_active = Column(Boolean, default=False, nullable=False) # False until invitation accepted """Indicates whether the VendorUser role is active.""" + # Relationships vendor = relationship("Vendor", back_populates="vendor_users") """Relationship to the Vendor model, representing the associated vendor.""" @@ -216,10 +235,57 @@ class VendorUser(Base, TimestampMixin): """Return a string representation of the VendorUser instance. Returns: - str: A string that includes the vendor_id and user_id of the VendorUser instance. + str: A string that includes the vendor_id, the user_id and the user_type of the VendorUser instance. """ - return f"" + return f"" + @property + def is_owner(self) -> bool: + """Check if this is an owner membership.""" + return self.user_type == VendorUserType.OWNER.value + + @property + def is_team_member(self) -> bool: + """Check if this is a team member (not owner).""" + return self.user_type == VendorUserType.TEAM_MEMBER.value + + @property + def is_invitation_pending(self) -> bool: + """Check if invitation is still pending.""" + return self.invitation_token is not None and self.invitation_accepted_at is None + + def has_permission(self, permission: str) -> bool: + """ + Check if user has a specific permission. + + Owners always have all permissions. + Team members check their role's permissions. + """ + # Owners have all permissions + if self.is_owner: + return True + + # Inactive users have no permissions + if not self.is_active: + return False + + # Check role permissions + if self.role and self.role.permissions: + return permission in self.role.permissions + + return False + + def get_all_permissions(self) -> list: + """Get all permissions this user has.""" + if self.is_owner: + # Return all possible permissions + from app.core.permissions import VendorPermissions + return list(VendorPermissions.__members__.values()) + + if self.role and self.role.permissions: + return self.role.permissions + + return [] class Role(Base, TimestampMixin): """Represents a role within a vendor's system.""" diff --git a/models/schema/team.py b/models/schema/team.py index ad854e3c..81db283b 100644 --- a/models/schema/team.py +++ b/models/schema/team.py @@ -1 +1,260 @@ -# Team management models +# models/schema/team.py +""" +Pydantic schemas for vendor team management. + +This module defines request/response schemas for: +- Team member listing +- Team member invitation +- Team member updates +- Role management +""" + +from datetime import datetime +from typing import Optional, List +from pydantic import BaseModel, EmailStr, Field, validator + + +# ============================================================================ +# Role Schemas +# ============================================================================ + +class RoleBase(BaseModel): + """Base role schema.""" + name: str = Field(..., min_length=1, max_length=100, description="Role name") + permissions: List[str] = Field(default_factory=list, description="List of permission strings") + + +class RoleCreate(RoleBase): + """Schema for creating a role.""" + pass + + +class RoleUpdate(BaseModel): + """Schema for updating a role.""" + name: Optional[str] = Field(None, min_length=1, max_length=100) + permissions: Optional[List[str]] = None + + +class RoleResponse(RoleBase): + """Schema for role response.""" + id: int + vendor_id: int + created_at: datetime + updated_at: datetime + + class Config: + from_attributes = True # Pydantic v2 (use orm_mode = True for v1) + + +class RoleListResponse(BaseModel): + """Schema for role list response.""" + roles: List[RoleResponse] + total: int + + +# ============================================================================ +# Team Member Schemas +# ============================================================================ + +class TeamMemberBase(BaseModel): + """Base team member schema.""" + email: EmailStr = Field(..., description="Team member email address") + first_name: Optional[str] = Field(None, max_length=100) + last_name: Optional[str] = Field(None, max_length=100) + + +class TeamMemberInvite(TeamMemberBase): + """Schema for inviting a team member.""" + role_id: Optional[int] = Field(None, description="Role ID to assign (for preset roles)") + role_name: Optional[str] = Field(None, description="Role name (manager, staff, support, etc.)") + custom_permissions: Optional[List[str]] = Field( + None, + description="Custom permissions (overrides role preset)" + ) + + @validator('role_name') + def validate_role_name(cls, v): + """Validate role name is in allowed presets.""" + if v is not None: + allowed_roles = ['manager', 'staff', 'support', 'viewer', 'marketing'] + if v.lower() not in allowed_roles: + raise ValueError( + f"Role name must be one of: {', '.join(allowed_roles)}" + ) + return v.lower() if v else v + + @validator('custom_permissions') + def validate_custom_permissions(cls, v, values): + """Ensure either role_id/role_name OR custom_permissions is provided.""" + if v is not None and len(v) > 0: + # If custom permissions provided, role_name should be provided too + if 'role_name' not in values or not values['role_name']: + raise ValueError( + "role_name is required when providing custom_permissions" + ) + return v + + +class TeamMemberUpdate(BaseModel): + """Schema for updating a team member.""" + role_id: Optional[int] = Field(None, description="New role ID") + is_active: Optional[bool] = Field(None, description="Active status") + + +class TeamMemberResponse(BaseModel): + """Schema for team member response.""" + id: int = Field(..., description="User ID") + email: EmailStr + username: str + first_name: Optional[str] + last_name: Optional[str] + full_name: str + user_type: str = Field(..., description="'owner' or 'member'") + role_name: str = Field(..., description="Role name") + role_id: Optional[int] + permissions: List[str] = Field(default_factory=list, description="User's permissions") + is_active: bool + is_owner: bool + invitation_pending: bool = Field( + default=False, + description="True if invitation not yet accepted" + ) + invited_at: Optional[datetime] = Field(None, description="When invitation was sent") + accepted_at: Optional[datetime] = Field(None, description="When invitation was accepted") + joined_at: datetime = Field(..., description="When user joined vendor") + + class Config: + from_attributes = True + + +class TeamMemberListResponse(BaseModel): + """Schema for team member list response.""" + members: List[TeamMemberResponse] + total: int + active_count: int + pending_invitations: int + + +# ============================================================================ +# Invitation Schemas +# ============================================================================ + +class InvitationAccept(BaseModel): + """Schema for accepting a team invitation.""" + invitation_token: str = Field(..., min_length=32, description="Invitation token from email") + password: str = Field( + ..., + min_length=8, + max_length=128, + description="Password for new account" + ) + first_name: str = Field(..., min_length=1, max_length=100) + last_name: str = Field(..., min_length=1, max_length=100) + + @validator('password') + def validate_password_strength(cls, v): + """Validate password meets minimum requirements.""" + if len(v) < 8: + raise ValueError("Password must be at least 8 characters long") + + has_upper = any(c.isupper() for c in v) + has_lower = any(c.islower() for c in v) + has_digit = any(c.isdigit() for c in v) + + if not (has_upper and has_lower and has_digit): + raise ValueError( + "Password must contain at least one uppercase letter, " + "one lowercase letter, and one digit" + ) + + return v + + +class InvitationResponse(BaseModel): + """Schema for invitation response.""" + message: str + email: EmailStr + role: str + invitation_token: Optional[str] = Field( + None, + description="Token (only returned in dev/test environments)" + ) + invitation_sent: bool = Field(default=True) + + +class InvitationAcceptResponse(BaseModel): + """Schema for invitation acceptance response.""" + message: str + vendor: dict = Field(..., description="Vendor information") + user: dict = Field(..., description="User information") + role: str + + +# ============================================================================ +# Team Statistics Schema +# ============================================================================ + +class TeamStatistics(BaseModel): + """Schema for team statistics.""" + total_members: int + active_members: int + inactive_members: int + pending_invitations: int + owners: int + team_members: int + roles_breakdown: dict = Field( + default_factory=dict, + description="Count of members per role" + ) + + +# ============================================================================ +# Bulk Operations Schemas +# ============================================================================ + +class BulkRemoveRequest(BaseModel): + """Schema for bulk removing team members.""" + user_ids: List[int] = Field(..., min_items=1, description="List of user IDs to remove") + + +class BulkRemoveResponse(BaseModel): + """Schema for bulk remove response.""" + success_count: int + failed_count: int + errors: List[dict] = Field(default_factory=list) + + +# ============================================================================ +# Permission Check Schemas +# ============================================================================ + +class PermissionCheckRequest(BaseModel): + """Schema for checking permissions.""" + permissions: List[str] = Field(..., min_items=1, description="Permissions to check") + + +class PermissionCheckResponse(BaseModel): + """Schema for permission check response.""" + has_all: bool = Field(..., description="True if user has all permissions") + has_any: bool = Field(..., description="True if user has any permission") + granted: List[str] = Field(default_factory=list, description="Permissions user has") + denied: List[str] = Field(default_factory=list, description="Permissions user lacks") + + +class UserPermissionsResponse(BaseModel): + """Schema for user's permissions response.""" + permissions: List[str] = Field(default_factory=list) + permission_count: int + is_owner: bool + role_name: Optional[str] = None + + +# ============================================================================ +# Error Response Schema +# ============================================================================ + +class TeamErrorResponse(BaseModel): + """Schema for team operation errors.""" + error_code: str + message: str + details: Optional[dict] = None