orion/app/modules/marketplace/docs/architecture.md
Samir Boulahtit f141cc4e6a docs: migrate module documentation to single source of truth
Move 39 documentation files from top-level docs/ into each module's
docs/ folder, accessible via symlinks from docs/modules/. Create
data-model.md files for 10 modules with full schema documentation.
Replace originals with redirect stubs. Remove empty guide stubs.

Modules migrated: tenancy, billing, loyalty, marketplace, orders,
messaging, cms, catalog, inventory, hosting, prospecting.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 23:38:37 +01:00

Multi-Marketplace Integration Architecture

Executive Summary

This document defines the complete architecture for integrating Orion with multiple external marketplaces (Letzshop, Amazon, eBay) and digital product suppliers (CodesWholesale). The integration is bidirectional, supporting both inbound flows (products, orders) and outbound flows (inventory sync, fulfillment status).

Key Capabilities:

| Capability | Description |
| --- | --- |
| Multi-Marketplace Products | Import and normalize products from multiple sources |
| Multi-Language Support | Translations with language fallback |
| Unified Order Management | Orders from all channels in one place |
| Digital Product Fulfillment | On-demand license key retrieval from suppliers |
| Inventory Sync | Real-time and scheduled stock updates to marketplaces |
| Fulfillment Sync | Order status and tracking back to marketplaces |

System Overview

graph TB
    subgraph "External Systems"
        LS[Letzshop<br/>CSV + GraphQL]
        AZ[Amazon<br/>API]
        EB[eBay<br/>API]
        CW[CodesWholesale<br/>Digital Supplier API]
        WS[Store Storefront<br/>Orion Shop]
    end

    subgraph "Integration Layer"
        subgraph "Inbound Adapters"
            PI[Product Importers]
            OI[Order Importers]
            DPI[Digital Product Importer]
        end
        subgraph "Outbound Adapters"
            IS[Inventory Sync]
            FS[Fulfillment Sync]
            PE[Product Export]
        end
    end

    subgraph "Orion Core"
        MP[Marketplace Products]
        P[Store Products]
        O[Unified Orders]
        I[Inventory]
        F[Fulfillment]
        DL[Digital License Pool]
    end

    LS -->|CSV Import| PI
    AZ -->|API Pull| PI
    EB -->|API Pull| PI
    CW -->|Catalog Sync| DPI

    LS -->|GraphQL Poll| OI
    AZ -->|API Poll| OI
    EB -->|API Poll| OI
    WS -->|Direct| O

    PI --> MP
    DPI --> MP
    MP --> P
    OI --> O

    I -->|Real-time/Scheduled| IS
    F -->|Status Update| FS
    P -->|CSV Export| PE

    IS --> LS
    IS --> AZ
    IS --> EB
    FS --> LS
    FS --> AZ
    FS --> EB
    PE --> LS

    CW -->|On-demand Keys| DL
    DL --> F

Integration Phases

| Phase | Scope | Priority | Dependencies |
| --- | --- | --- | --- |
| Phase 1 | Product Import | High | None |
| Phase 2 | Order Import | High | Phase 1 |
| Phase 3 | Order Fulfillment Sync | High | Phase 2 |
| Phase 4 | Inventory Sync | Medium | Phase 1 |

Marketplace Capabilities Matrix

| Marketplace | Products In | Products Out | Orders In | Fulfillment Out | Inventory Out | Method |
| --- | --- | --- | --- | --- | --- | --- |
| Letzshop | CSV Import | CSV Export | GraphQL Poll | GraphQL | CSV/GraphQL | File + API |
| Amazon | API | N/A | API Poll | API | API (Real-time) | API |
| eBay | API | N/A | API Poll | API | API (Real-time) | API |
| CodesWholesale | API Catalog | N/A | N/A | On-demand Keys | N/A | API |
| Store Storefront | N/A | N/A | Direct DB | Internal | Internal | Direct |

Part 1: Product Integration

1.1 Architecture Overview

graph TB
    subgraph "Source Layer"
        LS_CSV[Letzshop CSV<br/>Multi-language feeds]
        AZ_API[Amazon API<br/>Product catalog]
        EB_API[eBay API<br/>Product catalog]
        CW_API[CodesWholesale API<br/>Digital catalog]
    end

    subgraph "Import Layer"
        LSI[LetzshopImporter]
        AZI[AmazonImporter]
        EBI[EbayImporter]
        CWI[CodesWholesaleImporter]
    end

    subgraph "Canonical Layer"
        MP[(marketplace_products)]
        MPT[(marketplace_product_translations)]
    end

    subgraph "Store Layer"
        P[(products)]
        PT[(product_translations)]
    end

    LS_CSV --> LSI
    AZ_API --> AZI
    EB_API --> EBI
    CW_API --> CWI

    LSI --> MP
    AZI --> MP
    EBI --> MP
    CWI --> MP

    MP --> MPT
    MP --> P
    P --> PT

1.2 Digital Product Supplier Integration (CodesWholesale)

CodesWholesale provides digital products (game keys, gift cards, software licenses) that need special handling:

sequenceDiagram
    participant CW as CodesWholesale API
    participant Sync as Catalog Sync Job
    participant MP as marketplace_products
    participant P as products
    participant LP as License Pool

    Note over Sync: Scheduled catalog sync (e.g., every 6 hours)
    Sync->>CW: GET /products (catalog)
    CW-->>Sync: Product catalog with prices, availability
    Sync->>MP: Upsert products (marketplace='codeswholesale')
    Sync->>MP: Update prices, availability flags

    Note over P: Store adds product to their catalog
    P->>MP: Link to marketplace_product

    Note over LP: Order placed - need license key
    LP->>CW: POST /orders (purchase key on-demand)
    CW-->>LP: License key / download link
    LP->>LP: Store for fulfillment

Key Characteristics:

| Aspect | Behavior |
| --- | --- |
| Catalog Sync | Scheduled job fetches full catalog, updates prices/availability |
| License Keys | Purchased on-demand at fulfillment time (not pre-stocked) |
| Inventory | Virtual - always "available" but subject to supplier stock |
| Pricing | Dynamic - supplier prices may change, store sets markup |
| Regions | Products may have region restrictions (EU, US, Global) |
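The pricing and region rows can be illustrated with a small sketch. It is not the module's implementation: the function names `store_price` and `sellable_in`, the percentage-based markup, and the `"GLOBAL"` region tag are assumptions made for the example.

```python
from decimal import ROUND_HALF_UP, Decimal


def store_price(supplier_cost: Decimal, markup_percent: Decimal) -> Decimal:
    """Apply a percentage markup to the supplier cost, rounded to cents."""
    price = supplier_cost * (Decimal("1") + markup_percent / Decimal("100"))
    return price.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


def sellable_in(product_regions: set[str], customer_region: str) -> bool:
    """A product tagged GLOBAL sells everywhere; otherwise the region must match."""
    return "GLOBAL" in product_regions or customer_region in product_regions


# Supplier cost 8.40 with a 25% store markup
print(store_price(Decimal("8.40"), Decimal("25")))
# An EU-restricted key offered to a US customer
print(sellable_in({"EU"}, "US"))
```

Because supplier prices change between catalog syncs, a calculation like this would have to be re-run after each sync rather than stored once.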

1.3 Product Data Model

See Multi-Marketplace Product Architecture for detailed schema.

Summary of tables:

| Table | Purpose |
| --- | --- |
| marketplace_products | Canonical product data from all sources |
| marketplace_product_translations | Localized titles, descriptions per language |
| products | Store-specific overrides and settings |
| product_translations | Store-specific localized overrides |
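How the canonical and store-specific translation tables might combine at read time is sketched below. The function name `resolve_title`, the dict-based inputs, the default language `"en"`, and the fallback order (requested language, then default language, then any available translation) are assumptions for illustration, not the documented resolver.

```python
from typing import Optional


def resolve_title(
    canonical: dict[str, str],
    store_overrides: dict[str, str],
    language: str,
    default_language: str = "en",
) -> Optional[str]:
    """Pick a title, preferring store overrides, then canonical translations.

    canonical       -- language -> title, as in marketplace_product_translations
    store_overrides -- language -> title, as in product_translations
    """
    for lang in (language, default_language):
        # A store override wins over the canonical text for the same language.
        if store_overrides.get(lang):
            return store_overrides[lang]
        if canonical.get(lang):
            return canonical[lang]
    # Last resort: any translation at all, overrides first.
    for source in (store_overrides, canonical):
        for title in source.values():
            if title:
                return title
    return None


canonical = {"en": "Wireless Mouse", "fr": "Souris sans fil"}
overrides = {"fr": "Souris sans fil (promo)"}
print(resolve_title(canonical, overrides, "fr"))  # store override is used
print(resolve_title(canonical, overrides, "de"))  # falls back to the default language
```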

1.4 Import Job Flow

stateDiagram-v2
    [*] --> Pending: Job Created
    Pending --> Processing: Worker picks up
    Processing --> Downloading: Fetch source data
    Downloading --> Parsing: Parse rows
    Parsing --> Upserting: Update database
    Upserting --> Completed: All rows processed
    Upserting --> PartiallyCompleted: Some rows failed
    Processing --> Failed: Fatal error
    Completed --> [*]
    PartiallyCompleted --> [*]
    Failed --> [*]
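The state diagram can be enforced in code with a transition table. The sketch below is one possible encoding; the enum name `ImportJobState`, its string values, and the helper `advance` are assumptions, while the edges are copied from the diagram.

```python
from enum import Enum


class ImportJobState(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    DOWNLOADING = "downloading"
    PARSING = "parsing"
    UPSERTING = "upserting"
    COMPLETED = "completed"
    PARTIALLY_COMPLETED = "partially_completed"
    FAILED = "failed"


S = ImportJobState

# Allowed transitions, copied edge by edge from the state diagram.
TRANSITIONS: dict[ImportJobState, set[ImportJobState]] = {
    S.PENDING: {S.PROCESSING},
    S.PROCESSING: {S.DOWNLOADING, S.FAILED},
    S.DOWNLOADING: {S.PARSING},
    S.PARSING: {S.UPSERTING},
    S.UPSERTING: {S.COMPLETED, S.PARTIALLY_COMPLETED},
    S.COMPLETED: set(),
    S.PARTIALLY_COMPLETED: set(),
    S.FAILED: set(),
}


def advance(current: ImportJobState, target: ImportJobState) -> ImportJobState:
    """Return the new state, or raise if the diagram has no such edge."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"Illegal transition: {current.value} -> {target.value}")
    return target


state = advance(S.PENDING, S.PROCESSING)
print(state.value)
```

The diagram draws the Failed edge only from Processing. Whether Downloading, Parsing, or Upserting can fail directly is not specified, so the sketch does not add those edges.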

Part 2: Order Integration

2.1 Unified Order Model

Orders from all channels (marketplaces + store storefront) flow into a unified order management system.

graph TB
    subgraph "Order Sources"
        LS_O[Letzshop Orders<br/>GraphQL Poll]
        AZ_O[Amazon Orders<br/>API Poll]
        EB_O[eBay Orders<br/>API Poll]
        VS_O[Store Storefront<br/>Direct]
    end

    subgraph "Order Import"
        OI[Order Importer Service]
        OQ[Order Import Queue]
    end

    subgraph "Unified Orders"
        O[(orders)]
        OI_T[(order_items)]
        OS[(order_status_history)]
    end

    LS_O -->|Poll every N min| OI
    AZ_O -->|Poll every N min| OI
    EB_O -->|Poll every N min| OI
    VS_O -->|Direct insert| O

    OI --> OQ
    OQ --> O
    O --> OI_T
    O --> OS

2.2 Order Data Model

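from enum import Enum

from sqlalchemy import (
    JSON, Boolean, Column, DateTime, Enum as SQLEnum, Float, ForeignKey,
    Index, Integer, String, Text,
)
from sqlalchemy.orm import relationship

# Base and TimestampMixin are the project's declarative base and timestamp mixin.


class OrderChannel(str, Enum):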
    """Order source channel."""
    STOREFRONT = "storefront"      # Store's own Orion shop
    LETZSHOP = "letzshop"
    AMAZON = "amazon"
    EBAY = "ebay"

class OrderStatus(str, Enum):
    """Unified order status."""
    PENDING = "pending"                    # Awaiting payment/confirmation
    CONFIRMED = "confirmed"                # Payment confirmed
    PROCESSING = "processing"              # Being prepared
    READY_FOR_SHIPMENT = "ready_for_shipment"  # Physical: packed
    SHIPPED = "shipped"                    # Physical: in transit
    DELIVERED = "delivered"                # Physical: delivered
    FULFILLED = "fulfilled"                # Digital: key/download sent
    CANCELLED = "cancelled"
    REFUNDED = "refunded"
    PARTIALLY_REFUNDED = "partially_refunded"

class Order(Base, TimestampMixin):
    """Unified order from all channels."""
    __tablename__ = "orders"

    id = Column(Integer, primary_key=True)
    store_id = Column(Integer, ForeignKey("stores.id"), nullable=False)

    # === CHANNEL TRACKING ===
    channel = Column(SQLEnum(OrderChannel), nullable=False, index=True)
    channel_order_id = Column(String, index=True)  # External order ID
    channel_order_url = Column(String)  # Link to order in marketplace

    # === CUSTOMER INFO ===
    customer_id = Column(Integer, ForeignKey("customers.id"), nullable=True)
    customer_email = Column(String, nullable=False)
    customer_name = Column(String)
    customer_phone = Column(String)

    # === ADDRESSES ===
    shipping_address = Column(JSON)  # For physical products
    billing_address = Column(JSON)

    # === ORDER TOTALS ===
    subtotal = Column(Float, nullable=False)
    shipping_cost = Column(Float, default=0)
    tax_amount = Column(Float, default=0)
    discount_amount = Column(Float, default=0)
    total = Column(Float, nullable=False)
    currency = Column(String(3), default="EUR")

    # === STATUS ===
    status = Column(SQLEnum(OrderStatus), default=OrderStatus.PENDING, index=True)

    # === FULFILLMENT TYPE ===
    requires_shipping = Column(Boolean, default=True)  # False for digital-only
    is_fully_digital = Column(Boolean, default=False)

    # === TIMESTAMPS ===
    ordered_at = Column(DateTime, nullable=False)  # When customer placed order
    confirmed_at = Column(DateTime)
    shipped_at = Column(DateTime)
    delivered_at = Column(DateTime)
    fulfilled_at = Column(DateTime)  # For digital products

    # === SYNC STATUS ===
    last_synced_at = Column(DateTime)  # Last sync with marketplace
    sync_status = Column(String)  # 'synced', 'pending', 'error'
    sync_error = Column(Text)

    # === RELATIONSHIPS ===
    store = relationship("Store", back_populates="orders")
    customer = relationship("Customer", back_populates="orders")
    items = relationship("OrderItem", back_populates="order", cascade="all, delete-orphan")
    status_history = relationship("OrderStatusHistory", back_populates="order")

    __table_args__ = (
        Index("idx_order_store_status", "store_id", "status"),
        Index("idx_order_channel", "channel", "channel_order_id"),
        Index("idx_order_store_date", "store_id", "ordered_at"),
    )


class OrderItem(Base, TimestampMixin):
    """Individual item in an order."""
    __tablename__ = "order_items"

    id = Column(Integer, primary_key=True)
    order_id = Column(Integer, ForeignKey("orders.id", ondelete="CASCADE"), nullable=False)
    product_id = Column(Integer, ForeignKey("products.id"), nullable=True)

    # === PRODUCT SNAPSHOT (at time of order) ===
    product_name = Column(String, nullable=False)
    product_sku = Column(String)
    product_gtin = Column(String)
    product_image_url = Column(String)

    # === PRICING ===
    unit_price = Column(Float, nullable=False)
    quantity = Column(Integer, nullable=False, default=1)
    subtotal = Column(Float, nullable=False)
    tax_amount = Column(Float, default=0)
    discount_amount = Column(Float, default=0)
    total = Column(Float, nullable=False)

    # === PRODUCT TYPE ===
    is_digital = Column(Boolean, default=False)

    # === DIGITAL FULFILLMENT ===
    license_key = Column(String)  # For digital products
    download_url = Column(String)
    download_expiry = Column(DateTime)
    digital_fulfilled_at = Column(DateTime)

    # === PHYSICAL FULFILLMENT ===
    shipped_quantity = Column(Integer, default=0)

    # === SUPPLIER TRACKING (for CodesWholesale etc) ===
    supplier = Column(String)  # 'codeswholesale', 'internal', etc.
    supplier_order_id = Column(String)  # Supplier's order reference
    supplier_cost = Column(Float)  # What we paid supplier

    # === RELATIONSHIPS ===
    order = relationship("Order", back_populates="items")
    product = relationship("Product")


class OrderStatusHistory(Base, TimestampMixin):
    """Audit trail of order status changes."""
    __tablename__ = "order_status_history"

    id = Column(Integer, primary_key=True)
    order_id = Column(Integer, ForeignKey("orders.id", ondelete="CASCADE"), nullable=False)

    from_status = Column(SQLEnum(OrderStatus))
    to_status = Column(SQLEnum(OrderStatus), nullable=False)
    changed_by = Column(String)  # 'system', 'store:123', 'marketplace:letzshop'
    reason = Column(String)
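    # "metadata" is a reserved attribute on SQLAlchemy declarative classes, so the
    # attribute is named status_metadata while the column keeps the name "metadata".
    status_metadata = Column("metadata", JSON)  # Additional context (tracking number, etc.)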

    order = relationship("Order", back_populates="status_history")

2.3 Order Import Flow

sequenceDiagram
    participant Scheduler as Scheduler
    participant Poller as Order Poller
    participant MP as Marketplace API
    participant Queue as Import Queue
    participant Worker as Import Worker
    participant DB as Database
    participant Notify as Notification Service

    Scheduler->>Poller: Trigger poll (every N minutes)
    Poller->>MP: Fetch orders since last_sync
    MP-->>Poller: New/updated orders

    loop For each order
        Poller->>Queue: Enqueue order import job
    end

    Worker->>Queue: Pick up job
    Worker->>DB: Check if order exists (by channel_order_id)

    alt New Order
        Worker->>DB: Create order + items
        Worker->>Notify: New order notification
    else Existing Order
        Worker->>DB: Update order status/details
    end

    Worker->>DB: Update last_synced_at
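The create-or-update branch is what makes repeated polls safe: importing the same marketplace order twice must not produce two rows. A minimal in-memory sketch of that idempotency follows. `StoredOrder` and `InMemoryOrderStore` are hypothetical stand-ins for the orders table, and the lookup key includes the channel as well as `channel_order_id` on the assumption that IDs from different marketplaces could collide.

```python
from dataclasses import dataclass


@dataclass
class StoredOrder:
    channel: str
    channel_order_id: str
    status: str


class InMemoryOrderStore:
    """Stand-in for the orders table, keyed by (channel, channel_order_id)."""

    def __init__(self) -> None:
        self._orders: dict[tuple[str, str], StoredOrder] = {}

    def import_order(self, channel: str, channel_order_id: str, status: str) -> str:
        """Create the order if unseen, otherwise update it. Returns the action taken."""
        key = (channel, channel_order_id)
        existing = self._orders.get(key)
        if existing is None:
            self._orders[key] = StoredOrder(channel, channel_order_id, status)
            return "created"  # a real worker would also send the notification here
        existing.status = status
        return "updated"

    def count(self) -> int:
        return len(self._orders)


store = InMemoryOrderStore()
print(store.import_order("letzshop", "R123", "pending"))  # first poll
print(store.import_order("letzshop", "R123", "shipped"))  # same order seen again
print(store.count())
```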

2.4 Letzshop Order Integration (GraphQL)

# Example GraphQL queries for Letzshop order integration

LETZSHOP_ORDERS_QUERY = """
query GetOrders($since: DateTime, $status: [OrderStatus!]) {
    orders(
        filter: {
            updatedAt: { gte: $since }
            status: { in: $status }
        }
        first: 100
    ) {
        edges {
            node {
                id
                orderNumber
                status
                createdAt
                updatedAt
                customer {
                    email
                    firstName
                    lastName
                    phone
                }
                shippingAddress {
                    street
                    city
                    postalCode
                    country
                }
                items {
                    product {
                        id
                        sku
                        name
                    }
                    quantity
                    unitPrice
                    totalPrice
                }
                totals {
                    subtotal
                    shipping
                    tax
                    total
                    currency
                }
            }
        }
        pageInfo {
            hasNextPage
            endCursor
        }
    }
}
"""

class LetzshopOrderImporter:
    """Import orders from Letzshop via GraphQL."""

    def __init__(self, store_id: int, api_url: str, api_token: str):
        self.store_id = store_id
        self.api_url = api_url
        self.api_token = api_token

    async def fetch_orders_since(self, since: datetime) -> list[dict]:
        """Fetch orders updated since given timestamp."""
        # Implementation: Execute GraphQL query
        pass

    def map_to_order(self, letzshop_order: dict) -> OrderCreate:
        """Map Letzshop order to unified Order schema."""
        return OrderCreate(
            store_id=self.store_id,
            channel=OrderChannel.LETZSHOP,
            channel_order_id=letzshop_order["id"],
            customer_email=letzshop_order["customer"]["email"],
            customer_name=f"{letzshop_order['customer']['firstName']} {letzshop_order['customer']['lastName']}",
            status=self._map_status(letzshop_order["status"]),
            # ... map remaining fields
        )

    def _map_status(self, letzshop_status: str) -> OrderStatus:
        """Map Letzshop status to unified status."""
        mapping = {
            "PENDING": OrderStatus.PENDING,
            "PAID": OrderStatus.CONFIRMED,
            "PROCESSING": OrderStatus.PROCESSING,
            "SHIPPED": OrderStatus.SHIPPED,
            "DELIVERED": OrderStatus.DELIVERED,
            "CANCELLED": OrderStatus.CANCELLED,
        }
        return mapping.get(letzshop_status, OrderStatus.PENDING)
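The query above requests `pageInfo { hasNextPage endCursor }`, which suggests cursor-based pagination. A sketch of the loop that `fetch_orders_since` would need is shown below. It assumes the query is extended with an `after` cursor argument (not present in the query as written) and uses a hypothetical `execute` callable that returns the decoded `orders` object; the real Letzshop API may differ.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

# Hypothetical transport: takes GraphQL variables, returns the decoded "orders" object.
ExecuteQuery = Callable[[dict[str, Any]], Awaitable[dict[str, Any]]]


async def fetch_all_orders(execute: ExecuteQuery, since: str) -> list[dict[str, Any]]:
    """Follow pageInfo.endCursor until hasNextPage is false."""
    orders: list[dict[str, Any]] = []
    cursor: Optional[str] = None
    while True:
        page = await execute({"since": since, "after": cursor})
        orders.extend(edge["node"] for edge in page["edges"])
        if not page["pageInfo"]["hasNextPage"]:
            return orders
        cursor = page["pageInfo"]["endCursor"]


async def fake_execute(variables: dict[str, Any]) -> dict[str, Any]:
    """Two-page fake response, used only to exercise the loop."""
    if variables["after"] is None:
        return {
            "edges": [{"node": {"id": "1"}}],
            "pageInfo": {"hasNextPage": True, "endCursor": "c1"},
        }
    return {
        "edges": [{"node": {"id": "2"}}],
        "pageInfo": {"hasNextPage": False, "endCursor": None},
    }


result = asyncio.run(fetch_all_orders(fake_execute, "2026-01-01T00:00:00Z"))
print([order["id"] for order in result])
```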

Part 3: Order Fulfillment Sync

3.1 Fulfillment Architecture

graph TB
    subgraph "Store Actions"
        VA[Store marks order shipped]
        VD[Store marks delivered]
        VF[Digital fulfillment triggered]
    end

    subgraph "Fulfillment Service"
        FS[Fulfillment Service]
        DFS[Digital Fulfillment Service]
    end

    subgraph "Outbound Sync"
        SQ[Sync Queue]
        SW[Sync Workers]
    end

    subgraph "External Systems"
        LS_API[Letzshop GraphQL]
        AZ_API[Amazon API]
        EB_API[eBay API]
        CW_API[CodesWholesale API]
        EMAIL[Email Service]
    end

    VA --> FS
    VD --> FS
    VF --> DFS

    FS --> SQ
    DFS --> CW_API
    DFS --> EMAIL

    SQ --> SW
    SW --> LS_API
    SW --> AZ_API
    SW --> EB_API

3.2 Physical Product Fulfillment

sequenceDiagram
    participant Store as Store UI
    participant API as Fulfillment API
    participant DB as Database
    participant Queue as Sync Queue
    participant Worker as Sync Worker
    participant MP as Marketplace API

    Store->>API: Mark order as shipped (tracking #)
    API->>DB: Update order status
    API->>DB: Add status history entry
    API->>Queue: Enqueue fulfillment sync job

    Worker->>Queue: Pick up job
    Worker->>DB: Get order details
    Worker->>MP: Update fulfillment status

    alt Sync Success
        MP-->>Worker: 200 OK
        Worker->>DB: Mark sync_status='synced'
    else Sync Failed
        MP-->>Worker: Error
        Worker->>DB: Mark sync_status='error', store error
        Worker->>Queue: Retry with backoff
    end
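"Retry with backoff" is not specified further. One common choice is capped exponential backoff with jitter, sketched here. The function name `backoff_delay` and the defaults (30 second base, one hour cap, 10% jitter) are assumptions, not values taken from the system.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base_seconds: float = 30.0,
    cap_seconds: float = 3600.0,
    jitter: float = 0.1,
    rng: Optional[random.Random] = None,
) -> float:
    """Delay before retry number `attempt` (1-based): base * 2^(attempt - 1), capped."""
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    delay = min(cap_seconds, base_seconds * 2 ** (attempt - 1))
    if jitter:
        rng = rng or random.Random()
        # Spread retries by up to +/- jitter so failed jobs do not retry in lockstep.
        delay *= 1 + rng.uniform(-jitter, jitter)
    return delay


# Deterministic schedule with jitter disabled
print([backoff_delay(n, jitter=0.0) for n in range(1, 6)])
```

With jitter disabled the delays double from the base until they reach the cap; a worker would typically also stop after a maximum number of attempts and leave the order with `sync_status='error'`.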

3.3 Digital Product Fulfillment (CodesWholesale)

sequenceDiagram
    participant Order as Order Service
    participant DFS as Digital Fulfillment Service
    participant CW as CodesWholesale API
    participant DB as Database
    participant Email as Email Service
    participant Customer as Customer

    Note over Order: Order confirmed, contains digital item
    Order->>DFS: Fulfill digital items

    loop For each digital item
        DFS->>DB: Check product supplier

        alt Supplier = CodesWholesale
            DFS->>CW: POST /orders (purchase key)
            CW-->>DFS: License key + download info
            DFS->>DB: Store license key on order_item
        else Internal / Pre-loaded
            DFS->>DB: Get key from license pool
            DFS->>DB: Mark key as used
        end
    end

    DFS->>DB: Update order status to FULFILLED
    DFS->>Email: Send fulfillment email
    Email->>Customer: License keys + download links

3.4 Fulfillment Service Implementation

class FulfillmentService:
    """Service for managing order fulfillment across channels."""

    def __init__(
        self,
        db: Session,
        digital_fulfillment: "DigitalFulfillmentService",
        sync_queue: "SyncQueue",
    ):
        self.db = db
        self.digital_fulfillment = digital_fulfillment
        self.sync_queue = sync_queue

    async def mark_shipped(
        self,
        order_id: int,
        tracking_number: str | None = None,
        carrier: str | None = None,
        shipped_items: list[int] | None = None,  # Partial shipment
    ) -> Order:
        """Mark order (or items) as shipped."""
        order = self._get_order(order_id)

        # Update order
        # NOTE: shipped_items is accepted for partial shipments but is not applied yet
        order.status = OrderStatus.SHIPPED
        order.shipped_at = datetime.utcnow()

        # Always record the status change; tracking details are optional context
        self._add_status_history(
            order,
            OrderStatus.SHIPPED,
            metadata={"tracking_number": tracking_number, "carrier": carrier},
        )

        # Commit before enqueueing so a sync worker never reads uncommitted order data
        self.db.commit()

        # Queue sync to marketplace (storefront orders have nothing to sync)
        if order.channel != OrderChannel.STOREFRONT:
            self.sync_queue.enqueue(
                SyncJob(
                    type="fulfillment",
                    order_id=order_id,
                    channel=order.channel,
                    data={"tracking_number": tracking_number, "carrier": carrier},
                )
            )

        return order

    async def fulfill_digital_items(self, order_id: int) -> Order:
        """Fulfill digital items in order."""
        order = self._get_order(order_id)

        digital_items = [item for item in order.items if item.is_digital]

        for item in digital_items:
            await self.digital_fulfillment.fulfill_item(item)

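        # Mark the order fulfilled only when it has digital items, all of them are
        # fulfilled, and nothing physical remains to ship
        if (
            digital_items
            and all(item.digital_fulfilled_at for item in digital_items)
            and order.is_fully_digital
        ):
            order.status = OrderStatus.FULFILLED
            order.fulfilled_at = datetime.utcnow()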

        self.db.commit()
        return order


class DigitalFulfillmentService:
    """Service for fulfilling digital products."""

    def __init__(
        self,
        db: Session,
        codeswholesale_client: "CodesWholesaleClient",
        email_service: "EmailService",
    ):
        self.db = db
        self.codeswholesale = codeswholesale_client
        self.email_service = email_service

    async def fulfill_item(self, item: OrderItem) -> OrderItem:
        """Fulfill a single digital order item."""
        if item.digital_fulfilled_at:
            return item  # Already fulfilled

        # Get license key based on supplier
        if item.supplier == "codeswholesale":
            key_data = await self._fulfill_from_codeswholesale(item)
        else:
            key_data = await self._fulfill_from_internal_pool(item)

        # Update item
        item.license_key = key_data.get("license_key")
        item.download_url = key_data.get("download_url")
        item.download_expiry = key_data.get("expiry")
        item.digital_fulfilled_at = datetime.utcnow()
        item.supplier_order_id = key_data.get("supplier_order_id")
        item.supplier_cost = key_data.get("cost")

        return item

    async def _fulfill_from_codeswholesale(self, item: OrderItem) -> dict:
        """Purchase key from CodesWholesale on-demand."""
        # Get the marketplace product to find CodesWholesale product ID
        product = item.product
        mp = product.marketplace_product

        if mp.marketplace != "codeswholesale":
            raise ValueError(f"Product {product.id} is not from CodesWholesale")

        # Purchase from CodesWholesale
        result = await self.codeswholesale.purchase_code(
            product_id=mp.marketplace_product_id,
            quantity=item.quantity,
        )

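        # Assumes one entry in result["codes"] per purchased unit; keep every code
        # so that quantity > 1 is not truncated to the first key.
        codes = [entry["code"] for entry in result["codes"]]

        return {
            "license_key": "\n".join(codes),  # newline-separated in the single column
            "download_url": result.get("download_url"),
            "supplier_order_id": result["order_id"],
            "cost": result["total_price"],
        }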

    async def _fulfill_from_internal_pool(self, item: OrderItem) -> dict:
        """Get key from internal pre-loaded pool."""
        # Implementation for stores who pre-load their own keys
        pass
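The `_fulfill_from_internal_pool` stub corresponds to the "Get key from license pool" and "Mark key as used" steps in the digital fulfillment sequence. A minimal in-memory sketch of that claim operation follows. `PoolKey` and `LicensePool` are hypothetical names, and the list stands in for a database table.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class PoolKey:
    product_id: int
    license_key: str
    used_by_order_item: Optional[int] = None  # None means the key is still free


@dataclass
class LicensePool:
    keys: list[PoolKey] = field(default_factory=list)

    def claim(self, product_id: int, order_item_id: int) -> dict:
        """Hand out the first unused key for the product and mark it used."""
        for key in self.keys:
            if key.product_id == product_id and key.used_by_order_item is None:
                key.used_by_order_item = order_item_id
                return {"license_key": key.license_key}
        raise LookupError(f"No unused key left for product {product_id}")


pool = LicensePool([PoolKey(7, "AAAA-1111"), PoolKey(7, "BBBB-2222")])
print(pool.claim(product_id=7, order_item_id=101))
print(pool.claim(product_id=7, order_item_id=102))
```

Against a real database the lookup and the update would need to happen atomically (for example with a row lock), otherwise two workers could hand out the same key.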

3.5 Letzshop Fulfillment Sync (GraphQL)

LETZSHOP_UPDATE_FULFILLMENT = """
mutation UpdateOrderFulfillment($orderId: ID!, $input: FulfillmentInput!) {
    updateOrderFulfillment(orderId: $orderId, input: $input) {
        order {
            id
            status
            fulfillment {
                status
                trackingNumber
                carrier
                shippedAt
            }
        }
        errors {
            field
            message
        }
    }
}
"""

class LetzshopFulfillmentSync:
    """Sync fulfillment status to Letzshop."""

    async def sync_shipment(
        self,
        order: Order,
        tracking_number: str | None,
        carrier: str | None,
    ) -> SyncResult:
        """Update Letzshop with shipment info."""
        variables = {
            "orderId": order.channel_order_id,
            "input": {
                "status": "SHIPPED",
                "trackingNumber": tracking_number,
                "carrier": carrier,
                # shipped_at is nullable on Order, so guard before formatting
                "shippedAt": order.shipped_at.isoformat() if order.shipped_at else None,
            }
        }

        result = await self._execute_mutation(
            LETZSHOP_UPDATE_FULFILLMENT,
            variables
        )

        if result.get("errors"):
            return SyncResult(success=False, errors=result["errors"])

        return SyncResult(success=True)

Part 4: Inventory Sync

4.1 Inventory Sync Architecture

graph TB
    subgraph "Inventory Changes"
        OC[Order Created<br/>Reserve stock]
        OF[Order Fulfilled<br/>Deduct stock]
        OX[Order Cancelled<br/>Release stock]
        MA[Manual Adjustment]
        SI[Stock Import]
    end

    subgraph "Inventory Service"
        IS[Inventory Service]
        EQ[Event Queue]
    end

    subgraph "Sync Strategy"
        RT[Real-time Sync<br/>For API marketplaces]
        SC[Scheduled Batch<br/>For file-based]
    end

    subgraph "Outbound"
        LS_S[Letzshop<br/>CSV/GraphQL]
        AZ_S[Amazon API]
        EB_S[eBay API]
    end

    OC --> IS
    OF --> IS
    OX --> IS
    MA --> IS
    SI --> IS

    IS --> EQ
    EQ --> RT
    EQ --> SC

    RT --> AZ_S
    RT --> EB_S
    SC --> LS_S

4.2 Sync Strategies

| Strategy | Use Case | Trigger | Marketplaces |
| --- | --- | --- | --- |
| Real-time | API-based marketplaces | Inventory change event | Amazon, eBay |
| Scheduled Batch | File-based or rate-limited | Cron job (configurable) | Letzshop |
| On-demand | Manual trigger | Store action | All |
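The strategy table can be read as a small decision rule: given a marketplace and the kind of trigger, should a sync run? The sketch below encodes it. The trigger names (`inventory_change`, `cron`, `manual`), the `DEFAULT_STRATEGY` mapping, and the function `should_sync_now` are assumptions for illustration.

```python
from typing import Optional

# Default strategy per marketplace, taken from the table above.
DEFAULT_STRATEGY = {"amazon": "realtime", "ebay": "realtime", "letzshop": "scheduled"}


def should_sync_now(
    marketplace: str,
    trigger: str,
    configured_strategy: Optional[str] = None,
) -> bool:
    """Decide whether a trigger ('inventory_change', 'cron', 'manual') starts a sync."""
    if trigger == "manual":
        return True  # on-demand sync applies to every marketplace
    strategy = configured_strategy or DEFAULT_STRATEGY.get(marketplace, "scheduled")
    if strategy == "realtime":
        return trigger == "inventory_change"
    if strategy == "scheduled":
        return trigger == "cron"
    return False  # a 'manual' strategy reacts to manual triggers only


print(should_sync_now("amazon", "inventory_change"))
print(should_sync_now("letzshop", "inventory_change"))
print(should_sync_now("letzshop", "cron"))
```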

4.3 Inventory Data Model Extensions

class InventorySyncConfig(Base, TimestampMixin):
    """Per-store, per-marketplace sync configuration."""
    __tablename__ = "inventory_sync_configs"

    id = Column(Integer, primary_key=True)
    store_id = Column(Integer, ForeignKey("stores.id"), nullable=False)
    marketplace = Column(String, nullable=False)  # 'letzshop', 'amazon', 'ebay'

    # === SYNC SETTINGS ===
    is_enabled = Column(Boolean, default=True)
    sync_strategy = Column(String, default="scheduled")  # 'realtime', 'scheduled', 'manual'
    sync_interval_minutes = Column(Integer, default=15)  # For scheduled

    # === CREDENTIALS ===
    api_credentials = Column(JSON)  # Encrypted credentials

    # === STOCK RULES ===
    safety_stock = Column(Integer, default=0)  # Reserve buffer
    out_of_stock_threshold = Column(Integer, default=0)
    sync_zero_stock = Column(Boolean, default=True)  # Sync when stock=0

    # === STATUS ===
    last_sync_at = Column(DateTime)
    last_sync_status = Column(String)
    last_sync_error = Column(Text)
    items_synced_count = Column(Integer, default=0)

    __table_args__ = (
        UniqueConstraint("store_id", "marketplace", name="uq_inventory_sync_store_marketplace"),
    )


class InventorySyncLog(Base, TimestampMixin):
    """Log of inventory sync operations."""
    __tablename__ = "inventory_sync_logs"

    id = Column(Integer, primary_key=True)
    store_id = Column(Integer, ForeignKey("stores.id"), nullable=False)
    marketplace = Column(String, nullable=False)

    sync_type = Column(String)  # 'full', 'incremental', 'single_product'
    started_at = Column(DateTime, nullable=False)
    completed_at = Column(DateTime)

    status = Column(String)  # 'success', 'partial', 'failed'
    items_processed = Column(Integer, default=0)
    items_succeeded = Column(Integer, default=0)
    items_failed = Column(Integer, default=0)

    errors = Column(JSON)  # List of errors

4.4 Inventory Sync Service

class InventorySyncService:
    """Service for syncing inventory to marketplaces."""

    def __init__(
        self,
        db: Session,
        sync_adapters: dict[str, "MarketplaceSyncAdapter"],
    ):
        self.db = db
        self.adapters = sync_adapters

    async def sync_inventory_change(
        self,
        product_id: int,
        new_quantity: int,
        change_reason: str,
    ):
        """Handle inventory change event - trigger real-time syncs."""
        # Session.get replaces the legacy Query.get (SQLAlchemy 1.4+)
        product = self.db.get(Product, product_id)
        if product is None:
            return  # Unknown product: nothing to sync
        store_id = product.store_id

        # Get enabled real-time sync configs
        configs = self.db.query(InventorySyncConfig).filter(
            InventorySyncConfig.store_id == store_id,
            InventorySyncConfig.is_enabled.is_(True),
            InventorySyncConfig.sync_strategy == "realtime",
        ).all()

        for config in configs:
            adapter = self.adapters.get(config.marketplace)
            if adapter:
                await self._sync_single_product(adapter, config, product, new_quantity)

    async def run_scheduled_sync(self, store_id: int, marketplace: str):
        """Run scheduled batch sync for a marketplace."""
        config = self._get_sync_config(store_id, marketplace)
        adapter = self.adapters.get(marketplace)

        log = InventorySyncLog(
            store_id=store_id,
            marketplace=marketplace,
            sync_type="full",
            started_at=datetime.utcnow(),
            status="running",
        )
        self.db.add(log)
        self.db.commit()

        try:
            # Get all products for this store linked to this marketplace
            products = self._get_products_for_sync(store_id, marketplace)

            # Build inventory update payload
            inventory_data = []
            for product in products:
                available_qty = self._calculate_available_quantity(product, config)
                inventory_data.append({
                    "sku": product.store_sku or product.marketplace_product.marketplace_product_id,
                    "quantity": available_qty,
                })

            # Sync via adapter
            result = await adapter.sync_inventory_batch(config, inventory_data)

            log.completed_at = datetime.utcnow()
            log.status = "success" if result.all_succeeded else "partial"
            log.items_processed = len(inventory_data)
            log.items_succeeded = result.succeeded_count
            log.items_failed = result.failed_count
            log.errors = result.errors

            config.last_sync_at = datetime.utcnow()
            config.last_sync_status = log.status
            config.items_synced_count = log.items_succeeded

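        except Exception as e:
            log.completed_at = datetime.utcnow()
            log.status = "failed"
            log.errors = [{"error": str(e)}]
            # Keep the config status in step with the log so dashboards show the failure
            config.last_sync_status = "failed"
            config.last_sync_error = str(e)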

        self.db.commit()
        return log

    def _calculate_available_quantity(
        self,
        product: Product,
        config: InventorySyncConfig,
    ) -> int:
        """Calculate quantity to report, applying safety stock."""
        inventory = product.inventory_entries[0] if product.inventory_entries else None
        if not inventory:
            return 0

        available = inventory.quantity - inventory.reserved_quantity
        available -= config.safety_stock  # Apply safety buffer

        if available <= config.out_of_stock_threshold:
            return 0 if config.sync_zero_stock else config.out_of_stock_threshold

        return max(0, available)
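A worked example makes the stock rules easier to check. The sketch below repeats the arithmetic of `_calculate_available_quantity` on plain values. `StockRules` and `reported_quantity` are hypothetical names introduced only for this example.

```python
from dataclasses import dataclass


@dataclass
class StockRules:
    safety_stock: int = 0
    out_of_stock_threshold: int = 0
    sync_zero_stock: bool = True


def reported_quantity(quantity: int, reserved: int, rules: StockRules) -> int:
    """Same arithmetic as _calculate_available_quantity, on plain values."""
    available = quantity - reserved - rules.safety_stock
    if available <= rules.out_of_stock_threshold:
        return 0 if rules.sync_zero_stock else rules.out_of_stock_threshold
    return max(0, available)


rules = StockRules(safety_stock=2, out_of_stock_threshold=1)
print(reported_quantity(10, 3, rules))  # 10 - 3 - 2 = 5, above the threshold
print(reported_quantity(6, 3, rules))   # 6 - 3 - 2 = 1, at the threshold, reported as 0
```

Note that with `sync_zero_stock=False` the method reports the threshold value itself rather than skipping the update, which is worth confirming against the intended meaning of that flag.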

4.5 Letzshop Inventory Sync

class LetzshopInventorySyncAdapter:
    """Sync inventory to Letzshop via CSV or GraphQL."""

    async def sync_inventory_batch(
        self,
        config: InventorySyncConfig,
        inventory_data: list[dict],
    ) -> SyncResult:
        """Sync inventory batch to Letzshop."""
        # Option 1: CSV Export (upload to SFTP/web location)
        if config.api_credentials.get("method") == "csv":
            return await self._sync_via_csv(config, inventory_data)

        # Option 2: GraphQL mutations
        return await self._sync_via_graphql(config, inventory_data)

    async def _sync_via_csv(
        self,
        config: InventorySyncConfig,
        inventory_data: list[dict],
    ) -> SyncResult:
        """Generate and upload CSV inventory file."""
        # Generate CSV
        csv_content = self._generate_inventory_csv(inventory_data)

        # Upload to configured location (SFTP, S3, etc.)
        upload_location = config.api_credentials.get("upload_url")
        # ... upload logic

        return SyncResult(success=True, succeeded_count=len(inventory_data))

    async def _sync_via_graphql(
        self,
        config: InventorySyncConfig,
        inventory_data: list[dict],
    ) -> SyncResult:
        """Update inventory via GraphQL mutations."""
        mutation = """
        mutation UpdateInventory($input: InventoryUpdateInput!) {
            updateInventory(input: $input) {
                success
                errors { sku, message }
            }
        }
        """
        # ... execute mutation
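
The CSV path above relies on `_generate_inventory_csv`, which is not shown. A minimal sketch of such a helper follows, assuming each entry in `inventory_data` carries `sku` and `quantity` keys and that the target file needs only those two columns. Both the key names and the column layout are assumptions, not the documented Letzshop format.

```python
import csv
import io


def generate_inventory_csv(inventory_data: list[dict]) -> str:
    """Render inventory rows as CSV text with a header line.

    Assumes each row has 'sku' and 'quantity' keys (hypothetical layout).
    """
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=["sku", "quantity"])
    writer.writeheader()
    for row in inventory_data:
        # Copy only the expected columns so extra keys never leak into the file.
        writer.writerow({"sku": row["sku"], "quantity": row["quantity"]})
    return buffer.getvalue()


csv_text = generate_inventory_csv(
    [
        {"sku": "ABC-1", "quantity": 12, "warehouse": "LUX"},
        {"sku": "XYZ-9", "quantity": 0},
    ]
)
```

Using `csv.DictWriter` rather than string concatenation means SKUs containing commas or quotes are escaped correctly.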

Part 5: Scheduler & Job Management

5.1 Scheduled Jobs Overview

| Job | Default Schedule | Configurable | Description |
|-----|------------------|--------------|-------------|
| order_import_{marketplace} | Every 5 min | Per store | Poll orders from marketplace |
| inventory_sync_{marketplace} | Every 15 min | Per store | Sync inventory to marketplace |
| codeswholesale_catalog_sync | Every 6 hours | Global | Update digital product catalog |
| product_price_sync | Daily | Per store | Sync price changes to marketplace |
| sync_retry_failed | Every 10 min | Global | Retry failed sync jobs |

5.2 Job Configuration Model

class ScheduledJob(Base, TimestampMixin):
    """Configurable scheduled job."""
    __tablename__ = "scheduled_jobs"

    id = Column(Integer, primary_key=True)
    store_id = Column(Integer, ForeignKey("stores.id"), nullable=True)  # Null = global

    job_type = Column(String, nullable=False)  # 'order_import', 'inventory_sync', etc.
    marketplace = Column(String)  # Relevant marketplace if applicable

    # === SCHEDULE ===
    is_enabled = Column(Boolean, default=True)
    cron_expression = Column(String)  # Cron format
    interval_minutes = Column(Integer)  # Simple interval alternative

    # === EXECUTION ===
    last_run_at = Column(DateTime)
    last_run_status = Column(String)
    last_run_duration_ms = Column(Integer)
    next_run_at = Column(DateTime)

    # === RETRY CONFIG ===
    max_retries = Column(Integer, default=3)
    retry_delay_seconds = Column(Integer, default=60)

    __table_args__ = (
        UniqueConstraint("store_id", "job_type", "marketplace", name="uq_scheduled_job"),
    )
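
As an illustration of how the schedule and retry columns might be consumed, the sketch below derives `next_run_at` from `interval_minutes` and decides when a failed run may be retried. The function names are hypothetical, only the simple-interval case is covered (evaluating `cron_expression` would need a cron parser), and a fixed retry delay is assumed.

```python
from datetime import datetime, timedelta
from typing import Optional


def compute_next_run(
    last_run_at: Optional[datetime],
    interval_minutes: int,
    now: datetime,
) -> datetime:
    """Return when an interval-based job should run next."""
    if last_run_at is None:
        return now  # Never ran: due immediately.
    candidate = last_run_at + timedelta(minutes=interval_minutes)
    # An overdue job runs now rather than at a time already in the past.
    return max(candidate, now)


def next_retry_at(
    failed_at: datetime,
    attempt: int,
    max_retries: int = 3,
    retry_delay_seconds: int = 60,
) -> Optional[datetime]:
    """Return the retry time for a failed run, or None once retries are used up.

    `attempt` counts retries already made (0 for the first failure).
    """
    if attempt >= max_retries:
        return None
    return failed_at + timedelta(seconds=retry_delay_seconds)


now = datetime(2026, 3, 8, 12, 0)
upcoming = compute_next_run(datetime(2026, 3, 8, 11, 50), 15, now)
overdue = compute_next_run(datetime(2026, 3, 8, 11, 0), 15, now)
```

Here `upcoming` lands at 12:05, while `overdue` collapses to `now` because its slot has already passed.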

Implementation Roadmap

Phase 1: Product Import (Weeks 1-2)

Goal: Multi-marketplace product import with translations and digital product support.

| Task | Priority | Status |
|------|----------|--------|
| Add product_type, is_digital fields to marketplace_products | High | [ ] |
| Create marketplace_product_translations table | High | [ ] |
| Create product_translations table | High | [ ] |
| Implement BaseMarketplaceImporter pattern | High | [ ] |
| Refactor LetzshopImporter from CSV processor | High | [ ] |
| Add CodesWholesale catalog importer | High | [ ] |
| Implement store override pattern on products | Medium | [ ] |
| Add translation override support | Medium | [ ] |
| Update API endpoints for translations | Medium | [ ] |

Detailed tasks: See Multi-Marketplace Product Architecture

Phase 2: Order Import (Weeks 3-4)

Goal: Unified order management with multi-channel support.

| Task | Priority | Status |
|------|----------|--------|
| Design unified orders schema | High | [ ] |
| Create orders, order_items, order_status_history tables | High | [ ] |
| Implement BaseOrderImporter pattern | High | [ ] |
| Implement LetzshopOrderImporter (GraphQL) | High | [ ] |
| Create order polling scheduler | High | [ ] |
| Build order list/detail store UI | Medium | [ ] |
| Add order notifications | Medium | [ ] |
| Implement order search and filtering | Medium | [ ] |

Phase 3: Order Fulfillment Sync (Weeks 5-6)

Goal: Sync fulfillment status back to marketplaces, handle digital delivery.

| Task | Priority | Status |
|------|----------|--------|
| Implement FulfillmentService | High | [ ] |
| Implement DigitalFulfillmentService | High | [ ] |
| Integrate CodesWholesale key purchase API | High | [ ] |
| Create fulfillment sync queue | High | [ ] |
| Implement LetzshopFulfillmentSync | High | [ ] |
| Build fulfillment UI (mark shipped, add tracking) | Medium | [ ] |
| Digital fulfillment email templates | Medium | [ ] |
| Fulfillment retry logic | Medium | [ ] |

Phase 4: Inventory Sync (Weeks 7-8)

Goal: Real-time and scheduled inventory sync to marketplaces.

| Task | Priority | Status |
|------|----------|--------|
| Create inventory_sync_configs table | High | [ ] |
| Create inventory_sync_logs table | High | [ ] |
| Implement InventorySyncService | High | [ ] |
| Implement LetzshopInventorySyncAdapter | High | [ ] |
| Create inventory change event system | High | [ ] |
| Build sync configuration UI | Medium | [ ] |
| Add sync status dashboard | Medium | [ ] |
| Implement real-time sync for future marketplaces | Low | [ ] |

Security Considerations

Credential Storage

# All marketplace credentials should be encrypted at rest
class MarketplaceCredential(Base, TimestampMixin):
    """Encrypted marketplace credentials."""
    __tablename__ = "marketplace_credentials"

    id = Column(Integer, primary_key=True)
    store_id = Column(Integer, ForeignKey("stores.id"), nullable=False)
    marketplace = Column(String, nullable=False)

    # Encrypted using application-level encryption
    credentials_encrypted = Column(LargeBinary, nullable=False)

    # Metadata (not sensitive)
    credential_type = Column(String)  # 'api_key', 'oauth', 'basic'
    expires_at = Column(DateTime)
    is_valid = Column(Boolean, default=True)
    last_validated_at = Column(DateTime)

API Rate Limiting

  • Respect marketplace API rate limits
  • Implement exponential backoff for failures
  • Queue operations to smooth out request bursts
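
The backoff guideline above can be sketched as a small retry wrapper. This is a minimal illustration, not an existing Orion utility: `backoff_delay` and `call_with_backoff` are hypothetical names, the base delay and cap are arbitrary, and a real adapter would catch only the marketplace client's rate-limit or transient errors instead of every `Exception`.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, base_seconds: float = 1.0, cap_seconds: float = 60.0) -> float:
    """Delay before retry number `attempt` (0-based): base * 2^attempt, capped."""
    return min(cap_seconds, base_seconds * (2 ** attempt))


def call_with_backoff(
    operation: Callable[[], T],
    max_retries: int = 3,
    base_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `operation`, retrying up to `max_retries` times with growing delays."""
    attempt = 0
    while True:
        try:
            return operation()
        except Exception:
            if attempt >= max_retries:
                raise  # Retries exhausted: surface the last error.
            sleep(backoff_delay(attempt, base_seconds))
            attempt += 1


# Usage: a fake marketplace call that fails twice, then succeeds.
calls = {"count": 0}
recorded_sleeps: list[float] = []


def flaky_call() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated transient failure")
    return "ok"


result = call_with_backoff(flaky_call, sleep=recorded_sleeps.append)
```

Injecting `sleep` keeps the wrapper testable without real waiting. Adding random jitter to each delay is a common refinement when many stores retry against the same marketplace at once.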

Data Privacy

  • Customer data from marketplaces should follow GDPR guidelines
  • Implement data retention policies
  • Provide data export/deletion capabilities

Monitoring & Observability

Key Metrics

| Metric | Description | Alert Threshold |
|--------|-------------|-----------------|
| order_import_lag_seconds | Time since last successful order poll | > 15 min |
| inventory_sync_lag_seconds | Time since last inventory sync | > 30 min |
| fulfillment_sync_queue_depth | Pending fulfillment syncs | > 100 |
| sync_error_rate | Failed syncs / total syncs | > 5% |
| digital_fulfillment_success_rate | Successful key retrievals | < 95% |
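
One way the thresholds above could be evaluated is sketched below. The rule table restates the documented thresholds, with lag expressed in seconds and rates as fractions. `ALERT_RULES` and `breached_alerts` are illustrative names, and how the metric values are collected is left out.

```python
# Metric name -> (comparison, threshold), taken from the Key Metrics table.
ALERT_RULES: dict[str, tuple[str, float]] = {
    "order_import_lag_seconds": (">", 15 * 60),
    "inventory_sync_lag_seconds": (">", 30 * 60),
    "fulfillment_sync_queue_depth": (">", 100),
    "sync_error_rate": (">", 0.05),
    "digital_fulfillment_success_rate": ("<", 0.95),
}


def breached_alerts(metrics: dict[str, float]) -> list[str]:
    """Return the names of metrics whose current value crosses its threshold."""
    breached = []
    for name, (comparison, threshold) in ALERT_RULES.items():
        value = metrics.get(name)
        if value is None:
            continue  # Metric not reported: nothing to compare.
        if comparison == ">" and value > threshold:
            breached.append(name)
        elif comparison == "<" and value < threshold:
            breached.append(name)
    return breached


alerts = breached_alerts(
    {
        "order_import_lag_seconds": 20 * 60,  # 20 min, above the 15 min limit
        "inventory_sync_lag_seconds": 10 * 60,
        "sync_error_rate": 0.02,
        "digital_fulfillment_success_rate": 0.90,  # below the 95% floor
    }
)
```

Note that the success-rate rule is the only one that alerts when the value drops below its threshold, which is why the comparison direction is stored per metric.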

Health Checks

@router.get("/health/marketplace-integrations")
async def check_marketplace_health(store_id: int):
    """Check health of marketplace integrations."""
    return {
        "letzshop": {
            "order_import": check_last_sync("letzshop", "order_import"),
            "inventory_sync": check_last_sync("letzshop", "inventory_sync"),
        },
        "codeswholesale": {
            "catalog_sync": check_last_sync("codeswholesale", "catalog"),
            "api_status": await check_codeswholesale_api(),
        }
    }

Appendix A: CodesWholesale Integration Details

API Endpoints Used

| Endpoint | Purpose | Frequency |
|----------|---------|-----------|
| GET /products | Fetch full catalog | Every 6 hours |
| GET /products/{id} | Get single product details | On-demand |
| POST /orders | Purchase license key | On order fulfillment |
| GET /orders/{id} | Check order status | After purchase |
| GET /account/balance | Check account balance | Periodically |

Product Catalog Mapping

def map_codeswholesale_product(cw_product: dict) -> dict:
    """Map CodesWholesale product to marketplace_product format."""
    return {
        "marketplace_product_id": f"cw_{cw_product['productId']}",
        "marketplace": "codeswholesale",
        "gtin": cw_product.get("ean"),
        "product_type": "digital",
        "is_digital": True,
        "digital_delivery_method": "license_key",
        # `or ""` also covers a platform key that is present but null.
        "platform": (cw_product.get("platform") or "").lower(),  # steam, origin, etc.
        "region_restrictions": cw_product.get("regions"),
        "price": cw_product["prices"][0]["value"],  # Supplier cost
        "currency": cw_product["prices"][0]["currency"],
        "availability": "in_stock" if cw_product["quantity"] > 0 else "out_of_stock",
        "attributes": {
            "languages": cw_product.get("languages"),
            "release_date": cw_product.get("releaseDate"),
        }
    }

Appendix B: Status Mapping Reference

Order Status Mapping

| Orion Status | Letzshop | Amazon | eBay |
|--------------|----------|--------|------|
| PENDING | PENDING | Pending | - |
| CONFIRMED | PAID | Unshipped | Paid |
| PROCESSING | PROCESSING | - | - |
| SHIPPED | SHIPPED | Shipped | Shipped |
| DELIVERED | DELIVERED | - | Delivered |
| FULFILLED | - | - | - |
| CANCELLED | CANCELLED | Cancelled | Cancelled |
| REFUNDED | REFUNDED | Refunded | Refunded |
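
The mapping above could be encoded as a lookup table for inbound order imports, as in the sketch below. `TO_ORION_STATUS` and `to_orion_status` are hypothetical names. Cells marked `-` are simply omitted, and returning `None` for an unmapped status is an assumption about how callers would want to handle unknown values.

```python
from typing import Optional

# External status -> Orion status, per marketplace, from the table above.
TO_ORION_STATUS: dict[str, dict[str, str]] = {
    "letzshop": {
        "PENDING": "PENDING",
        "PAID": "CONFIRMED",
        "PROCESSING": "PROCESSING",
        "SHIPPED": "SHIPPED",
        "DELIVERED": "DELIVERED",
        "CANCELLED": "CANCELLED",
        "REFUNDED": "REFUNDED",
    },
    "amazon": {
        "Pending": "PENDING",
        "Unshipped": "CONFIRMED",
        "Shipped": "SHIPPED",
        "Cancelled": "CANCELLED",
        "Refunded": "REFUNDED",
    },
    "ebay": {
        "Paid": "CONFIRMED",
        "Shipped": "SHIPPED",
        "Delivered": "DELIVERED",
        "Cancelled": "CANCELLED",
        "Refunded": "REFUNDED",
    },
}


def to_orion_status(marketplace: str, external_status: str) -> Optional[str]:
    """Translate a marketplace status to the Orion status, or None if unmapped."""
    return TO_ORION_STATUS.get(marketplace.lower(), {}).get(external_status)


letzshop_paid = to_orion_status("letzshop", "PAID")
amazon_unshipped = to_orion_status("Amazon", "Unshipped")
unknown = to_orion_status("ebay", "OnHold")
```

Since FULFILLED has no marketplace equivalent in the table, it never appears as a lookup result and would only be set by Orion itself.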