Templates Migration: - Migrate admin templates to modules (tenancy, billing, monitoring, marketplace, etc.) - Migrate vendor templates to modules (tenancy, billing, orders, messaging, etc.) - Migrate storefront templates to modules (catalog, customers, orders, cart, checkout, cms) - Migrate public templates to modules (billing, marketplace, cms) - Keep shared templates in app/templates/ (base.html, errors/, partials/, macros/) - Migrate letzshop partials to marketplace module Static Files Migration: - Migrate admin JS to modules: tenancy (23 files), core (5 files), monitoring (1 file) - Migrate vendor JS to modules: tenancy (4 files), core (2 files) - Migrate shared JS: vendor-selector.js to core, media-picker.js to cms - Migrate storefront JS: storefront-layout.js to core - Keep framework JS in static/ (api-client, utils, money, icons, log-config, lib/) - Update all template references to use module_static paths Naming Consistency: - Rename static/platform/ to static/public/ - Rename app/templates/platform/ to app/templates/public/ - Update all extends and static references Documentation: - Update module-system.md with shared templates documentation - Update frontend-structure.md with new module JS organization Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1145 lines
35 KiB
Python
1145 lines
35 KiB
Python
# app/modules/tenancy/exceptions.py
|
|
"""
|
|
Tenancy module exceptions.
|
|
|
|
Exceptions for platform, company, vendor, admin user, team, and domain management.
|
|
"""
|
|
|
|
from typing import Any
|
|
|
|
from app.exceptions.base import (
|
|
AuthenticationException,
|
|
AuthorizationException,
|
|
BusinessLogicException,
|
|
ConflictException,
|
|
ExternalServiceException,
|
|
ResourceNotFoundException,
|
|
ValidationException,
|
|
WizamartException,
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Authentication Exceptions
|
|
# =============================================================================
|
|
|
|
|
|
class InvalidCredentialsException(AuthenticationException):
|
|
"""Raised when login credentials are invalid."""
|
|
|
|
def __init__(self, message: str = "Invalid username or password"):
|
|
super().__init__(
|
|
message=message,
|
|
error_code="INVALID_CREDENTIALS",
|
|
)
|
|
|
|
|
|
class TokenExpiredException(AuthenticationException):
|
|
"""Raised when JWT token has expired."""
|
|
|
|
def __init__(self, message: str = "Token has expired"):
|
|
super().__init__(
|
|
message=message,
|
|
error_code="TOKEN_EXPIRED",
|
|
)
|
|
|
|
|
|
class InvalidTokenException(AuthenticationException):
|
|
"""Raised when JWT token is invalid or malformed."""
|
|
|
|
def __init__(self, message: str = "Invalid token"):
|
|
super().__init__(
|
|
message=message,
|
|
error_code="INVALID_TOKEN",
|
|
)
|
|
|
|
|
|
class InsufficientPermissionsException(AuthorizationException):
|
|
"""Raised when user lacks required permissions for an action."""
|
|
|
|
def __init__(
|
|
self,
|
|
message: str = "Insufficient permissions for this action",
|
|
required_permission: str | None = None,
|
|
):
|
|
details = {}
|
|
if required_permission:
|
|
details["required_permission"] = required_permission
|
|
|
|
super().__init__(
|
|
message=message,
|
|
error_code="INSUFFICIENT_PERMISSIONS",
|
|
details=details,
|
|
)
|
|
|
|
|
|
class UserNotActiveException(AuthorizationException):
|
|
"""Raised when user account is not active."""
|
|
|
|
def __init__(self, message: str = "User account is not active"):
|
|
super().__init__(
|
|
message=message,
|
|
error_code="USER_NOT_ACTIVE",
|
|
)
|
|
|
|
|
|
class AdminRequiredException(AuthorizationException):
|
|
"""Raised when admin privileges are required."""
|
|
|
|
def __init__(self, message: str = "Admin privileges required"):
|
|
super().__init__(
|
|
message=message,
|
|
error_code="ADMIN_REQUIRED",
|
|
)
|
|
|
|
|
|
class UserAlreadyExistsException(ConflictException):
|
|
"""Raised when trying to register with existing username/email."""
|
|
|
|
def __init__(
|
|
self,
|
|
message: str = "User already exists",
|
|
field: str | None = None,
|
|
):
|
|
details = {}
|
|
if field:
|
|
details["field"] = field
|
|
|
|
super().__init__(
|
|
message=message,
|
|
error_code="USER_ALREADY_EXISTS",
|
|
details=details,
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Platform Exceptions
|
|
# =============================================================================
|
|
|
|
|
|
class PlatformNotFoundException(WizamartException):
|
|
"""Raised when a platform is not found."""
|
|
|
|
def __init__(self, code: str):
|
|
super().__init__(
|
|
message=f"Platform not found: {code}",
|
|
error_code="PLATFORM_NOT_FOUND",
|
|
status_code=404,
|
|
details={"platform_code": code},
|
|
)
|
|
|
|
|
|
class PlatformInactiveException(WizamartException):
|
|
"""Raised when trying to access an inactive platform."""
|
|
|
|
def __init__(self, code: str):
|
|
super().__init__(
|
|
message=f"Platform is inactive: {code}",
|
|
error_code="PLATFORM_INACTIVE",
|
|
status_code=403,
|
|
details={"platform_code": code},
|
|
)
|
|
|
|
|
|
class PlatformUpdateException(WizamartException):
|
|
"""Raised when platform update fails."""
|
|
|
|
def __init__(self, code: str, reason: str):
|
|
super().__init__(
|
|
message=f"Failed to update platform {code}: {reason}",
|
|
error_code="PLATFORM_UPDATE_FAILED",
|
|
status_code=400,
|
|
details={"platform_code": code, "reason": reason},
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Vendor Exceptions
|
|
# =============================================================================
|
|
|
|
|
|
class VendorNotFoundException(ResourceNotFoundException):
|
|
"""Raised when a vendor is not found."""
|
|
|
|
def __init__(self, vendor_identifier: str, identifier_type: str = "code"):
|
|
if identifier_type.lower() == "id":
|
|
message = f"Vendor with ID '{vendor_identifier}' not found"
|
|
else:
|
|
message = f"Vendor with code '{vendor_identifier}' not found"
|
|
|
|
super().__init__(
|
|
resource_type="Vendor",
|
|
identifier=vendor_identifier,
|
|
message=message,
|
|
error_code="VENDOR_NOT_FOUND",
|
|
)
|
|
|
|
|
|
class VendorAlreadyExistsException(ConflictException):
|
|
"""Raised when trying to create a vendor that already exists."""
|
|
|
|
def __init__(self, vendor_code: str):
|
|
super().__init__(
|
|
message=f"Vendor with code '{vendor_code}' already exists",
|
|
error_code="VENDOR_ALREADY_EXISTS",
|
|
details={"vendor_code": vendor_code},
|
|
)
|
|
|
|
|
|
class VendorNotActiveException(BusinessLogicException):
|
|
"""Raised when trying to perform operations on inactive vendor."""
|
|
|
|
def __init__(self, vendor_code: str):
|
|
super().__init__(
|
|
message=f"Vendor '{vendor_code}' is not active",
|
|
error_code="VENDOR_NOT_ACTIVE",
|
|
details={"vendor_code": vendor_code},
|
|
)
|
|
|
|
|
|
class VendorNotVerifiedException(BusinessLogicException):
|
|
"""Raised when trying to perform operations requiring verified vendor."""
|
|
|
|
def __init__(self, vendor_code: str):
|
|
super().__init__(
|
|
message=f"Vendor '{vendor_code}' is not verified",
|
|
error_code="VENDOR_NOT_VERIFIED",
|
|
details={"vendor_code": vendor_code},
|
|
)
|
|
|
|
|
|
class UnauthorizedVendorAccessException(AuthorizationException):
|
|
"""Raised when user tries to access vendor they don't own."""
|
|
|
|
def __init__(self, vendor_code: str, user_id: int | None = None):
|
|
details = {"vendor_code": vendor_code}
|
|
if user_id:
|
|
details["user_id"] = user_id
|
|
|
|
super().__init__(
|
|
message=f"Unauthorized access to vendor '{vendor_code}'",
|
|
error_code="UNAUTHORIZED_VENDOR_ACCESS",
|
|
details=details,
|
|
)
|
|
|
|
|
|
class InvalidVendorDataException(ValidationException):
|
|
"""Raised when vendor data is invalid or incomplete."""
|
|
|
|
def __init__(
|
|
self,
|
|
message: str = "Invalid vendor data",
|
|
field: str | None = None,
|
|
details: dict[str, Any] | None = None,
|
|
):
|
|
super().__init__(
|
|
message=message,
|
|
field=field,
|
|
details=details,
|
|
)
|
|
self.error_code = "INVALID_VENDOR_DATA"
|
|
|
|
|
|
class VendorValidationException(ValidationException):
|
|
"""Raised when vendor validation fails."""
|
|
|
|
def __init__(
|
|
self,
|
|
message: str = "Vendor validation failed",
|
|
field: str | None = None,
|
|
validation_errors: dict[str, str] | None = None,
|
|
):
|
|
details = {}
|
|
if validation_errors:
|
|
details["validation_errors"] = validation_errors
|
|
|
|
super().__init__(
|
|
message=message,
|
|
field=field,
|
|
details=details,
|
|
)
|
|
self.error_code = "VENDOR_VALIDATION_FAILED"
|
|
|
|
|
|
class MaxVendorsReachedException(BusinessLogicException):
|
|
"""Raised when user tries to create more vendors than allowed."""
|
|
|
|
def __init__(self, max_vendors: int, user_id: int | None = None):
|
|
details = {"max_vendors": max_vendors}
|
|
if user_id:
|
|
details["user_id"] = user_id
|
|
|
|
super().__init__(
|
|
message=f"Maximum number of vendors reached ({max_vendors})",
|
|
error_code="MAX_VENDORS_REACHED",
|
|
details=details,
|
|
)
|
|
|
|
|
|
class VendorAccessDeniedException(AuthorizationException):
|
|
"""Raised when no vendor context is available for an authenticated endpoint."""
|
|
|
|
def __init__(self, message: str = "No vendor context available"):
|
|
super().__init__(
|
|
message=message,
|
|
error_code="VENDOR_ACCESS_DENIED",
|
|
)
|
|
|
|
|
|
class VendorOwnerOnlyException(AuthorizationException):
|
|
"""Raised when operation requires vendor owner role."""
|
|
|
|
def __init__(self, operation: str, vendor_code: str | None = None):
|
|
details = {"operation": operation}
|
|
if vendor_code:
|
|
details["vendor_code"] = vendor_code
|
|
|
|
super().__init__(
|
|
message=f"Operation '{operation}' requires vendor owner role",
|
|
error_code="VENDOR_OWNER_ONLY",
|
|
details=details,
|
|
)
|
|
|
|
|
|
class InsufficientVendorPermissionsException(AuthorizationException):
|
|
"""Raised when user lacks required vendor permission."""
|
|
|
|
def __init__(self, required_permission: str, vendor_code: str | None = None):
|
|
details = {"required_permission": required_permission}
|
|
if vendor_code:
|
|
details["vendor_code"] = vendor_code
|
|
|
|
super().__init__(
|
|
message=f"Permission required: {required_permission}",
|
|
error_code="INSUFFICIENT_VENDOR_PERMISSIONS",
|
|
details=details,
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Company Exceptions
|
|
# =============================================================================
|
|
|
|
|
|
class CompanyNotFoundException(ResourceNotFoundException):
|
|
"""Raised when a company is not found."""
|
|
|
|
def __init__(self, company_identifier: str | int, identifier_type: str = "id"):
|
|
if identifier_type.lower() == "id":
|
|
message = f"Company with ID '{company_identifier}' not found"
|
|
else:
|
|
message = f"Company with name '{company_identifier}' not found"
|
|
|
|
super().__init__(
|
|
resource_type="Company",
|
|
identifier=str(company_identifier),
|
|
message=message,
|
|
error_code="COMPANY_NOT_FOUND",
|
|
)
|
|
|
|
|
|
class CompanyAlreadyExistsException(ConflictException):
|
|
"""Raised when trying to create a company that already exists."""
|
|
|
|
def __init__(self, company_name: str):
|
|
super().__init__(
|
|
message=f"Company with name '{company_name}' already exists",
|
|
error_code="COMPANY_ALREADY_EXISTS",
|
|
details={"company_name": company_name},
|
|
)
|
|
|
|
|
|
class CompanyNotActiveException(BusinessLogicException):
|
|
"""Raised when trying to perform operations on inactive company."""
|
|
|
|
def __init__(self, company_id: int):
|
|
super().__init__(
|
|
message=f"Company with ID '{company_id}' is not active",
|
|
error_code="COMPANY_NOT_ACTIVE",
|
|
details={"company_id": company_id},
|
|
)
|
|
|
|
|
|
class CompanyNotVerifiedException(BusinessLogicException):
|
|
"""Raised when trying to perform operations requiring verified company."""
|
|
|
|
def __init__(self, company_id: int):
|
|
super().__init__(
|
|
message=f"Company with ID '{company_id}' is not verified",
|
|
error_code="COMPANY_NOT_VERIFIED",
|
|
details={"company_id": company_id},
|
|
)
|
|
|
|
|
|
class UnauthorizedCompanyAccessException(AuthorizationException):
|
|
"""Raised when user tries to access company they don't own."""
|
|
|
|
def __init__(self, company_id: int, user_id: int | None = None):
|
|
details = {"company_id": company_id}
|
|
if user_id:
|
|
details["user_id"] = user_id
|
|
|
|
super().__init__(
|
|
message=f"Unauthorized access to company with ID '{company_id}'",
|
|
error_code="UNAUTHORIZED_COMPANY_ACCESS",
|
|
details=details,
|
|
)
|
|
|
|
|
|
class InvalidCompanyDataException(ValidationException):
|
|
"""Raised when company data is invalid or incomplete."""
|
|
|
|
def __init__(
|
|
self,
|
|
message: str = "Invalid company data",
|
|
field: str | None = None,
|
|
details: dict[str, Any] | None = None,
|
|
):
|
|
super().__init__(
|
|
message=message,
|
|
field=field,
|
|
details=details,
|
|
)
|
|
self.error_code = "INVALID_COMPANY_DATA"
|
|
|
|
|
|
class CompanyValidationException(ValidationException):
|
|
"""Raised when company validation fails."""
|
|
|
|
def __init__(
|
|
self,
|
|
message: str = "Company validation failed",
|
|
field: str | None = None,
|
|
validation_errors: dict[str, str] | None = None,
|
|
):
|
|
details = {}
|
|
if validation_errors:
|
|
details["validation_errors"] = validation_errors
|
|
|
|
super().__init__(
|
|
message=message,
|
|
field=field,
|
|
details=details,
|
|
)
|
|
self.error_code = "COMPANY_VALIDATION_FAILED"
|
|
|
|
|
|
class CompanyHasVendorsException(BusinessLogicException):
|
|
"""Raised when trying to delete a company that still has active vendors."""
|
|
|
|
def __init__(self, company_id: int, vendor_count: int):
|
|
super().__init__(
|
|
message=f"Cannot delete company with ID '{company_id}' because it has {vendor_count} associated vendor(s)",
|
|
error_code="COMPANY_HAS_VENDORS",
|
|
details={"company_id": company_id, "vendor_count": vendor_count},
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Admin User Exceptions
|
|
# =============================================================================
|
|
|
|
|
|
class UserNotFoundException(ResourceNotFoundException):
|
|
"""Raised when user is not found in admin operations."""
|
|
|
|
def __init__(self, user_identifier: str, identifier_type: str = "ID"):
|
|
if identifier_type.lower() == "username":
|
|
message = f"User with username '{user_identifier}' not found"
|
|
elif identifier_type.lower() == "email":
|
|
message = f"User with email '{user_identifier}' not found"
|
|
else:
|
|
message = f"User with ID '{user_identifier}' not found"
|
|
|
|
super().__init__(
|
|
resource_type="User",
|
|
identifier=user_identifier,
|
|
message=message,
|
|
error_code="USER_NOT_FOUND",
|
|
)
|
|
|
|
|
|
class UserStatusChangeException(BusinessLogicException):
|
|
"""Raised when user status cannot be changed."""
|
|
|
|
def __init__(
|
|
self,
|
|
user_id: int,
|
|
current_status: str,
|
|
attempted_action: str,
|
|
reason: str | None = None,
|
|
):
|
|
message = f"Cannot {attempted_action} user {user_id} (current status: {current_status})"
|
|
if reason:
|
|
message += f": {reason}"
|
|
|
|
super().__init__(
|
|
message=message,
|
|
error_code="USER_STATUS_CHANGE_FAILED",
|
|
details={
|
|
"user_id": user_id,
|
|
"current_status": current_status,
|
|
"attempted_action": attempted_action,
|
|
"reason": reason,
|
|
},
|
|
)
|
|
|
|
|
|
class AdminOperationException(BusinessLogicException):
|
|
"""Raised when admin operation fails."""
|
|
|
|
def __init__(
|
|
self,
|
|
operation: str,
|
|
reason: str,
|
|
target_type: str | None = None,
|
|
target_id: str | None = None,
|
|
):
|
|
message = f"Admin operation '{operation}' failed: {reason}"
|
|
|
|
details = {
|
|
"operation": operation,
|
|
"reason": reason,
|
|
}
|
|
|
|
if target_type:
|
|
details["target_type"] = target_type
|
|
if target_id:
|
|
details["target_id"] = target_id
|
|
|
|
super().__init__(
|
|
message=message,
|
|
error_code="ADMIN_OPERATION_FAILED",
|
|
details=details,
|
|
)
|
|
|
|
|
|
class CannotModifyAdminException(AuthorizationException):
|
|
"""Raised when trying to modify another admin user."""
|
|
|
|
def __init__(self, target_user_id: int, admin_user_id: int):
|
|
super().__init__(
|
|
message=f"Cannot modify admin user {target_user_id}",
|
|
error_code="CANNOT_MODIFY_ADMIN",
|
|
details={
|
|
"target_user_id": target_user_id,
|
|
"admin_user_id": admin_user_id,
|
|
},
|
|
)
|
|
|
|
|
|
class CannotModifySelfException(BusinessLogicException):
|
|
"""Raised when admin tries to modify their own status."""
|
|
|
|
def __init__(self, user_id: int, operation: str):
|
|
super().__init__(
|
|
message=f"Cannot perform '{operation}' on your own account",
|
|
error_code="CANNOT_MODIFY_SELF",
|
|
details={
|
|
"user_id": user_id,
|
|
"operation": operation,
|
|
},
|
|
)
|
|
|
|
|
|
class InvalidAdminActionException(ValidationException):
|
|
"""Raised when admin action is invalid."""
|
|
|
|
def __init__(
|
|
self,
|
|
action: str,
|
|
reason: str,
|
|
valid_actions: list | None = None,
|
|
):
|
|
details = {
|
|
"action": action,
|
|
"reason": reason,
|
|
}
|
|
|
|
if valid_actions:
|
|
details["valid_actions"] = valid_actions
|
|
|
|
super().__init__(
|
|
message=f"Invalid admin action '{action}': {reason}",
|
|
details=details,
|
|
)
|
|
self.error_code = "INVALID_ADMIN_ACTION"
|
|
|
|
|
|
class BulkOperationException(BusinessLogicException):
|
|
"""Raised when bulk admin operation fails."""
|
|
|
|
def __init__(
|
|
self,
|
|
operation: str,
|
|
total_items: int,
|
|
failed_items: int,
|
|
errors: dict[str, Any] | None = None,
|
|
):
|
|
message = f"Bulk {operation} completed with errors: {failed_items}/{total_items} failed"
|
|
|
|
details = {
|
|
"operation": operation,
|
|
"total_items": total_items,
|
|
"failed_items": failed_items,
|
|
"success_items": total_items - failed_items,
|
|
}
|
|
|
|
if errors:
|
|
details["errors"] = errors
|
|
|
|
super().__init__(
|
|
message=message,
|
|
error_code="BULK_OPERATION_PARTIAL_FAILURE",
|
|
details=details,
|
|
)
|
|
|
|
|
|
class ConfirmationRequiredException(BusinessLogicException):
|
|
"""Raised when a destructive operation requires explicit confirmation."""
|
|
|
|
def __init__(
|
|
self,
|
|
operation: str,
|
|
message: str | None = None,
|
|
confirmation_param: str = "confirm",
|
|
):
|
|
if not message:
|
|
message = f"Operation '{operation}' requires confirmation parameter: {confirmation_param}=true"
|
|
|
|
super().__init__(
|
|
message=message,
|
|
error_code="CONFIRMATION_REQUIRED",
|
|
details={
|
|
"operation": operation,
|
|
"confirmation_param": confirmation_param,
|
|
},
|
|
)
|
|
|
|
|
|
class VendorVerificationException(BusinessLogicException):
|
|
"""Raised when vendor verification fails."""
|
|
|
|
def __init__(
|
|
self,
|
|
vendor_id: int,
|
|
reason: str,
|
|
current_verification_status: bool | None = None,
|
|
):
|
|
details = {
|
|
"vendor_id": vendor_id,
|
|
"reason": reason,
|
|
}
|
|
|
|
if current_verification_status is not None:
|
|
details["current_verification_status"] = current_verification_status
|
|
|
|
super().__init__(
|
|
message=f"Vendor verification failed for vendor {vendor_id}: {reason}",
|
|
error_code="VENDOR_VERIFICATION_FAILED",
|
|
details=details,
|
|
)
|
|
|
|
|
|
class UserCannotBeDeletedException(BusinessLogicException):
|
|
"""Raised when a user cannot be deleted due to ownership constraints."""
|
|
|
|
def __init__(self, user_id: int, reason: str, owned_count: int = 0):
|
|
details = {
|
|
"user_id": user_id,
|
|
"reason": reason,
|
|
}
|
|
if owned_count > 0:
|
|
details["owned_companies_count"] = owned_count
|
|
|
|
super().__init__(
|
|
message=f"Cannot delete user {user_id}: {reason}",
|
|
error_code="USER_CANNOT_BE_DELETED",
|
|
details=details,
|
|
)
|
|
|
|
|
|
class UserRoleChangeException(BusinessLogicException):
|
|
"""Raised when user role cannot be changed."""
|
|
|
|
def __init__(self, user_id: int, current_role: str, target_role: str, reason: str):
|
|
super().__init__(
|
|
message=f"Cannot change user {user_id} role from {current_role} to {target_role}: {reason}",
|
|
error_code="USER_ROLE_CHANGE_FAILED",
|
|
details={
|
|
"user_id": user_id,
|
|
"current_role": current_role,
|
|
"target_role": target_role,
|
|
"reason": reason,
|
|
},
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Team Exceptions
|
|
# =============================================================================
|
|
|
|
|
|
class TeamMemberNotFoundException(ResourceNotFoundException):
|
|
"""Raised when a team member is not found."""
|
|
|
|
def __init__(self, user_id: int, vendor_id: int | None = None):
|
|
details = {"user_id": user_id}
|
|
if vendor_id:
|
|
details["vendor_id"] = vendor_id
|
|
message = f"Team member with user ID '{user_id}' not found in vendor {vendor_id}"
|
|
else:
|
|
message = f"Team member with user ID '{user_id}' not found"
|
|
|
|
super().__init__(
|
|
resource_type="TeamMember",
|
|
identifier=str(user_id),
|
|
message=message,
|
|
error_code="TEAM_MEMBER_NOT_FOUND",
|
|
)
|
|
self.details.update(details)
|
|
|
|
|
|
class TeamMemberAlreadyExistsException(ConflictException):
|
|
"""Raised when trying to add a user who is already a team member."""
|
|
|
|
def __init__(self, user_id: int, vendor_id: int):
|
|
super().__init__(
|
|
message=f"User {user_id} is already a team member of vendor {vendor_id}",
|
|
error_code="TEAM_MEMBER_ALREADY_EXISTS",
|
|
details={
|
|
"user_id": user_id,
|
|
"vendor_id": vendor_id,
|
|
},
|
|
)
|
|
|
|
|
|
class TeamInvitationNotFoundException(ResourceNotFoundException):
|
|
"""Raised when a team invitation is not found."""
|
|
|
|
def __init__(self, invitation_token: str):
|
|
super().__init__(
|
|
resource_type="TeamInvitation",
|
|
identifier=invitation_token,
|
|
message=f"Team invitation with token '{invitation_token}' not found or expired",
|
|
error_code="TEAM_INVITATION_NOT_FOUND",
|
|
)
|
|
|
|
|
|
class TeamInvitationExpiredException(BusinessLogicException):
|
|
"""Raised when trying to accept an expired invitation."""
|
|
|
|
def __init__(self, invitation_token: str):
|
|
super().__init__(
|
|
message="Team invitation has expired",
|
|
error_code="TEAM_INVITATION_EXPIRED",
|
|
details={"invitation_token": invitation_token},
|
|
)
|
|
|
|
|
|
class TeamInvitationAlreadyAcceptedException(ConflictException):
|
|
"""Raised when trying to accept an already accepted invitation."""
|
|
|
|
def __init__(self, invitation_token: str):
|
|
super().__init__(
|
|
message="Team invitation has already been accepted",
|
|
error_code="TEAM_INVITATION_ALREADY_ACCEPTED",
|
|
details={"invitation_token": invitation_token},
|
|
)
|
|
|
|
|
|
class UnauthorizedTeamActionException(AuthorizationException):
|
|
"""Raised when user tries to perform team action without permission."""
|
|
|
|
def __init__(
|
|
self,
|
|
action: str,
|
|
user_id: int | None = None,
|
|
required_permission: str | None = None,
|
|
):
|
|
details = {"action": action}
|
|
if user_id:
|
|
details["user_id"] = user_id
|
|
if required_permission:
|
|
details["required_permission"] = required_permission
|
|
|
|
super().__init__(
|
|
message=f"Unauthorized to perform action: {action}",
|
|
error_code="UNAUTHORIZED_TEAM_ACTION",
|
|
details=details,
|
|
)
|
|
|
|
|
|
class CannotRemoveOwnerException(BusinessLogicException):
|
|
"""Raised when trying to remove the vendor owner from team."""
|
|
|
|
def __init__(self, user_id: int, vendor_id: int):
|
|
super().__init__(
|
|
message="Cannot remove vendor owner from team",
|
|
error_code="CANNOT_REMOVE_OWNER",
|
|
details={
|
|
"user_id": user_id,
|
|
"vendor_id": vendor_id,
|
|
},
|
|
)
|
|
|
|
|
|
class CannotModifyOwnRoleException(BusinessLogicException):
|
|
"""Raised when user tries to modify their own role."""
|
|
|
|
def __init__(self, user_id: int):
|
|
super().__init__(
|
|
message="Cannot modify your own role",
|
|
error_code="CANNOT_MODIFY_OWN_ROLE",
|
|
details={"user_id": user_id},
|
|
)
|
|
|
|
|
|
class RoleNotFoundException(ResourceNotFoundException):
|
|
"""Raised when a role is not found."""
|
|
|
|
def __init__(self, role_id: int, vendor_id: int | None = None):
|
|
details = {"role_id": role_id}
|
|
if vendor_id:
|
|
details["vendor_id"] = vendor_id
|
|
message = f"Role with ID '{role_id}' not found in vendor {vendor_id}"
|
|
else:
|
|
message = f"Role with ID '{role_id}' not found"
|
|
|
|
super().__init__(
|
|
resource_type="Role",
|
|
identifier=str(role_id),
|
|
message=message,
|
|
error_code="ROLE_NOT_FOUND",
|
|
)
|
|
self.details.update(details)
|
|
|
|
|
|
class InvalidRoleException(ValidationException):
|
|
"""Raised when role data is invalid."""
|
|
|
|
def __init__(
|
|
self,
|
|
message: str = "Invalid role data",
|
|
field: str | None = None,
|
|
details: dict[str, Any] | None = None,
|
|
):
|
|
super().__init__(
|
|
message=message,
|
|
field=field,
|
|
details=details,
|
|
)
|
|
self.error_code = "INVALID_ROLE_DATA"
|
|
|
|
|
|
class InsufficientTeamPermissionsException(AuthorizationException):
|
|
"""Raised when user lacks required team permissions for an action."""
|
|
|
|
def __init__(
|
|
self,
|
|
required_permission: str,
|
|
user_id: int | None = None,
|
|
action: str | None = None,
|
|
):
|
|
details = {"required_permission": required_permission}
|
|
if user_id:
|
|
details["user_id"] = user_id
|
|
if action:
|
|
details["action"] = action
|
|
|
|
message = f"Insufficient team permissions. Required: {required_permission}"
|
|
|
|
super().__init__(
|
|
message=message,
|
|
error_code="INSUFFICIENT_TEAM_PERMISSIONS",
|
|
details=details,
|
|
)
|
|
|
|
|
|
class MaxTeamMembersReachedException(BusinessLogicException):
|
|
"""Raised when vendor has reached maximum team members limit."""
|
|
|
|
def __init__(self, max_members: int, vendor_id: int):
|
|
super().__init__(
|
|
message=f"Maximum number of team members reached ({max_members})",
|
|
error_code="MAX_TEAM_MEMBERS_REACHED",
|
|
details={
|
|
"max_members": max_members,
|
|
"vendor_id": vendor_id,
|
|
},
|
|
)
|
|
|
|
|
|
class TeamValidationException(ValidationException):
|
|
"""Raised when team operation validation fails."""
|
|
|
|
def __init__(
|
|
self,
|
|
message: str = "Team operation validation failed",
|
|
field: str | None = None,
|
|
validation_errors: dict[str, str] | None = None,
|
|
):
|
|
details = {}
|
|
if validation_errors:
|
|
details["validation_errors"] = validation_errors
|
|
|
|
super().__init__(
|
|
message=message,
|
|
field=field,
|
|
details=details,
|
|
)
|
|
self.error_code = "TEAM_VALIDATION_FAILED"
|
|
|
|
|
|
class InvalidInvitationDataException(ValidationException):
|
|
"""Raised when team invitation data is invalid."""
|
|
|
|
def __init__(
|
|
self,
|
|
message: str = "Invalid invitation data",
|
|
field: str | None = None,
|
|
details: dict[str, Any] | None = None,
|
|
):
|
|
super().__init__(
|
|
message=message,
|
|
field=field,
|
|
details=details,
|
|
)
|
|
self.error_code = "INVALID_INVITATION_DATA"
|
|
|
|
|
|
class InvalidInvitationTokenException(ValidationException):
|
|
"""Raised when invitation token is invalid, expired, or already used."""
|
|
|
|
def __init__(
|
|
self,
|
|
message: str = "Invalid or expired invitation token",
|
|
invitation_token: str | None = None,
|
|
):
|
|
details = {}
|
|
if invitation_token:
|
|
details["invitation_token"] = invitation_token
|
|
|
|
super().__init__(
|
|
message=message,
|
|
field="invitation_token",
|
|
details=details,
|
|
)
|
|
self.error_code = "INVALID_INVITATION_TOKEN"
|
|
|
|
|
|
# =============================================================================
|
|
# Vendor Domain Exceptions
|
|
# =============================================================================
|
|
|
|
|
|
class VendorDomainNotFoundException(ResourceNotFoundException):
|
|
"""Raised when a vendor domain is not found."""
|
|
|
|
def __init__(self, domain_identifier: str, identifier_type: str = "ID"):
|
|
if identifier_type.lower() == "domain":
|
|
message = f"Domain '{domain_identifier}' not found"
|
|
else:
|
|
message = f"Domain with ID '{domain_identifier}' not found"
|
|
|
|
super().__init__(
|
|
resource_type="VendorDomain",
|
|
identifier=domain_identifier,
|
|
message=message,
|
|
error_code="VENDOR_DOMAIN_NOT_FOUND",
|
|
)
|
|
|
|
|
|
class VendorDomainAlreadyExistsException(ConflictException):
|
|
"""Raised when trying to add a domain that already exists."""
|
|
|
|
def __init__(self, domain: str, existing_vendor_id: int | None = None):
|
|
details = {"domain": domain}
|
|
if existing_vendor_id:
|
|
details["existing_vendor_id"] = existing_vendor_id
|
|
|
|
super().__init__(
|
|
message=f"Domain '{domain}' is already registered",
|
|
error_code="VENDOR_DOMAIN_ALREADY_EXISTS",
|
|
details=details,
|
|
)
|
|
|
|
|
|
class InvalidDomainFormatException(ValidationException):
|
|
"""Raised when domain format is invalid."""
|
|
|
|
def __init__(self, domain: str, reason: str = "Invalid domain format"):
|
|
super().__init__(
|
|
message=f"{reason}: {domain}",
|
|
field="domain",
|
|
details={"domain": domain, "reason": reason},
|
|
)
|
|
self.error_code = "INVALID_DOMAIN_FORMAT"
|
|
|
|
|
|
class ReservedDomainException(ValidationException):
|
|
"""Raised when trying to use a reserved domain."""
|
|
|
|
def __init__(self, domain: str, reserved_part: str):
|
|
super().__init__(
|
|
message=f"Domain cannot use reserved subdomain: {reserved_part}",
|
|
field="domain",
|
|
details={"domain": domain, "reserved_part": reserved_part},
|
|
)
|
|
self.error_code = "RESERVED_DOMAIN"
|
|
|
|
|
|
class DomainNotVerifiedException(BusinessLogicException):
|
|
"""Raised when trying to activate an unverified domain."""
|
|
|
|
def __init__(self, domain_id: int, domain: str):
|
|
super().__init__(
|
|
message=f"Domain '{domain}' must be verified before activation",
|
|
error_code="DOMAIN_NOT_VERIFIED",
|
|
details={"domain_id": domain_id, "domain": domain},
|
|
)
|
|
|
|
|
|
class DomainVerificationFailedException(BusinessLogicException):
|
|
"""Raised when domain verification fails."""
|
|
|
|
def __init__(self, domain: str, reason: str):
|
|
super().__init__(
|
|
message=f"Domain verification failed for '{domain}': {reason}",
|
|
error_code="DOMAIN_VERIFICATION_FAILED",
|
|
details={"domain": domain, "reason": reason},
|
|
)
|
|
|
|
|
|
class DomainAlreadyVerifiedException(BusinessLogicException):
|
|
"""Raised when trying to verify an already verified domain."""
|
|
|
|
def __init__(self, domain_id: int, domain: str):
|
|
super().__init__(
|
|
message=f"Domain '{domain}' is already verified",
|
|
error_code="DOMAIN_ALREADY_VERIFIED",
|
|
details={"domain_id": domain_id, "domain": domain},
|
|
)
|
|
|
|
|
|
class MultiplePrimaryDomainsException(BusinessLogicException):
|
|
"""Raised when trying to set multiple primary domains."""
|
|
|
|
def __init__(self, vendor_id: int):
|
|
super().__init__(
|
|
message="Vendor can only have one primary domain",
|
|
error_code="MULTIPLE_PRIMARY_DOMAINS",
|
|
details={"vendor_id": vendor_id},
|
|
)
|
|
|
|
|
|
class DNSVerificationException(ExternalServiceException):
|
|
"""Raised when DNS verification service fails."""
|
|
|
|
def __init__(self, domain: str, reason: str):
|
|
super().__init__(
|
|
service_name="DNS",
|
|
message=f"DNS verification failed for '{domain}': {reason}",
|
|
error_code="DNS_VERIFICATION_ERROR",
|
|
details={"domain": domain, "reason": reason},
|
|
)
|
|
|
|
|
|
class MaxDomainsReachedException(BusinessLogicException):
|
|
"""Raised when vendor tries to add more domains than allowed."""
|
|
|
|
def __init__(self, vendor_id: int, max_domains: int):
|
|
super().__init__(
|
|
message=f"Maximum number of domains reached ({max_domains})",
|
|
error_code="MAX_DOMAINS_REACHED",
|
|
details={"vendor_id": vendor_id, "max_domains": max_domains},
|
|
)
|
|
|
|
|
|
class UnauthorizedDomainAccessException(BusinessLogicException):
|
|
"""Raised when trying to access domain that doesn't belong to vendor."""
|
|
|
|
def __init__(self, domain_id: int, vendor_id: int):
|
|
super().__init__(
|
|
message=f"Unauthorized access to domain {domain_id}",
|
|
error_code="UNAUTHORIZED_DOMAIN_ACCESS",
|
|
details={"domain_id": domain_id, "vendor_id": vendor_id},
|
|
)
|
|
|
|
|
|
__all__ = [
|
|
# Auth
|
|
"InvalidCredentialsException",
|
|
"TokenExpiredException",
|
|
"InvalidTokenException",
|
|
"InsufficientPermissionsException",
|
|
"UserNotActiveException",
|
|
"AdminRequiredException",
|
|
"UserAlreadyExistsException",
|
|
# Platform
|
|
"PlatformNotFoundException",
|
|
"PlatformInactiveException",
|
|
"PlatformUpdateException",
|
|
# Vendor
|
|
"VendorNotFoundException",
|
|
"VendorAlreadyExistsException",
|
|
"VendorNotActiveException",
|
|
"VendorNotVerifiedException",
|
|
"UnauthorizedVendorAccessException",
|
|
"InvalidVendorDataException",
|
|
"VendorValidationException",
|
|
"MaxVendorsReachedException",
|
|
"VendorAccessDeniedException",
|
|
"VendorOwnerOnlyException",
|
|
"InsufficientVendorPermissionsException",
|
|
# Company
|
|
"CompanyNotFoundException",
|
|
"CompanyAlreadyExistsException",
|
|
"CompanyNotActiveException",
|
|
"CompanyNotVerifiedException",
|
|
"UnauthorizedCompanyAccessException",
|
|
"InvalidCompanyDataException",
|
|
"CompanyValidationException",
|
|
"CompanyHasVendorsException",
|
|
# Admin User
|
|
"UserNotFoundException",
|
|
"UserStatusChangeException",
|
|
"AdminOperationException",
|
|
"CannotModifyAdminException",
|
|
"CannotModifySelfException",
|
|
"InvalidAdminActionException",
|
|
"BulkOperationException",
|
|
"ConfirmationRequiredException",
|
|
"VendorVerificationException",
|
|
"UserCannotBeDeletedException",
|
|
"UserRoleChangeException",
|
|
# Team
|
|
"TeamMemberNotFoundException",
|
|
"TeamMemberAlreadyExistsException",
|
|
"TeamInvitationNotFoundException",
|
|
"TeamInvitationExpiredException",
|
|
"TeamInvitationAlreadyAcceptedException",
|
|
"UnauthorizedTeamActionException",
|
|
"CannotRemoveOwnerException",
|
|
"CannotModifyOwnRoleException",
|
|
"RoleNotFoundException",
|
|
"InvalidRoleException",
|
|
"InsufficientTeamPermissionsException",
|
|
"MaxTeamMembersReachedException",
|
|
"TeamValidationException",
|
|
"InvalidInvitationDataException",
|
|
"InvalidInvitationTokenException",
|
|
# Vendor Domain
|
|
"VendorDomainNotFoundException",
|
|
"VendorDomainAlreadyExistsException",
|
|
"InvalidDomainFormatException",
|
|
"ReservedDomainException",
|
|
"DomainNotVerifiedException",
|
|
"DomainVerificationFailedException",
|
|
"DomainAlreadyVerifiedException",
|
|
"MultiplePrimaryDomainsException",
|
|
"DNSVerificationException",
|
|
"MaxDomainsReachedException",
|
|
"UnauthorizedDomainAccessException",
|
|
]
|