Files
orion/app/exceptions/marketplace.py

201 lines
5.9 KiB
Python

# app/exceptions/marketplace.py
"""
Marketplace import specific exceptions.
"""
from typing import Any, Dict, Optional
from .base import (
ResourceNotFoundException,
ValidationException,
BusinessLogicException,
AuthorizationException,
ExternalServiceException
)
class MarketplaceImportException(BusinessLogicException):
"""Base exception for marketplace import operations."""
def __init__(
self,
message: str,
error_code: str = "MARKETPLACE_IMPORT_ERROR",
marketplace: Optional[str] = None,
details: Optional[Dict[str, Any]] = None,
):
if not details:
details = {}
if marketplace:
details["marketplace"] = marketplace
super().__init__(
message=message,
error_code=error_code,
details=details,
)
class ImportJobNotFoundException(ResourceNotFoundException):
"""Raised when import job is not found."""
def __init__(self, job_id: int):
super().__init__(
resource_type="ImportJob",
identifier=str(job_id),
message=f"Import job with ID '{job_id}' not found",
error_code="IMPORT_JOB_NOT_FOUND",
)
class ImportJobNotOwnedException(AuthorizationException):
"""Raised when user tries to access import job they don't own."""
def __init__(self, job_id: int, user_id: Optional[int] = None):
details = {"job_id": job_id}
if user_id:
details["user_id"] = user_id
super().__init__(
message=f"Unauthorized access to import job '{job_id}'",
error_code="IMPORT_JOB_NOT_OWNED",
details=details,
)
class InvalidImportDataException(ValidationException):
"""Raised when import data is invalid."""
def __init__(
self,
message: str = "Invalid import data",
field: Optional[str] = None,
row_number: Optional[int] = None,
details: Optional[Dict[str, Any]] = None,
):
if not details:
details = {}
if row_number:
details["row_number"] = row_number
super().__init__(
message=message,
field=field,
details=details,
)
self.error_code = "INVALID_IMPORT_DATA"
class ImportJobCannotBeCancelledException(BusinessLogicException):
"""Raised when trying to cancel job that cannot be cancelled."""
def __init__(self, job_id: int, current_status: str):
super().__init__(
message=f"Import job '{job_id}' cannot be cancelled (current status: {current_status})",
error_code="IMPORT_JOB_CANNOT_BE_CANCELLED",
details={
"job_id": job_id,
"current_status": current_status,
},
)
class ImportJobCannotBeDeletedException(BusinessLogicException):
"""Raised when trying to delete job that cannot be deleted."""
def __init__(self, job_id: int, current_status: str):
super().__init__(
message=f"Import job '{job_id}' cannot be deleted (current status: {current_status})",
error_code="IMPORT_JOB_CANNOT_BE_DELETED",
details={
"job_id": job_id,
"current_status": current_status,
},
)
class MarketplaceConnectionException(ExternalServiceException):
"""Raised when marketplace connection fails."""
def __init__(self, marketplace: str, message: str = "Failed to connect to marketplace"):
super().__init__(
service=marketplace,
message=f"{message}: {marketplace}",
error_code="MARKETPLACE_CONNECTION_FAILED",
)
class MarketplaceDataParsingException(ValidationException):
"""Raised when marketplace data cannot be parsed."""
def __init__(
self,
marketplace: str,
message: str = "Failed to parse marketplace data",
details: Optional[Dict[str, Any]] = None,
):
if not details:
details = {}
details["marketplace"] = marketplace
super().__init__(
message=f"{message} from {marketplace}",
details=details,
)
self.error_code = "MARKETPLACE_DATA_PARSING_FAILED"
class ImportRateLimitException(BusinessLogicException):
"""Raised when import rate limit is exceeded."""
def __init__(
self,
max_imports: int,
time_window: str,
retry_after: Optional[int] = None,
):
details = {
"max_imports": max_imports,
"time_window": time_window,
}
if retry_after:
details["retry_after"] = retry_after
super().__init__(
message=f"Import rate limit exceeded: {max_imports} imports per {time_window}",
error_code="IMPORT_RATE_LIMIT_EXCEEDED",
details=details,
)
class InvalidMarketplaceException(ValidationException):
"""Raised when marketplace is not supported."""
def __init__(self, marketplace: str, supported_marketplaces: Optional[list] = None):
details = {"marketplace": marketplace}
if supported_marketplaces:
details["supported_marketplaces"] = supported_marketplaces
super().__init__(
message=f"Unsupported marketplace: {marketplace}",
field="marketplace",
details=details,
)
self.error_code = "INVALID_MARKETPLACE"
class ImportJobAlreadyProcessingException(BusinessLogicException):
"""Raised when trying to start import while another is already processing."""
def __init__(self, shop_code: str, existing_job_id: int):
super().__init__(
message=f"Import already in progress for shop '{shop_code}'",
error_code="IMPORT_JOB_ALREADY_PROCESSING",
details={
"shop_code": shop_code,
"existing_job_id": existing_job_id,
},
)