diff --git a/docs/architecture/marketplace-integration.md b/docs/architecture/marketplace-integration.md
new file mode 100644
index 00000000..205de482
--- /dev/null
+++ b/docs/architecture/marketplace-integration.md
@@ -0,0 +1,1345 @@
+# Multi-Marketplace Integration Architecture
+
+## Executive Summary
+
+This document defines the complete architecture for integrating Wizamart with multiple external marketplaces (Letzshop, Amazon, eBay) and digital product suppliers (CodesWholesale). The integration is **bidirectional**, supporting both inbound flows (products, orders) and outbound flows (inventory sync, fulfillment status).
+
+**Key Capabilities:**
+
+| Capability | Description |
+|------------|-------------|
+| **Multi-Marketplace Products** | Import and normalize products from multiple sources |
+| **Multi-Language Support** | Translations with language fallback |
+| **Unified Order Management** | Orders from all channels in one place |
+| **Digital Product Fulfillment** | On-demand license key retrieval from suppliers |
+| **Inventory Sync** | Real-time and scheduled stock updates to marketplaces |
+| **Fulfillment Sync** | Order status and tracking back to marketplaces |
+
+---
+
+## System Overview
+
+```mermaid
+graph TB
+ subgraph "External Systems"
+ LS[Letzshop
CSV + GraphQL]
+ AZ[Amazon
API]
+ EB[eBay
API]
+ CW[CodesWholesale
Digital Supplier API]
+ WS[Vendor Storefront
Wizamart Shop]
+ end
+
+ subgraph "Integration Layer"
+ subgraph "Inbound Adapters"
+ PI[Product Importers]
+ OI[Order Importers]
+ DPI[Digital Product Importer]
+ end
+ subgraph "Outbound Adapters"
+ IS[Inventory Sync]
+ FS[Fulfillment Sync]
+ PE[Product Export]
+ end
+ end
+
+ subgraph "Wizamart Core"
+ MP[Marketplace Products]
+ P[Vendor Products]
+ O[Unified Orders]
+ I[Inventory]
+ F[Fulfillment]
+ DL[Digital License Pool]
+ end
+
+ LS -->|CSV Import| PI
+ AZ -->|API Pull| PI
+ EB -->|API Pull| PI
+ CW -->|Catalog Sync| DPI
+
+ LS -->|GraphQL Poll| OI
+ AZ -->|API Poll| OI
+ EB -->|API Poll| OI
+ WS -->|Direct| O
+
+ PI --> MP
+ DPI --> MP
+ MP --> P
+ OI --> O
+
+ I -->|Real-time/Scheduled| IS
+ F -->|Status Update| FS
+ P -->|CSV Export| PE
+
+ IS --> LS
+ IS --> AZ
+ IS --> EB
+ FS --> LS
+ FS --> AZ
+ FS --> EB
+ PE --> LS
+
+ CW -->|On-demand Keys| DL
+ DL --> F
+```
+
+---
+
+## Integration Phases
+
+| Phase | Scope | Priority | Dependencies |
+|-------|-------|----------|--------------|
+| **Phase 1** | Product Import | High | None |
+| **Phase 2** | Order Import | High | Phase 1 |
+| **Phase 3** | Order Fulfillment Sync | High | Phase 2 |
+| **Phase 4** | Inventory Sync | Medium | Phase 1 |
+
+---
+
+## Marketplace Capabilities Matrix
+
+| Marketplace | Products In | Products Out | Orders In | Fulfillment Out | Inventory Out | Method |
+|-------------|-------------|--------------|-----------|-----------------|---------------|--------|
+| **Letzshop** | CSV Import | CSV Export | GraphQL Poll | GraphQL | CSV/GraphQL | File + API |
+| **Amazon** | API | N/A | API Poll | API | API (Real-time) | API |
+| **eBay** | API | N/A | API Poll | API | API (Real-time) | API |
+| **CodesWholesale** | API Catalog | N/A | N/A | On-demand Keys | N/A | API |
+| **Vendor Storefront** | N/A | N/A | Direct DB | Internal | Internal | Direct |
+
+---
+
+## Part 1: Product Integration
+
+### 1.1 Architecture Overview
+
+```mermaid
+graph TB
+ subgraph "Source Layer"
+ LS_CSV[Letzshop CSV
Multi-language feeds]
+ AZ_API[Amazon API
Product catalog]
+ EB_API[eBay API
Product catalog]
+ CW_API[CodesWholesale API
Digital catalog]
+ end
+
+ subgraph "Import Layer"
+ LSI[LetzshopImporter]
+ AZI[AmazonImporter]
+ EBI[EbayImporter]
+ CWI[CodesWholesaleImporter]
+ end
+
+ subgraph "Canonical Layer"
+ MP[(marketplace_products)]
+ MPT[(marketplace_product_translations)]
+ end
+
+ subgraph "Vendor Layer"
+ P[(products)]
+ PT[(product_translations)]
+ end
+
+ LS_CSV --> LSI
+ AZ_API --> AZI
+ EB_API --> EBI
+ CW_API --> CWI
+
+ LSI --> MP
+ AZI --> MP
+ EBI --> MP
+ CWI --> MP
+
+ MP --> MPT
+ MP --> P
+ P --> PT
+```
+
+### 1.2 Digital Product Supplier Integration (CodesWholesale)
+
+CodesWholesale provides digital products (game keys, gift cards, software licenses) that need special handling:
+
+```mermaid
+sequenceDiagram
+ participant CW as CodesWholesale API
+ participant Sync as Catalog Sync Job
+ participant MP as marketplace_products
+ participant P as products
+ participant LP as License Pool
+
+ Note over Sync: Scheduled catalog sync (e.g., every 6 hours)
+ Sync->>CW: GET /products (catalog)
+ CW-->>Sync: Product catalog with prices, availability
+ Sync->>MP: Upsert products (marketplace='codeswholesale')
+ Sync->>MP: Update prices, availability flags
+
+ Note over P: Vendor adds product to their catalog
+ P->>MP: Link to marketplace_product
+
+ Note over LP: Order placed - need license key
+ LP->>CW: POST /orders (purchase key on-demand)
+ CW-->>LP: License key / download link
+ LP->>LP: Store for fulfillment
+```
+
+**Key Characteristics:**
+
+| Aspect | Behavior |
+|--------|----------|
+| **Catalog Sync** | Scheduled job fetches full catalog, updates prices/availability |
+| **License Keys** | Purchased on-demand at fulfillment time (not pre-stocked) |
+| **Inventory** | Virtual - always "available" but subject to supplier stock |
+| **Pricing** | Dynamic - supplier prices may change, vendor sets markup |
+| **Regions** | Products may have region restrictions (EU, US, Global) |
+
+### 1.3 Product Data Model
+
+See [Multi-Marketplace Product Architecture](../development/migration/multi-marketplace-product-architecture.md) for detailed schema.
+
+**Summary of tables:**
+
+| Table | Purpose |
+|-------|---------|
+| `marketplace_products` | Canonical product data from all sources |
+| `marketplace_product_translations` | Localized titles, descriptions per language |
+| `products` | Vendor-specific overrides and settings |
+| `product_translations` | Vendor-specific localized overrides |
+
+### 1.4 Import Job Flow
+
+```mermaid
+stateDiagram-v2
+ [*] --> Pending: Job Created
+ Pending --> Processing: Worker picks up
+ Processing --> Downloading: Fetch source data
+ Downloading --> Parsing: Parse rows
+ Parsing --> Upserting: Update database
+ Upserting --> Completed: All rows processed
+ Upserting --> PartiallyCompleted: Some rows failed
+ Processing --> Failed: Fatal error
+ Completed --> [*]
+ PartiallyCompleted --> [*]
+ Failed --> [*]
+```
+
+---
+
+## Part 2: Order Integration
+
+### 2.1 Unified Order Model
+
+Orders from all channels (marketplaces + vendor storefront) flow into a unified order management system.
+
+```mermaid
+graph TB
+ subgraph "Order Sources"
+ LS_O[Letzshop Orders
GraphQL Poll]
+ AZ_O[Amazon Orders
API Poll]
+ EB_O[eBay Orders
API Poll]
+ VS_O[Vendor Storefront
Direct]
+ end
+
+ subgraph "Order Import"
+ OI[Order Importer Service]
+ OQ[Order Import Queue]
+ end
+
+ subgraph "Unified Orders"
+ O[(orders)]
+ OI_T[(order_items)]
+ OS[(order_status_history)]
+ end
+
+ LS_O -->|Poll every N min| OI
+ AZ_O -->|Poll every N min| OI
+ EB_O -->|Poll every N min| OI
+ VS_O -->|Direct insert| O
+
+ OI --> OQ
+ OQ --> O
+ O --> OI_T
+ O --> OS
+```
+
+### 2.2 Order Data Model
+
+```python
+class OrderChannel(str, Enum):
+ """Order source channel."""
+ STOREFRONT = "storefront" # Vendor's own Wizamart shop
+ LETZSHOP = "letzshop"
+ AMAZON = "amazon"
+ EBAY = "ebay"
+
+class OrderStatus(str, Enum):
+ """Unified order status."""
+ PENDING = "pending" # Awaiting payment/confirmation
+ CONFIRMED = "confirmed" # Payment confirmed
+ PROCESSING = "processing" # Being prepared
+ READY_FOR_SHIPMENT = "ready_for_shipment" # Physical: packed
+ SHIPPED = "shipped" # Physical: in transit
+ DELIVERED = "delivered" # Physical: delivered
+ FULFILLED = "fulfilled" # Digital: key/download sent
+ CANCELLED = "cancelled"
+ REFUNDED = "refunded"
+ PARTIALLY_REFUNDED = "partially_refunded"
+
+class Order(Base, TimestampMixin):
+ """Unified order from all channels."""
+ __tablename__ = "orders"
+
+ id = Column(Integer, primary_key=True)
+ vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False)
+
+ # === CHANNEL TRACKING ===
+ channel = Column(SQLEnum(OrderChannel), nullable=False, index=True)
+ channel_order_id = Column(String, index=True) # External order ID
+ channel_order_url = Column(String) # Link to order in marketplace
+
+ # === CUSTOMER INFO ===
+ customer_id = Column(Integer, ForeignKey("customers.id"), nullable=True)
+ customer_email = Column(String, nullable=False)
+ customer_name = Column(String)
+ customer_phone = Column(String)
+
+ # === ADDRESSES ===
+ shipping_address = Column(JSON) # For physical products
+ billing_address = Column(JSON)
+
+ # === ORDER TOTALS ===
+ subtotal = Column(Float, nullable=False)
+ shipping_cost = Column(Float, default=0)
+ tax_amount = Column(Float, default=0)
+ discount_amount = Column(Float, default=0)
+ total = Column(Float, nullable=False)
+ currency = Column(String(3), default="EUR")
+
+ # === STATUS ===
+ status = Column(SQLEnum(OrderStatus), default=OrderStatus.PENDING, index=True)
+
+ # === FULFILLMENT TYPE ===
+ requires_shipping = Column(Boolean, default=True) # False for digital-only
+ is_fully_digital = Column(Boolean, default=False)
+
+ # === TIMESTAMPS ===
+ ordered_at = Column(DateTime, nullable=False) # When customer placed order
+ confirmed_at = Column(DateTime)
+ shipped_at = Column(DateTime)
+ delivered_at = Column(DateTime)
+ fulfilled_at = Column(DateTime) # For digital products
+
+ # === SYNC STATUS ===
+ last_synced_at = Column(DateTime) # Last sync with marketplace
+ sync_status = Column(String) # 'synced', 'pending', 'error'
+ sync_error = Column(Text)
+
+ # === RELATIONSHIPS ===
+ vendor = relationship("Vendor", back_populates="orders")
+ customer = relationship("Customer", back_populates="orders")
+ items = relationship("OrderItem", back_populates="order", cascade="all, delete-orphan")
+ status_history = relationship("OrderStatusHistory", back_populates="order")
+
+ __table_args__ = (
+ Index("idx_order_vendor_status", "vendor_id", "status"),
+ Index("idx_order_channel", "channel", "channel_order_id"),
+ Index("idx_order_vendor_date", "vendor_id", "ordered_at"),
+ )
+
+
+class OrderItem(Base, TimestampMixin):
+ """Individual item in an order."""
+ __tablename__ = "order_items"
+
+ id = Column(Integer, primary_key=True)
+ order_id = Column(Integer, ForeignKey("orders.id", ondelete="CASCADE"), nullable=False)
+ product_id = Column(Integer, ForeignKey("products.id"), nullable=True)
+
+ # === PRODUCT SNAPSHOT (at time of order) ===
+ product_name = Column(String, nullable=False)
+ product_sku = Column(String)
+ product_gtin = Column(String)
+ product_image_url = Column(String)
+
+ # === PRICING ===
+ unit_price = Column(Float, nullable=False)
+ quantity = Column(Integer, nullable=False, default=1)
+ subtotal = Column(Float, nullable=False)
+ tax_amount = Column(Float, default=0)
+ discount_amount = Column(Float, default=0)
+ total = Column(Float, nullable=False)
+
+ # === PRODUCT TYPE ===
+ is_digital = Column(Boolean, default=False)
+
+ # === DIGITAL FULFILLMENT ===
+ license_key = Column(String) # For digital products
+ download_url = Column(String)
+ download_expiry = Column(DateTime)
+ digital_fulfilled_at = Column(DateTime)
+
+ # === PHYSICAL FULFILLMENT ===
+ shipped_quantity = Column(Integer, default=0)
+
+ # === SUPPLIER TRACKING (for CodesWholesale etc) ===
+ supplier = Column(String) # 'codeswholesale', 'internal', etc.
+ supplier_order_id = Column(String) # Supplier's order reference
+ supplier_cost = Column(Float) # What we paid supplier
+
+ # === RELATIONSHIPS ===
+ order = relationship("Order", back_populates="items")
+ product = relationship("Product")
+
+
+class OrderStatusHistory(Base, TimestampMixin):
+ """Audit trail of order status changes."""
+ __tablename__ = "order_status_history"
+
+ id = Column(Integer, primary_key=True)
+ order_id = Column(Integer, ForeignKey("orders.id", ondelete="CASCADE"), nullable=False)
+
+ from_status = Column(SQLEnum(OrderStatus))
+ to_status = Column(SQLEnum(OrderStatus), nullable=False)
+ changed_by = Column(String) # 'system', 'vendor:123', 'marketplace:letzshop'
+ reason = Column(String)
+ metadata = Column(JSON) # Additional context (tracking number, etc.)
+
+ order = relationship("Order", back_populates="status_history")
+```
+
+### 2.3 Order Import Flow
+
+```mermaid
+sequenceDiagram
+ participant Scheduler as Scheduler
+ participant Poller as Order Poller
+ participant MP as Marketplace API
+ participant Queue as Import Queue
+ participant Worker as Import Worker
+ participant DB as Database
+ participant Notify as Notification Service
+
+ Scheduler->>Poller: Trigger poll (every N minutes)
+ Poller->>MP: Fetch orders since last_sync
+ MP-->>Poller: New/updated orders
+
+ loop For each order
+ Poller->>Queue: Enqueue order import job
+ end
+
+ Worker->>Queue: Pick up job
+ Worker->>DB: Check if order exists (by channel_order_id)
+
+ alt New Order
+ Worker->>DB: Create order + items
+ Worker->>Notify: New order notification
+ else Existing Order
+ Worker->>DB: Update order status/details
+ end
+
+ Worker->>DB: Update last_synced_at
+```
+
+### 2.4 Letzshop Order Integration (GraphQL)
+
+```python
+# Example GraphQL queries for Letzshop order integration
+
+LETZSHOP_ORDERS_QUERY = """
+query GetOrders($since: DateTime, $status: [OrderStatus!]) {
+ orders(
+ filter: {
+ updatedAt: { gte: $since }
+ status: { in: $status }
+ }
+ first: 100
+ ) {
+ edges {
+ node {
+ id
+ orderNumber
+ status
+ createdAt
+ updatedAt
+ customer {
+ email
+ firstName
+ lastName
+ phone
+ }
+ shippingAddress {
+ street
+ city
+ postalCode
+ country
+ }
+ items {
+ product {
+ id
+ sku
+ name
+ }
+ quantity
+ unitPrice
+ totalPrice
+ }
+ totals {
+ subtotal
+ shipping
+ tax
+ total
+ currency
+ }
+ }
+ }
+ pageInfo {
+ hasNextPage
+ endCursor
+ }
+ }
+}
+"""
+
+class LetzshopOrderImporter:
+ """Import orders from Letzshop via GraphQL."""
+
+ def __init__(self, vendor_id: int, api_url: str, api_token: str):
+ self.vendor_id = vendor_id
+ self.api_url = api_url
+ self.api_token = api_token
+
+ async def fetch_orders_since(self, since: datetime) -> list[dict]:
+ """Fetch orders updated since given timestamp."""
+ # Implementation: Execute GraphQL query
+ pass
+
+ def map_to_order(self, letzshop_order: dict) -> OrderCreate:
+ """Map Letzshop order to unified Order schema."""
+ return OrderCreate(
+ vendor_id=self.vendor_id,
+ channel=OrderChannel.LETZSHOP,
+ channel_order_id=letzshop_order["id"],
+ customer_email=letzshop_order["customer"]["email"],
+ customer_name=f"{letzshop_order['customer']['firstName']} {letzshop_order['customer']['lastName']}",
+ status=self._map_status(letzshop_order["status"]),
+ # ... map remaining fields
+ )
+
+ def _map_status(self, letzshop_status: str) -> OrderStatus:
+ """Map Letzshop status to unified status."""
+ mapping = {
+ "PENDING": OrderStatus.PENDING,
+ "PAID": OrderStatus.CONFIRMED,
+ "PROCESSING": OrderStatus.PROCESSING,
+ "SHIPPED": OrderStatus.SHIPPED,
+ "DELIVERED": OrderStatus.DELIVERED,
+ "CANCELLED": OrderStatus.CANCELLED,
+ }
+ return mapping.get(letzshop_status, OrderStatus.PENDING)
+```
+
+---
+
+## Part 3: Order Fulfillment Sync
+
+### 3.1 Fulfillment Architecture
+
+```mermaid
+graph TB
+ subgraph "Vendor Actions"
+ VA[Vendor marks order shipped]
+ VD[Vendor marks delivered]
+ VF[Digital fulfillment triggered]
+ end
+
+ subgraph "Fulfillment Service"
+ FS[Fulfillment Service]
+ DFS[Digital Fulfillment Service]
+ end
+
+ subgraph "Outbound Sync"
+ SQ[Sync Queue]
+ SW[Sync Workers]
+ end
+
+ subgraph "External Systems"
+ LS_API[Letzshop GraphQL]
+ AZ_API[Amazon API]
+ EB_API[eBay API]
+ CW_API[CodesWholesale API]
+ EMAIL[Email Service]
+ end
+
+ VA --> FS
+ VD --> FS
+ VF --> DFS
+
+ FS --> SQ
+ DFS --> CW_API
+ DFS --> EMAIL
+
+ SQ --> SW
+ SW --> LS_API
+ SW --> AZ_API
+ SW --> EB_API
+```
+
+### 3.2 Physical Product Fulfillment
+
+```mermaid
+sequenceDiagram
+ participant Vendor as Vendor UI
+ participant API as Fulfillment API
+ participant DB as Database
+ participant Queue as Sync Queue
+ participant Worker as Sync Worker
+ participant MP as Marketplace API
+
+ Vendor->>API: Mark order as shipped (tracking #)
+ API->>DB: Update order status
+ API->>DB: Add status history entry
+ API->>Queue: Enqueue fulfillment sync job
+
+ Worker->>Queue: Pick up job
+ Worker->>DB: Get order details
+ Worker->>MP: Update fulfillment status
+
+ alt Sync Success
+ MP-->>Worker: 200 OK
+ Worker->>DB: Mark sync_status='synced'
+ else Sync Failed
+ MP-->>Worker: Error
+ Worker->>DB: Mark sync_status='error', store error
+ Worker->>Queue: Retry with backoff
+ end
+```
+
+### 3.3 Digital Product Fulfillment (CodesWholesale)
+
+```mermaid
+sequenceDiagram
+ participant Order as Order Service
+ participant DFS as Digital Fulfillment Service
+ participant CW as CodesWholesale API
+ participant DB as Database
+ participant Email as Email Service
+ participant Customer as Customer
+
+ Note over Order: Order confirmed, contains digital item
+ Order->>DFS: Fulfill digital items
+
+ loop For each digital item
+ DFS->>DB: Check product supplier
+
+ alt Supplier = CodesWholesale
+ DFS->>CW: POST /orders (purchase key)
+ CW-->>DFS: License key + download info
+ DFS->>DB: Store license key on order_item
+ else Internal / Pre-loaded
+ DFS->>DB: Get key from license pool
+ DFS->>DB: Mark key as used
+ end
+ end
+
+ DFS->>DB: Update order status to FULFILLED
+ DFS->>Email: Send fulfillment email
+ Email->>Customer: License keys + download links
+```
+
+### 3.4 Fulfillment Service Implementation
+
+```python
+class FulfillmentService:
+ """Service for managing order fulfillment across channels."""
+
+ def __init__(
+ self,
+ db: Session,
+ digital_fulfillment: "DigitalFulfillmentService",
+ sync_queue: "SyncQueue",
+ ):
+ self.db = db
+ self.digital_fulfillment = digital_fulfillment
+ self.sync_queue = sync_queue
+
+ async def mark_shipped(
+ self,
+ order_id: int,
+ tracking_number: str | None = None,
+ carrier: str | None = None,
+ shipped_items: list[int] | None = None, # Partial shipment
+ ) -> Order:
+ """Mark order (or items) as shipped."""
+ order = self._get_order(order_id)
+
+ # Update order
+ order.status = OrderStatus.SHIPPED
+ order.shipped_at = datetime.utcnow()
+
+ # Add tracking info
+ if tracking_number:
+ self._add_status_history(
+ order,
+ OrderStatus.SHIPPED,
+ metadata={"tracking_number": tracking_number, "carrier": carrier}
+ )
+
+ # Queue sync to marketplace
+ if order.channel != OrderChannel.STOREFRONT:
+ self.sync_queue.enqueue(
+ SyncJob(
+ type="fulfillment",
+ order_id=order_id,
+ channel=order.channel,
+ data={"tracking_number": tracking_number, "carrier": carrier}
+ )
+ )
+
+ self.db.commit()
+ return order
+
+ async def fulfill_digital_items(self, order_id: int) -> Order:
+ """Fulfill digital items in order."""
+ order = self._get_order(order_id)
+
+ digital_items = [item for item in order.items if item.is_digital]
+
+ for item in digital_items:
+ await self.digital_fulfillment.fulfill_item(item)
+
+ # Check if fully fulfilled
+ if all(item.digital_fulfilled_at for item in digital_items):
+ if order.is_fully_digital:
+ order.status = OrderStatus.FULFILLED
+ order.fulfilled_at = datetime.utcnow()
+
+ self.db.commit()
+ return order
+
+
+class DigitalFulfillmentService:
+ """Service for fulfilling digital products."""
+
+ def __init__(
+ self,
+ db: Session,
+ codeswholesale_client: "CodesWholesaleClient",
+ email_service: "EmailService",
+ ):
+ self.db = db
+ self.codeswholesale = codeswholesale_client
+ self.email_service = email_service
+
+ async def fulfill_item(self, item: OrderItem) -> OrderItem:
+ """Fulfill a single digital order item."""
+ if item.digital_fulfilled_at:
+ return item # Already fulfilled
+
+ # Get license key based on supplier
+ if item.supplier == "codeswholesale":
+ key_data = await self._fulfill_from_codeswholesale(item)
+ else:
+ key_data = await self._fulfill_from_internal_pool(item)
+
+ # Update item
+ item.license_key = key_data.get("license_key")
+ item.download_url = key_data.get("download_url")
+ item.download_expiry = key_data.get("expiry")
+ item.digital_fulfilled_at = datetime.utcnow()
+ item.supplier_order_id = key_data.get("supplier_order_id")
+ item.supplier_cost = key_data.get("cost")
+
+ return item
+
+ async def _fulfill_from_codeswholesale(self, item: OrderItem) -> dict:
+ """Purchase key from CodesWholesale on-demand."""
+ # Get the marketplace product to find CodesWholesale product ID
+ product = item.product
+ mp = product.marketplace_product
+
+ if mp.marketplace != "codeswholesale":
+ raise ValueError(f"Product {product.id} is not from CodesWholesale")
+
+ # Purchase from CodesWholesale
+ result = await self.codeswholesale.purchase_code(
+ product_id=mp.marketplace_product_id,
+ quantity=item.quantity,
+ )
+
+ return {
+ "license_key": result["codes"][0]["code"], # First code for qty=1
+ "download_url": result.get("download_url"),
+ "supplier_order_id": result["order_id"],
+ "cost": result["total_price"],
+ }
+
+ async def _fulfill_from_internal_pool(self, item: OrderItem) -> dict:
+ """Get key from internal pre-loaded pool."""
+ # Implementation for vendors who pre-load their own keys
+ pass
+```
+
+### 3.5 Letzshop Fulfillment Sync (GraphQL)
+
+```python
+LETZSHOP_UPDATE_FULFILLMENT = """
+mutation UpdateOrderFulfillment($orderId: ID!, $input: FulfillmentInput!) {
+ updateOrderFulfillment(orderId: $orderId, input: $input) {
+ order {
+ id
+ status
+ fulfillment {
+ status
+ trackingNumber
+ carrier
+ shippedAt
+ }
+ }
+ errors {
+ field
+ message
+ }
+ }
+}
+"""
+
+class LetzshopFulfillmentSync:
+ """Sync fulfillment status to Letzshop."""
+
+ async def sync_shipment(
+ self,
+ order: Order,
+ tracking_number: str | None,
+ carrier: str | None,
+ ) -> SyncResult:
+ """Update Letzshop with shipment info."""
+ variables = {
+ "orderId": order.channel_order_id,
+ "input": {
+ "status": "SHIPPED",
+ "trackingNumber": tracking_number,
+ "carrier": carrier,
+ "shippedAt": order.shipped_at.isoformat(),
+ }
+ }
+
+ result = await self._execute_mutation(
+ LETZSHOP_UPDATE_FULFILLMENT,
+ variables
+ )
+
+ if result.get("errors"):
+ return SyncResult(success=False, errors=result["errors"])
+
+ return SyncResult(success=True)
+```
+
+---
+
+## Part 4: Inventory Sync
+
+### 4.1 Inventory Sync Architecture
+
+```mermaid
+graph TB
+ subgraph "Inventory Changes"
+ OC[Order Created
Reserve stock]
+ OF[Order Fulfilled
Deduct stock]
+ OX[Order Cancelled
Release stock]
+ MA[Manual Adjustment]
+ SI[Stock Import]
+ end
+
+ subgraph "Inventory Service"
+ IS[Inventory Service]
+ EQ[Event Queue]
+ end
+
+ subgraph "Sync Strategy"
+ RT[Real-time Sync
For API marketplaces]
+ SC[Scheduled Batch
For file-based]
+ end
+
+ subgraph "Outbound"
+ LS_S[Letzshop
CSV/GraphQL]
+ AZ_S[Amazon API]
+ EB_S[eBay API]
+ end
+
+ OC --> IS
+ OF --> IS
+ OX --> IS
+ MA --> IS
+ SI --> IS
+
+ IS --> EQ
+ EQ --> RT
+ EQ --> SC
+
+ RT --> AZ_S
+ RT --> EB_S
+ SC --> LS_S
+```
+
+### 4.2 Sync Strategies
+
+| Strategy | Use Case | Trigger | Marketplaces |
+|----------|----------|---------|--------------|
+| **Real-time** | API-based marketplaces | Inventory change event | Amazon, eBay |
+| **Scheduled Batch** | File-based or rate-limited | Cron job (configurable) | Letzshop |
+| **On-demand** | Manual trigger | Vendor action | All |
+
+### 4.3 Inventory Data Model Extensions
+
+```python
+class InventorySyncConfig(Base, TimestampMixin):
+ """Per-vendor, per-marketplace sync configuration."""
+ __tablename__ = "inventory_sync_configs"
+
+ id = Column(Integer, primary_key=True)
+ vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False)
+ marketplace = Column(String, nullable=False) # 'letzshop', 'amazon', 'ebay'
+
+ # === SYNC SETTINGS ===
+ is_enabled = Column(Boolean, default=True)
+ sync_strategy = Column(String, default="scheduled") # 'realtime', 'scheduled', 'manual'
+ sync_interval_minutes = Column(Integer, default=15) # For scheduled
+
+ # === CREDENTIALS ===
+ api_credentials = Column(JSON) # Encrypted credentials
+
+ # === STOCK RULES ===
+ safety_stock = Column(Integer, default=0) # Reserve buffer
+ out_of_stock_threshold = Column(Integer, default=0)
+ sync_zero_stock = Column(Boolean, default=True) # Sync when stock=0
+
+ # === STATUS ===
+ last_sync_at = Column(DateTime)
+ last_sync_status = Column(String)
+ last_sync_error = Column(Text)
+ items_synced_count = Column(Integer, default=0)
+
+ __table_args__ = (
+ UniqueConstraint("vendor_id", "marketplace", name="uq_inventory_sync_vendor_marketplace"),
+ )
+
+
+class InventorySyncLog(Base, TimestampMixin):
+ """Log of inventory sync operations."""
+ __tablename__ = "inventory_sync_logs"
+
+ id = Column(Integer, primary_key=True)
+ vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False)
+ marketplace = Column(String, nullable=False)
+
+ sync_type = Column(String) # 'full', 'incremental', 'single_product'
+ started_at = Column(DateTime, nullable=False)
+ completed_at = Column(DateTime)
+
+ status = Column(String) # 'success', 'partial', 'failed'
+ items_processed = Column(Integer, default=0)
+ items_succeeded = Column(Integer, default=0)
+ items_failed = Column(Integer, default=0)
+
+ errors = Column(JSON) # List of errors
+```
+
+### 4.4 Inventory Sync Service
+
+```python
+class InventorySyncService:
+ """Service for syncing inventory to marketplaces."""
+
+ def __init__(
+ self,
+ db: Session,
+ sync_adapters: dict[str, "MarketplaceSyncAdapter"],
+ ):
+ self.db = db
+ self.adapters = sync_adapters
+
+ async def sync_inventory_change(
+ self,
+ product_id: int,
+ new_quantity: int,
+ change_reason: str,
+ ):
+ """Handle inventory change event - trigger real-time syncs."""
+ product = self.db.query(Product).get(product_id)
+ vendor_id = product.vendor_id
+
+ # Get enabled real-time sync configs
+ configs = self.db.query(InventorySyncConfig).filter(
+ InventorySyncConfig.vendor_id == vendor_id,
+ InventorySyncConfig.is_enabled == True,
+ InventorySyncConfig.sync_strategy == "realtime",
+ ).all()
+
+ for config in configs:
+ adapter = self.adapters.get(config.marketplace)
+ if adapter:
+ await self._sync_single_product(adapter, config, product, new_quantity)
+
+ async def run_scheduled_sync(self, vendor_id: int, marketplace: str):
+ """Run scheduled batch sync for a marketplace."""
+ config = self._get_sync_config(vendor_id, marketplace)
+ adapter = self.adapters.get(marketplace)
+
+ log = InventorySyncLog(
+ vendor_id=vendor_id,
+ marketplace=marketplace,
+ sync_type="full",
+ started_at=datetime.utcnow(),
+ status="running",
+ )
+ self.db.add(log)
+ self.db.commit()
+
+ try:
+ # Get all products for this vendor linked to this marketplace
+ products = self._get_products_for_sync(vendor_id, marketplace)
+
+ # Build inventory update payload
+ inventory_data = []
+ for product in products:
+ available_qty = self._calculate_available_quantity(product, config)
+ inventory_data.append({
+ "sku": product.vendor_sku or product.marketplace_product.marketplace_product_id,
+ "quantity": available_qty,
+ })
+
+ # Sync via adapter
+ result = await adapter.sync_inventory_batch(config, inventory_data)
+
+ log.completed_at = datetime.utcnow()
+ log.status = "success" if result.all_succeeded else "partial"
+ log.items_processed = len(inventory_data)
+ log.items_succeeded = result.succeeded_count
+ log.items_failed = result.failed_count
+ log.errors = result.errors
+
+ config.last_sync_at = datetime.utcnow()
+ config.last_sync_status = log.status
+ config.items_synced_count = log.items_succeeded
+
+ except Exception as e:
+ log.completed_at = datetime.utcnow()
+ log.status = "failed"
+ log.errors = [{"error": str(e)}]
+ config.last_sync_error = str(e)
+
+ self.db.commit()
+ return log
+
+ def _calculate_available_quantity(
+ self,
+ product: Product,
+ config: InventorySyncConfig,
+ ) -> int:
+ """Calculate quantity to report, applying safety stock."""
+ inventory = product.inventory_entries[0] if product.inventory_entries else None
+ if not inventory:
+ return 0
+
+ available = inventory.quantity - inventory.reserved_quantity
+ available -= config.safety_stock # Apply safety buffer
+
+ if available <= config.out_of_stock_threshold:
+ return 0 if config.sync_zero_stock else config.out_of_stock_threshold
+
+ return max(0, available)
+```
+
+### 4.5 Letzshop Inventory Sync
+
+```python
+class LetzshopInventorySyncAdapter:
+ """Sync inventory to Letzshop via CSV or GraphQL."""
+
+ async def sync_inventory_batch(
+ self,
+ config: InventorySyncConfig,
+ inventory_data: list[dict],
+ ) -> SyncResult:
+ """Sync inventory batch to Letzshop."""
+ # Option 1: CSV Export (upload to SFTP/web location)
+ if config.api_credentials.get("method") == "csv":
+ return await self._sync_via_csv(config, inventory_data)
+
+ # Option 2: GraphQL mutations
+ return await self._sync_via_graphql(config, inventory_data)
+
+ async def _sync_via_csv(
+ self,
+ config: InventorySyncConfig,
+ inventory_data: list[dict],
+ ) -> SyncResult:
+ """Generate and upload CSV inventory file."""
+ # Generate CSV
+ csv_content = self._generate_inventory_csv(inventory_data)
+
+ # Upload to configured location (SFTP, S3, etc.)
+ upload_location = config.api_credentials.get("upload_url")
+ # ... upload logic
+
+ return SyncResult(success=True, succeeded_count=len(inventory_data))
+
+ async def _sync_via_graphql(
+ self,
+ config: InventorySyncConfig,
+ inventory_data: list[dict],
+ ) -> SyncResult:
+ """Update inventory via GraphQL mutations."""
+ mutation = """
+ mutation UpdateInventory($input: InventoryUpdateInput!) {
+ updateInventory(input: $input) {
+ success
+ errors { sku, message }
+ }
+ }
+ """
+ # ... execute mutation
+```
+
+---
+
+## Part 5: Scheduler & Job Management
+
+### 5.1 Scheduled Jobs Overview
+
+| Job | Default Schedule | Configurable | Description |
+|-----|------------------|--------------|-------------|
+| `order_import_{marketplace}` | Every 5 min | Per vendor | Poll orders from marketplace |
+| `inventory_sync_{marketplace}` | Every 15 min | Per vendor | Sync inventory to marketplace |
+| `codeswholesale_catalog_sync` | Every 6 hours | Global | Update digital product catalog |
+| `product_price_sync` | Daily | Per vendor | Sync price changes to marketplace |
+| `sync_retry_failed` | Every 10 min | Global | Retry failed sync jobs |
+
+### 5.2 Job Configuration Model
+
+```python
+class ScheduledJob(Base, TimestampMixin):
+ """Configurable scheduled job."""
+ __tablename__ = "scheduled_jobs"
+
+ id = Column(Integer, primary_key=True)
+ vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=True) # Null = global
+
+ job_type = Column(String, nullable=False) # 'order_import', 'inventory_sync', etc.
+ marketplace = Column(String) # Relevant marketplace if applicable
+
+ # === SCHEDULE ===
+ is_enabled = Column(Boolean, default=True)
+ cron_expression = Column(String) # Cron format
+ interval_minutes = Column(Integer) # Simple interval alternative
+
+ # === EXECUTION ===
+ last_run_at = Column(DateTime)
+ last_run_status = Column(String)
+ last_run_duration_ms = Column(Integer)
+ next_run_at = Column(DateTime)
+
+ # === RETRY CONFIG ===
+ max_retries = Column(Integer, default=3)
+ retry_delay_seconds = Column(Integer, default=60)
+
+ __table_args__ = (
+ UniqueConstraint("vendor_id", "job_type", "marketplace", name="uq_scheduled_job"),
+ )
+```
+
+---
+
+## Implementation Roadmap
+
+### Phase 1: Product Import (Weeks 1-2)
+
+**Goal:** Multi-marketplace product import with translations and digital product support.
+
+| Task | Priority | Status |
+|------|----------|--------|
+| Add product_type, is_digital fields to marketplace_products | High | [ ] |
+| Create marketplace_product_translations table | High | [ ] |
+| Create product_translations table | High | [ ] |
+| Implement BaseMarketplaceImporter pattern | High | [ ] |
+| Refactor LetzshopImporter from CSV processor | High | [ ] |
+| Add CodesWholesale catalog importer | High | [ ] |
+| Implement vendor override pattern on products | Medium | [ ] |
+| Add translation override support | Medium | [ ] |
+| Update API endpoints for translations | Medium | [ ] |
+
+**Detailed tasks:** See [Multi-Marketplace Product Architecture](../development/migration/multi-marketplace-product-architecture.md)
+
+### Phase 2: Order Import (Weeks 3-4)
+
+**Goal:** Unified order management with multi-channel support.
+
+| Task | Priority | Status |
+|------|----------|--------|
+| Design unified orders schema | High | [ ] |
+| Create orders, order_items, order_status_history tables | High | [ ] |
+| Implement BaseOrderImporter pattern | High | [ ] |
+| Implement LetzshopOrderImporter (GraphQL) | High | [ ] |
+| Create order polling scheduler | High | [ ] |
+| Build order list/detail vendor UI | Medium | [ ] |
+| Add order notifications | Medium | [ ] |
+| Implement order search and filtering | Medium | [ ] |
+
+### Phase 3: Order Fulfillment Sync (Weeks 5-6)
+
+**Goal:** Sync fulfillment status back to marketplaces, handle digital delivery.
+
+| Task | Priority | Status |
+|------|----------|--------|
+| Implement FulfillmentService | High | [ ] |
+| Implement DigitalFulfillmentService | High | [ ] |
+| Integrate CodesWholesale key purchase API | High | [ ] |
+| Create fulfillment sync queue | High | [ ] |
+| Implement LetzshopFulfillmentSync | High | [ ] |
+| Build fulfillment UI (mark shipped, add tracking) | Medium | [ ] |
+| Digital fulfillment email templates | Medium | [ ] |
+| Fulfillment retry logic | Medium | [ ] |
+
+### Phase 4: Inventory Sync (Weeks 7-8)
+
+**Goal:** Real-time and scheduled inventory sync to marketplaces.
+
+| Task | Priority | Status |
+|------|----------|--------|
+| Create inventory_sync_configs table | High | [ ] |
+| Create inventory_sync_logs table | High | [ ] |
+| Implement InventorySyncService | High | [ ] |
+| Implement LetzshopInventorySyncAdapter | High | [ ] |
+| Create inventory change event system | High | [ ] |
+| Build sync configuration UI | Medium | [ ] |
+| Add sync status dashboard | Medium | [ ] |
+| Implement real-time sync for future marketplaces | Low | [ ] |
+
+---
+
+## Security Considerations
+
+### Credential Storage
+
+```python
+# All marketplace credentials should be encrypted at rest
+class MarketplaceCredential(Base, TimestampMixin):
+ """Encrypted marketplace credentials."""
+ __tablename__ = "marketplace_credentials"
+
+ id = Column(Integer, primary_key=True)
+ vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False)
+ marketplace = Column(String, nullable=False)
+
+ # Encrypted using application-level encryption
+ credentials_encrypted = Column(LargeBinary, nullable=False)
+
+ # Metadata (not sensitive)
+ credential_type = Column(String) # 'api_key', 'oauth', 'basic'
+ expires_at = Column(DateTime)
+ is_valid = Column(Boolean, default=True)
+ last_validated_at = Column(DateTime)
+```
+
+### API Rate Limiting
+
+- Respect marketplace API rate limits
+- Implement exponential backoff for failures
+- Queue operations to smooth out request bursts
+
+### Data Privacy
+
+- Customer data from marketplaces should follow GDPR guidelines
+- Implement data retention policies
+- Provide data export/deletion capabilities
+
+---
+
+## Monitoring & Observability
+
+### Key Metrics
+
+| Metric | Description | Alert Threshold |
+|--------|-------------|-----------------|
+| `order_import_lag_seconds` | Time since last successful order poll | > 15 min |
+| `inventory_sync_lag_seconds` | Time since last inventory sync | > 30 min |
+| `fulfillment_sync_queue_depth` | Pending fulfillment syncs | > 100 |
+| `sync_error_rate` | Failed syncs / total syncs | > 5% |
+| `digital_fulfillment_success_rate` | Successful key retrievals | < 95% |
+
+### Health Checks
+
+```python
+@router.get("/health/marketplace-integrations")
+async def check_marketplace_health(vendor_id: int):
+ """Check health of marketplace integrations."""
+ return {
+ "letzshop": {
+ "order_import": check_last_sync("letzshop", "order_import"),
+ "inventory_sync": check_last_sync("letzshop", "inventory_sync"),
+ },
+ "codeswholesale": {
+ "catalog_sync": check_last_sync("codeswholesale", "catalog"),
+ "api_status": await check_codeswholesale_api(),
+ }
+ }
+```
+
+---
+
+## Appendix A: CodesWholesale Integration Details
+
+### API Endpoints Used
+
+| Endpoint | Purpose | Frequency |
+|----------|---------|-----------|
+| `GET /products` | Fetch full catalog | Every 6 hours |
+| `GET /products/{id}` | Get single product details | On-demand |
+| `POST /orders` | Purchase license key | On order fulfillment |
+| `GET /orders/{id}` | Check order status | After purchase |
+| `GET /account/balance` | Check account balance | Periodically |
+
+### Product Catalog Mapping
+
+```python
+def map_codeswholesale_product(cw_product: dict) -> dict:
+ """Map CodesWholesale product to marketplace_product format."""
+ return {
+ "marketplace_product_id": f"cw_{cw_product['productId']}",
+ "marketplace": "codeswholesale",
+ "gtin": cw_product.get("ean"),
+ "product_type": "digital",
+ "is_digital": True,
+ "digital_delivery_method": "license_key",
+ "platform": cw_product.get("platform", "").lower(), # steam, origin, etc.
+ "region_restrictions": cw_product.get("regions"),
+ "price": cw_product["prices"][0]["value"], # Supplier cost
+ "currency": cw_product["prices"][0]["currency"],
+ "availability": "in_stock" if cw_product["quantity"] > 0 else "out_of_stock",
+ "attributes": {
+ "languages": cw_product.get("languages"),
+ "release_date": cw_product.get("releaseDate"),
+ }
+ }
+```
+
+---
+
+## Appendix B: Status Mapping Reference
+
+### Order Status Mapping
+
+| Wizamart Status | Letzshop | Amazon | eBay |
+|-----------------|----------|--------|------|
+| PENDING | PENDING | Pending | - |
+| CONFIRMED | PAID | Unshipped | Paid |
+| PROCESSING | PROCESSING | - | - |
+| SHIPPED | SHIPPED | Shipped | Shipped |
+| DELIVERED | DELIVERED | - | Delivered |
+| FULFILLED | - | - | - |
+| CANCELLED | CANCELLED | Cancelled | Cancelled |
+| REFUNDED | REFUNDED | Refunded | Refunded |
+
+---
+
+## Related Documents
+
+- [Multi-Marketplace Product Architecture](../development/migration/multi-marketplace-product-architecture.md) - Detailed product data model
+- [Vendor Contact Inheritance](../development/migration/vendor-contact-inheritance.md) - Override pattern reference
+- [Database Migrations](../development/migration/database-migrations.md) - Migration guidelines
diff --git a/docs/development/migration/multi-marketplace-product-architecture.md b/docs/development/migration/multi-marketplace-product-architecture.md
new file mode 100644
index 00000000..7adf6573
--- /dev/null
+++ b/docs/development/migration/multi-marketplace-product-architecture.md
@@ -0,0 +1,1577 @@
+# Multi-Marketplace Product Architecture
+
+## Overview
+
+This document outlines the implementation plan for evolving the product management system to support:
+
+1. **Multiple Marketplaces**: Letzshop, Amazon, eBay, and future sources
+2. **Multi-Language Support**: Localized titles, descriptions with language fallback
+3. **Vendor Override Pattern**: Override any field with reset-to-source capability
+4. **Digital Products**: Support for digital goods (games, gift cards, downloadable content)
+5. **Universal Product Model**: Marketplace-agnostic canonical product representation
+
+## Core Principles (Preserved)
+
+| Principle | Description |
+|-----------|-------------|
+| **Separation of Concerns** | Raw marketplace data in source tables; vendor customizations in `products` |
+| **Multi-Vendor Support** | Same marketplace product can appear in multiple vendor catalogs |
+| **Idempotent Imports** | Re-importing CSV updates existing records, never duplicates |
+| **Asynchronous Processing** | Large imports run in background tasks |
+
+---
+
+## Architecture Overview
+
+```mermaid
+graph TB
+ subgraph "Source Layer (Raw Data)"
+ LS[Letzshop CSV]
+ AZ[Amazon API]
+ EB[eBay API]
+ end
+
+ subgraph "Import Layer"
+ LSI[LetzshopImporter]
+ AZI[AmazonImporter]
+ EBI[EbayImporter]
+ end
+
+ subgraph "Canonical Layer (Normalized)"
+ MP[marketplace_products]
+ MPT[marketplace_product_translations]
+ end
+
+ subgraph "Vendor Layer (Overrides)"
+ P[products]
+ PT[product_translations]
+ end
+
+ LS --> LSI
+ AZ --> AZI
+ EB --> EBI
+
+ LSI --> MP
+ AZI --> MP
+ EBI --> MP
+
+ MP --> MPT
+ MP --> P
+ P --> PT
+```
+
+---
+
+## Database Schema Design
+
+### Phase 1: Enhanced Marketplace Products
+
+#### 1.1 Updated `marketplace_products` Table
+
+Evolve the existing table to be marketplace-agnostic:
+
+```python
+# models/database/marketplace_product.py
+
+class ProductType(str, Enum):
+ """Product type enumeration."""
+ PHYSICAL = "physical"
+ DIGITAL = "digital"
+ SERVICE = "service"
+ SUBSCRIPTION = "subscription"
+
+class DigitalDeliveryMethod(str, Enum):
+ """Digital product delivery methods."""
+ DOWNLOAD = "download"
+ EMAIL = "email"
+ IN_APP = "in_app"
+ STREAMING = "streaming"
+ LICENSE_KEY = "license_key"
+
+class MarketplaceProduct(Base, TimestampMixin):
+ __tablename__ = "marketplace_products"
+
+ id = Column(Integer, primary_key=True, index=True)
+
+ # === UNIVERSAL IDENTIFIERS ===
+ marketplace_product_id = Column(String, unique=True, index=True, nullable=False)
+ gtin = Column(String, index=True) # EAN/UPC - primary cross-marketplace matching
+ mpn = Column(String, index=True) # Manufacturer Part Number
+ sku = Column(String, index=True) # Internal SKU if assigned
+
+ # === SOURCE TRACKING ===
+ marketplace = Column(String, index=True, nullable=False) # 'letzshop', 'amazon', 'ebay'
+ source_url = Column(String) # Original product URL
+ vendor_name = Column(String, index=True) # Seller/vendor in marketplace
+
+ # === PRODUCT TYPE (NEW) ===
+ product_type = Column(
+ SQLEnum(ProductType, name="product_type_enum"),
+ default=ProductType.PHYSICAL,
+ nullable=False,
+ index=True
+ )
+
+ # === DIGITAL PRODUCT FIELDS (NEW) ===
+ is_digital = Column(Boolean, default=False, index=True)
+ digital_delivery_method = Column(
+ SQLEnum(DigitalDeliveryMethod, name="digital_delivery_enum"),
+ nullable=True
+ )
+ download_url = Column(String) # For downloadable products
+ license_type = Column(String) # e.g., "single_use", "subscription", "lifetime"
+ platform = Column(String) # e.g., "steam", "playstation", "xbox", "universal"
+ region_restrictions = Column(JSON) # ["EU", "US"] or null for global
+
+ # === NON-LOCALIZED FIELDS ===
+ brand = Column(String, index=True)
+ google_product_category = Column(String, index=True)
+ category_path = Column(String) # Normalized category hierarchy
+ condition = Column(String)
+
+ # === PRICING ===
+ price = Column(Float) # Normalized numeric price
+ price_raw = Column(String) # Original "19.99 EUR" format
+ sale_price = Column(Float)
+ sale_price_raw = Column(String)
+ currency = Column(String(3), default='EUR')
+
+ # === MEDIA ===
+ image_link = Column(String)
+ additional_image_link = Column(String) # Kept for backward compat
+ additional_images = Column(JSON) # Array of image URLs (new)
+
+ # === PRODUCT ATTRIBUTES (Flexible) ===
+ attributes = Column(JSON) # {color, size, material, etc.}
+
+ # === PHYSICAL PRODUCT FIELDS ===
+ weight = Column(Float) # In kg
+ weight_unit = Column(String, default='kg')
+ dimensions = Column(JSON) # {length, width, height, unit}
+
+ # === GOOGLE SHOPPING FIELDS (Preserved) ===
+ # These remain for Letzshop compatibility
+ link = Column(String)
+ availability = Column(String, index=True)
+ adult = Column(String)
+ multipack = Column(Integer)
+ is_bundle = Column(String)
+ age_group = Column(String)
+ color = Column(String)
+ gender = Column(String)
+ material = Column(String)
+ pattern = Column(String)
+ size = Column(String)
+ size_type = Column(String)
+ size_system = Column(String)
+ item_group_id = Column(String)
+ product_type_raw = Column(String) # Renamed from product_type
+ custom_label_0 = Column(String)
+ custom_label_1 = Column(String)
+ custom_label_2 = Column(String)
+ custom_label_3 = Column(String)
+ custom_label_4 = Column(String)
+ unit_pricing_measure = Column(String)
+ unit_pricing_base_measure = Column(String)
+ identifier_exists = Column(String)
+ shipping = Column(String)
+
+ # === STATUS ===
+ is_active = Column(Boolean, default=True, index=True)
+
+ # === RELATIONSHIPS ===
+ translations = relationship(
+ "MarketplaceProductTranslation",
+ back_populates="marketplace_product",
+ cascade="all, delete-orphan"
+ )
+ vendor_products = relationship("Product", back_populates="marketplace_product")
+
+ # === INDEXES ===
+ __table_args__ = (
+ Index("idx_mp_marketplace_vendor", "marketplace", "vendor_name"),
+ Index("idx_mp_marketplace_brand", "marketplace", "brand"),
+ Index("idx_mp_gtin_marketplace", "gtin", "marketplace"),
+ Index("idx_mp_product_type", "product_type", "is_digital"),
+ )
+```
+
+#### 1.2 New `marketplace_product_translations` Table
+
+```python
+# models/database/marketplace_product_translation.py
+
+class MarketplaceProductTranslation(Base, TimestampMixin):
+ """Localized content for marketplace products."""
+ __tablename__ = "marketplace_product_translations"
+
+ id = Column(Integer, primary_key=True, index=True)
+ marketplace_product_id = Column(
+ Integer,
+ ForeignKey("marketplace_products.id", ondelete="CASCADE"),
+ nullable=False
+ )
+ language = Column(String(5), nullable=False) # 'en', 'fr', 'de', 'lb'
+
+ # === LOCALIZED CONTENT ===
+ title = Column(String, nullable=False)
+ description = Column(Text)
+ short_description = Column(String(500))
+
+ # === SEO FIELDS ===
+ meta_title = Column(String(70))
+ meta_description = Column(String(160))
+ url_slug = Column(String(255))
+
+ # === SOURCE TRACKING ===
+ source_import_id = Column(Integer) # Which import job provided this
+ source_file = Column(String) # e.g., "letzshop_fr.csv"
+
+ # === RELATIONSHIPS ===
+ marketplace_product = relationship(
+ "MarketplaceProduct",
+ back_populates="translations"
+ )
+
+ __table_args__ = (
+ UniqueConstraint(
+ "marketplace_product_id", "language",
+ name="uq_marketplace_product_translation"
+ ),
+ Index("idx_mpt_language", "language"),
+ )
+```
+
+### Phase 2: Enhanced Vendor Products with Override Pattern
+
+#### 2.1 Updated `products` Table
+
+```python
+# models/database/product.py
+
+class Product(Base, TimestampMixin):
+ """Vendor-specific product with override capability."""
+ __tablename__ = "products"
+
+ id = Column(Integer, primary_key=True, index=True)
+ vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False)
+ marketplace_product_id = Column(
+ Integer,
+ ForeignKey("marketplace_products.id"),
+ nullable=False
+ )
+
+ # === VENDOR REFERENCE ===
+ vendor_sku = Column(String, index=True) # Vendor's internal SKU
+
+ # === OVERRIDABLE FIELDS (NULL = inherit from marketplace_product) ===
+ # Pricing
+ price = Column(Float)
+ sale_price = Column(Float)
+ currency = Column(String(3))
+
+ # Product Info
+ brand = Column(String)
+ condition = Column(String)
+ availability = Column(String)
+
+ # Media
+ primary_image_url = Column(String)
+ additional_images = Column(JSON)
+
+ # Digital Product Overrides
+ download_url = Column(String)
+ license_type = Column(String)
+
+ # === VENDOR-SPECIFIC (No inheritance) ===
+ is_featured = Column(Boolean, default=False)
+ is_active = Column(Boolean, default=True)
+ display_order = Column(Integer, default=0)
+
+ # Inventory Settings
+ min_quantity = Column(Integer, default=1)
+ max_quantity = Column(Integer)
+
+ # Digital Fulfillment
+ fulfillment_email_template = Column(String) # For digital delivery
+
+ # === RELATIONSHIPS ===
+ vendor = relationship("Vendor", back_populates="products")
+ marketplace_product = relationship(
+ "MarketplaceProduct",
+ back_populates="vendor_products"
+ )
+ translations = relationship(
+ "ProductTranslation",
+ back_populates="product",
+ cascade="all, delete-orphan"
+ )
+ inventory_entries = relationship(
+ "Inventory",
+ back_populates="product",
+ cascade="all, delete-orphan"
+ )
+
+ __table_args__ = (
+ UniqueConstraint(
+ "vendor_id", "marketplace_product_id",
+ name="uq_vendor_marketplace_product"
+ ),
+ Index("idx_product_vendor_active", "vendor_id", "is_active"),
+ Index("idx_product_vendor_featured", "vendor_id", "is_featured"),
+ Index("idx_product_vendor_sku", "vendor_id", "vendor_sku"),
+ )
+
+ # === EFFECTIVE PROPERTIES (Override Pattern) ===
+
+ OVERRIDABLE_FIELDS = [
+ "price", "sale_price", "currency", "brand", "condition",
+ "availability", "primary_image_url", "additional_images",
+ "download_url", "license_type"
+ ]
+
+ @property
+ def effective_price(self) -> float | None:
+ """Get price (vendor override or marketplace fallback)."""
+ if self.price is not None:
+ return self.price
+ return self.marketplace_product.price if self.marketplace_product else None
+
+ @property
+ def effective_sale_price(self) -> float | None:
+ if self.sale_price is not None:
+ return self.sale_price
+ return self.marketplace_product.sale_price if self.marketplace_product else None
+
+ @property
+ def effective_currency(self) -> str:
+ if self.currency is not None:
+ return self.currency
+ return self.marketplace_product.currency if self.marketplace_product else "EUR"
+
+ @property
+ def effective_brand(self) -> str | None:
+ if self.brand is not None:
+ return self.brand
+ return self.marketplace_product.brand if self.marketplace_product else None
+
+ @property
+ def effective_condition(self) -> str | None:
+ if self.condition is not None:
+ return self.condition
+ return self.marketplace_product.condition if self.marketplace_product else None
+
+ @property
+ def effective_availability(self) -> str | None:
+ if self.availability is not None:
+ return self.availability
+ return self.marketplace_product.availability if self.marketplace_product else None
+
+ @property
+ def effective_primary_image_url(self) -> str | None:
+ if self.primary_image_url is not None:
+ return self.primary_image_url
+ return self.marketplace_product.image_link if self.marketplace_product else None
+
+ @property
+ def effective_additional_images(self) -> list | None:
+ if self.additional_images is not None:
+ return self.additional_images
+ mp = self.marketplace_product
+ if mp and mp.additional_images:
+ return mp.additional_images
+ return None
+
+ @property
+ def is_digital(self) -> bool:
+ """Check if this is a digital product."""
+ return self.marketplace_product.is_digital if self.marketplace_product else False
+
+ @property
+ def product_type(self) -> str:
+ """Get product type from marketplace product."""
+ return self.marketplace_product.product_type if self.marketplace_product else "physical"
+
+ def get_override_info(self) -> dict:
+ """
+ Get all fields with inheritance flags.
+ Similar to Vendor.get_contact_info_with_inheritance()
+ """
+ mp = self.marketplace_product
+ return {
+ # Price
+ "price": self.effective_price,
+ "price_overridden": self.price is not None,
+ "price_source": mp.price if mp else None,
+
+ # Sale Price
+ "sale_price": self.effective_sale_price,
+ "sale_price_overridden": self.sale_price is not None,
+ "sale_price_source": mp.sale_price if mp else None,
+
+ # Currency
+ "currency": self.effective_currency,
+ "currency_overridden": self.currency is not None,
+ "currency_source": mp.currency if mp else None,
+
+ # Brand
+ "brand": self.effective_brand,
+ "brand_overridden": self.brand is not None,
+ "brand_source": mp.brand if mp else None,
+
+ # Condition
+ "condition": self.effective_condition,
+ "condition_overridden": self.condition is not None,
+ "condition_source": mp.condition if mp else None,
+
+ # Availability
+ "availability": self.effective_availability,
+ "availability_overridden": self.availability is not None,
+ "availability_source": mp.availability if mp else None,
+
+ # Images
+ "primary_image_url": self.effective_primary_image_url,
+ "primary_image_url_overridden": self.primary_image_url is not None,
+ "primary_image_url_source": mp.image_link if mp else None,
+
+ # Product type info
+ "is_digital": self.is_digital,
+ "product_type": self.product_type,
+ }
+
+ def reset_field_to_source(self, field_name: str) -> bool:
+ """Reset a single field to inherit from marketplace product."""
+ if field_name in self.OVERRIDABLE_FIELDS:
+ setattr(self, field_name, None)
+ return True
+ return False
+
+ def reset_all_to_source(self) -> None:
+ """Reset all overridable fields to inherit from marketplace product."""
+ for field in self.OVERRIDABLE_FIELDS:
+ setattr(self, field, None)
+```
+
+#### 2.2 New `product_translations` Table
+
+```python
+# models/database/product_translation.py
+
+class ProductTranslation(Base, TimestampMixin):
+ """Vendor-specific localized content with override capability."""
+ __tablename__ = "product_translations"
+
+ id = Column(Integer, primary_key=True, index=True)
+ product_id = Column(
+ Integer,
+ ForeignKey("products.id", ondelete="CASCADE"),
+ nullable=False
+ )
+ language = Column(String(5), nullable=False)
+
+ # === OVERRIDABLE LOCALIZED FIELDS (NULL = inherit) ===
+ title = Column(String)
+ description = Column(Text)
+ short_description = Column(String(500))
+
+ # SEO Overrides
+ meta_title = Column(String(70))
+ meta_description = Column(String(160))
+ url_slug = Column(String(255))
+
+ # === RELATIONSHIPS ===
+ product = relationship("Product", back_populates="translations")
+
+ __table_args__ = (
+ UniqueConstraint("product_id", "language", name="uq_product_translation"),
+ Index("idx_pt_product_language", "product_id", "language"),
+ )
+
+ OVERRIDABLE_FIELDS = [
+ "title", "description", "short_description",
+ "meta_title", "meta_description", "url_slug"
+ ]
+
+ def get_effective_title(self) -> str | None:
+ """Get title with fallback to marketplace translation."""
+ if self.title is not None:
+ return self.title
+ return self._get_marketplace_translation_field("title")
+
+ def get_effective_description(self) -> str | None:
+ if self.description is not None:
+ return self.description
+ return self._get_marketplace_translation_field("description")
+
+ def _get_marketplace_translation_field(self, field: str) -> str | None:
+ """Helper to get field from marketplace translation."""
+ mp = self.product.marketplace_product
+ if mp:
+ for t in mp.translations:
+ if t.language == self.language:
+ return getattr(t, field, None)
+ return None
+
+ def get_override_info(self) -> dict:
+ """Get all fields with inheritance flags."""
+ return {
+ "title": self.get_effective_title(),
+ "title_overridden": self.title is not None,
+ "description": self.get_effective_description(),
+ "description_overridden": self.description is not None,
+ # ... similar for other fields
+ }
+
+ def reset_to_source(self) -> None:
+ """Reset all fields to inherit from marketplace translation."""
+ for field in self.OVERRIDABLE_FIELDS:
+ setattr(self, field, None)
+```
+
+---
+
+## Pydantic Schemas
+
+### 3.1 Product Update Schema with Reset
+
+```python
+# models/schema/product.py
+
+class ProductUpdate(BaseModel):
+ """Update product with override and reset capabilities."""
+
+ # === OVERRIDABLE FIELDS ===
+ price: float | None = None
+ sale_price: float | None = None
+ currency: str | None = None
+ brand: str | None = None
+ condition: str | None = None
+ availability: str | None = None
+ primary_image_url: str | None = None
+ additional_images: list[str] | None = None
+ download_url: str | None = None
+ license_type: str | None = None
+
+ # === VENDOR-SPECIFIC FIELDS ===
+ vendor_sku: str | None = None
+ is_featured: bool | None = None
+ is_active: bool | None = None
+ display_order: int | None = None
+ min_quantity: int | None = None
+ max_quantity: int | None = None
+ fulfillment_email_template: str | None = None
+
+ # === RESET CONTROLS ===
+ reset_all_to_source: bool | None = Field(
+ None,
+ description="Reset ALL overridable fields to marketplace source values"
+ )
+ reset_fields: list[str] | None = Field(
+ None,
+ description="List of specific fields to reset: ['price', 'brand']"
+ )
+
+ model_config = ConfigDict(extra="forbid")
+
+ @field_validator("reset_fields")
+ @classmethod
+ def validate_reset_fields(cls, v):
+ if v:
+ valid_fields = Product.OVERRIDABLE_FIELDS
+ invalid = [f for f in v if f not in valid_fields]
+ if invalid:
+ raise ValueError(f"Invalid reset fields: {invalid}. Valid: {valid_fields}")
+ return v
+
+
+class ProductTranslationUpdate(BaseModel):
+ """Update product translation with reset capability."""
+
+ title: str | None = None
+ description: str | None = None
+ short_description: str | None = None
+ meta_title: str | None = None
+ meta_description: str | None = None
+ url_slug: str | None = None
+
+ reset_to_source: bool | None = Field(
+ None,
+ description="Reset this translation to marketplace source"
+ )
+ reset_fields: list[str] | None = Field(
+ None,
+ description="List of specific translation fields to reset"
+ )
+
+
+class ProductDetailResponse(BaseModel):
+ """Detailed product response with override information."""
+
+ id: int
+ vendor_id: int
+ marketplace_product_id: int
+ vendor_sku: str | None
+
+ # === EFFECTIVE VALUES WITH INHERITANCE FLAGS ===
+ price: float | None
+ price_overridden: bool
+ price_source: float | None
+
+ sale_price: float | None
+ sale_price_overridden: bool
+ sale_price_source: float | None
+
+ currency: str
+ currency_overridden: bool
+ currency_source: str | None
+
+ brand: str | None
+ brand_overridden: bool
+ brand_source: str | None
+
+ condition: str | None
+ condition_overridden: bool
+ condition_source: str | None
+
+ availability: str | None
+ availability_overridden: bool
+ availability_source: str | None
+
+ primary_image_url: str | None
+ primary_image_url_overridden: bool
+ primary_image_url_source: str | None
+
+ # === PRODUCT TYPE INFO ===
+ is_digital: bool
+ product_type: str
+
+ # === VENDOR-SPECIFIC ===
+ is_featured: bool
+ is_active: bool
+ display_order: int
+ min_quantity: int
+ max_quantity: int | None
+
+ # === TRANSLATIONS ===
+ translations: list["ProductTranslationResponse"]
+
+ # === TIMESTAMPS ===
+ created_at: datetime
+ updated_at: datetime
+
+
+class ProductTranslationResponse(BaseModel):
+ """Translation response with override information."""
+
+ language: str
+
+ title: str | None
+ title_overridden: bool
+
+ description: str | None
+ description_overridden: bool
+
+ short_description: str | None
+ short_description_overridden: bool
+
+ meta_title: str | None
+ meta_title_overridden: bool
+
+ meta_description: str | None
+ meta_description_overridden: bool
+
+ url_slug: str | None
+ url_slug_overridden: bool
+```
+
+---
+
+## Service Layer Implementation
+
+### 4.1 Product Service with Reset Logic
+
+```python
+# app/services/product_service.py
+
+class ProductService:
+ """Service for managing vendor products with override pattern."""
+
+ def update_product(
+ self,
+ db: Session,
+ vendor_id: int,
+ product_id: int,
+ update_data: ProductUpdate
+ ) -> Product:
+ """
+ Update product with override and reset support.
+
+ Reset behavior:
+ - reset_all_to_source=True: Resets ALL overridable fields
+ - reset_fields=['price', 'brand']: Resets specific fields
+ - Setting field to empty string: Resets that field
+ """
+ product = db.query(Product).filter(
+ Product.id == product_id,
+ Product.vendor_id == vendor_id
+ ).first()
+
+ if not product:
+ raise NotFoundError("Product not found")
+
+ data = update_data.model_dump(exclude_unset=True)
+
+ # Handle reset_all_to_source
+ if data.pop("reset_all_to_source", False):
+ product.reset_all_to_source()
+
+ # Handle reset_fields (selective reset)
+ reset_fields = data.pop("reset_fields", None)
+ if reset_fields:
+ for field in reset_fields:
+ product.reset_field_to_source(field)
+
+ # Handle empty strings = reset (like vendor pattern)
+ for field in Product.OVERRIDABLE_FIELDS:
+ if field in data and data[field] == "":
+ data[field] = None
+
+ # Apply remaining updates
+ for key, value in data.items():
+ if hasattr(product, key):
+ setattr(product, key, value)
+
+ db.flush()
+ return product
+
+ def update_product_translation(
+ self,
+ db: Session,
+ product_id: int,
+ language: str,
+ update_data: ProductTranslationUpdate
+ ) -> ProductTranslation:
+ """Update product translation with reset support."""
+ translation = db.query(ProductTranslation).filter(
+ ProductTranslation.product_id == product_id,
+ ProductTranslation.language == language
+ ).first()
+
+ if not translation:
+ # Create new translation if doesn't exist
+ translation = ProductTranslation(
+ product_id=product_id,
+ language=language
+ )
+ db.add(translation)
+
+ data = update_data.model_dump(exclude_unset=True)
+
+ # Handle reset_to_source
+ if data.pop("reset_to_source", False):
+ translation.reset_to_source()
+
+ # Handle reset_fields
+ reset_fields = data.pop("reset_fields", None)
+ if reset_fields:
+ for field in reset_fields:
+ if field in ProductTranslation.OVERRIDABLE_FIELDS:
+ setattr(translation, field, None)
+
+ # Handle empty strings
+ for field in ProductTranslation.OVERRIDABLE_FIELDS:
+ if field in data and data[field] == "":
+ data[field] = None
+
+ # Apply updates
+ for key, value in data.items():
+ if hasattr(translation, key):
+ setattr(translation, key, value)
+
+ db.flush()
+ return translation
+
+ def build_detail_response(self, product: Product) -> ProductDetailResponse:
+ """Build detailed response with override information."""
+ override_info = product.get_override_info()
+
+ translations = []
+ for t in product.translations:
+ trans_info = t.get_override_info()
+ translations.append(ProductTranslationResponse(
+ language=t.language,
+ **trans_info
+ ))
+
+ return ProductDetailResponse(
+ id=product.id,
+ vendor_id=product.vendor_id,
+ marketplace_product_id=product.marketplace_product_id,
+ vendor_sku=product.vendor_sku,
+ **override_info,
+ is_featured=product.is_featured,
+ is_active=product.is_active,
+ display_order=product.display_order,
+ min_quantity=product.min_quantity,
+ max_quantity=product.max_quantity,
+ translations=translations,
+ created_at=product.created_at,
+ updated_at=product.updated_at,
+ )
+```
+
+---
+
+## Import Architecture
+
+### 5.1 Base Importer Pattern
+
+```python
+# app/utils/marketplace_importers/base.py
+
+from abc import ABC, abstractmethod
+from typing import Generator
+
+class BaseMarketplaceImporter(ABC):
+ """Abstract base class for marketplace importers."""
+
+ marketplace_name: str = ""
+
+ @abstractmethod
+ def parse_row(self, row: dict) -> dict:
+ """
+ Parse marketplace-specific row into intermediate format.
+ Override in subclasses for each marketplace.
+ """
+ pass
+
+ @abstractmethod
+ def get_product_identifier(self, row: dict) -> str:
+ """Get unique product identifier from marketplace."""
+ pass
+
+ @abstractmethod
+ def get_language(self) -> str:
+ """Get language code for this import."""
+ pass
+
+ def normalize_gtin(self, gtin: str | None) -> str | None:
+ """Normalize GTIN to standard format."""
+ if not gtin:
+ return None
+ gtin = str(gtin).strip().replace(" ", "")
+ if gtin.endswith(".0"):
+ gtin = gtin[:-2]
+ if len(gtin) in (8, 12, 13, 14) and gtin.isdigit():
+ return gtin.zfill(14) # Pad to GTIN-14
+ return gtin
+
+ def parse_price(self, price_str: str | None) -> tuple[float | None, str]:
+ """Parse price string into amount and currency."""
+ if not price_str:
+ return None, "EUR"
+
+ # Handle formats like "19.99 EUR", "EUR 19.99", "19,99€"
+ import re
+ price_str = str(price_str).strip()
+
+ # Extract currency
+ currency = "EUR"
+ currency_patterns = [
+ (r"EUR", "EUR"),
+ (r"USD", "USD"),
+ (r"€", "EUR"),
+ (r"\$", "USD"),
+ (r"£", "GBP"),
+ ]
+ for pattern, curr in currency_patterns:
+ if re.search(pattern, price_str):
+ currency = curr
+ break
+
+ # Extract numeric value
+ numbers = re.findall(r"[\d.,]+", price_str)
+ if numbers:
+ num_str = numbers[0].replace(",", ".")
+ try:
+ return float(num_str), currency
+ except ValueError:
+ pass
+
+ return None, currency
+
+ def to_canonical(self, row: dict) -> dict:
+ """
+ Convert parsed row to canonical MarketplaceProduct format.
+ """
+ parsed = self.parse_row(row)
+ price, currency = self.parse_price(parsed.get("price"))
+ sale_price, _ = self.parse_price(parsed.get("sale_price"))
+
+ return {
+ "marketplace_product_id": self.get_product_identifier(row),
+ "marketplace": self.marketplace_name,
+ "gtin": self.normalize_gtin(parsed.get("gtin")),
+ "mpn": parsed.get("mpn"),
+ "brand": parsed.get("brand"),
+ "price": price,
+ "price_raw": parsed.get("price"),
+ "sale_price": sale_price,
+ "sale_price_raw": parsed.get("sale_price"),
+ "currency": currency,
+ "image_link": parsed.get("image_url"),
+ "condition": parsed.get("condition"),
+ "availability": parsed.get("availability"),
+ "google_product_category": parsed.get("category"),
+ "product_type": self.determine_product_type(parsed),
+ "is_digital": self.is_digital_product(parsed),
+ "attributes": parsed.get("attributes", {}),
+ # Raw fields preserved
+ **{k: v for k, v in parsed.items() if k.startswith("raw_")}
+ }
+
+ def to_translation(self, row: dict) -> dict:
+ """Extract translation data from row."""
+ parsed = self.parse_row(row)
+ return {
+ "language": self.get_language(),
+ "title": parsed.get("title"),
+ "description": parsed.get("description"),
+ "short_description": parsed.get("short_description"),
+ }
+
+ def determine_product_type(self, parsed: dict) -> str:
+ """Determine if product is physical, digital, etc."""
+ # Override in subclass for marketplace-specific logic
+ return "physical"
+
+ def is_digital_product(self, parsed: dict) -> bool:
+ """Check if product is digital."""
+ # Override in subclass
+ return False
+```
+
+### 5.2 Letzshop Importer
+
+```python
+# app/utils/marketplace_importers/letzshop.py
+
+class LetzshopImporter(BaseMarketplaceImporter):
+ """Importer for Letzshop Google Shopping CSV feeds."""
+
+ marketplace_name = "letzshop"
+
+ # Column mapping from Letzshop CSV to internal format
+ COLUMN_MAP = {
+ "g:id": "id",
+ "g:title": "title",
+ "g:description": "description",
+ "g:link": "link",
+ "g:image_link": "image_url",
+ "g:availability": "availability",
+ "g:price": "price",
+ "g:sale_price": "sale_price",
+ "g:brand": "brand",
+ "g:gtin": "gtin",
+ "g:mpn": "mpn",
+ "g:condition": "condition",
+ "g:google_product_category": "category",
+ "g:product_type": "product_type_raw",
+ "g:color": "color",
+ "g:size": "size",
+ "g:material": "material",
+ "g:gender": "gender",
+ "g:age_group": "age_group",
+ "g:item_group_id": "item_group_id",
+ "g:additional_image_link": "additional_images",
+ "g:custom_label_0": "custom_label_0",
+ "g:custom_label_1": "custom_label_1",
+ "g:custom_label_2": "custom_label_2",
+ "g:custom_label_3": "custom_label_3",
+ "g:custom_label_4": "custom_label_4",
+ }
+
+ def __init__(self, language: str = "en"):
+ self.language = language
+
+ def get_language(self) -> str:
+ return self.language
+
+ def get_product_identifier(self, row: dict) -> str:
+ return row.get("g:id") or row.get("id")
+
+ def parse_row(self, row: dict) -> dict:
+ """Parse Letzshop CSV row."""
+ result = {}
+
+ for csv_col, internal_key in self.COLUMN_MAP.items():
+ value = row.get(csv_col) or row.get(csv_col.replace("g:", ""))
+ if value:
+ result[internal_key] = value
+
+ # Build attributes dict
+ result["attributes"] = {
+ k: result.pop(k) for k in ["color", "size", "material", "gender", "age_group"]
+ if k in result
+ }
+
+ return result
+
+ def determine_product_type(self, parsed: dict) -> str:
+ """Detect digital products from Letzshop data."""
+ category = (parsed.get("category") or "").lower()
+ product_type = (parsed.get("product_type_raw") or "").lower()
+ title = (parsed.get("title") or "").lower()
+
+ digital_keywords = [
+ "digital", "download", "ebook", "e-book", "software",
+ "license", "subscription", "gift card", "voucher",
+ "game key", "steam", "playstation", "xbox", "nintendo"
+ ]
+
+ combined = f"{category} {product_type} {title}"
+ if any(kw in combined for kw in digital_keywords):
+ return "digital"
+
+ return "physical"
+
+ def is_digital_product(self, parsed: dict) -> bool:
+ return self.determine_product_type(parsed) == "digital"
+```
+
+### 5.3 Amazon Importer (Future)
+
+```python
+# app/utils/marketplace_importers/amazon.py
+
+class AmazonImporter(BaseMarketplaceImporter):
+ """Importer for Amazon product feeds."""
+
+ marketplace_name = "amazon"
+
+ def __init__(self, language: str = "en"):
+ self.language = language
+
+ def get_language(self) -> str:
+ return self.language
+
+ def get_product_identifier(self, row: dict) -> str:
+ return f"amazon_{row.get('asin')}"
+
+ def parse_row(self, row: dict) -> dict:
+ """Parse Amazon product data."""
+ # Amazon has different field names
+ bullet_points = row.get("bullet_points", [])
+ description = row.get("product_description", "")
+
+ # Combine bullet points into description if needed
+ if bullet_points and not description:
+ description = "\n".join(f"• {bp}" for bp in bullet_points)
+
+ return {
+ "id": row.get("asin"),
+ "title": row.get("item_name"),
+ "description": description,
+ "short_description": bullet_points[0] if bullet_points else None,
+ "price": row.get("list_price"),
+ "sale_price": row.get("deal_price"),
+ "brand": row.get("brand_name"),
+ "gtin": row.get("product_id"), # Amazon uses different field
+ "image_url": row.get("main_image_url"),
+ "additional_images": row.get("other_image_urls", []),
+ "category": row.get("browse_node_name"),
+ "condition": "new", # Amazon typically new
+ "availability": "in_stock" if row.get("availability") == "IN_STOCK" else "out_of_stock",
+ "attributes": {
+ "color": row.get("color"),
+ "size": row.get("size"),
+ "material": row.get("material"),
+ },
+ # Amazon-specific fields preserved
+ "raw_parent_asin": row.get("parent_asin"),
+ "raw_variation_theme": row.get("variation_theme"),
+ }
+
+ def determine_product_type(self, parsed: dict) -> str:
+ """Amazon has explicit product type indicators."""
+ # Amazon provides explicit digital flags
+ if parsed.get("raw_is_digital"):
+ return "digital"
+
+ category = (parsed.get("category") or "").lower()
+ if "digital" in category or "software" in category:
+ return "digital"
+
+ return "physical"
+```
+
+### 5.4 Importer Factory
+
+```python
+# app/utils/marketplace_importers/__init__.py
+
+from .base import BaseMarketplaceImporter
+from .letzshop import LetzshopImporter
+
+# Registry of available importers
+IMPORTER_REGISTRY = {
+ "letzshop": LetzshopImporter,
+ # "amazon": AmazonImporter, # Future
+ # "ebay": EbayImporter, # Future
+}
+
+def get_importer(
+ marketplace: str,
+ language: str = "en"
+) -> BaseMarketplaceImporter:
+ """
+ Factory function to get appropriate importer.
+
+ Args:
+ marketplace: Marketplace name (letzshop, amazon, ebay)
+ language: Language code for import
+
+ Returns:
+ Configured importer instance
+
+ Raises:
+ ValueError: If marketplace not supported
+ """
+ importer_class = IMPORTER_REGISTRY.get(marketplace.lower())
+ if not importer_class:
+ supported = list(IMPORTER_REGISTRY.keys())
+ raise ValueError(
+ f"Unsupported marketplace: {marketplace}. "
+ f"Supported: {supported}"
+ )
+
+ return importer_class(language=language)
+```
+
+---
+
+## Digital Products Handling
+
+### 6.1 Digital Product Considerations
+
+| Aspect | Physical Products | Digital Products |
+|--------|-------------------|------------------|
+| **Inventory** | Track stock by location | Unlimited or license-based |
+| **Fulfillment** | Shipping required | Instant delivery (email/download) |
+| **Returns** | Physical return process | Different policy (often non-refundable) |
+| **Variants** | Size, color, material | Platform, region, edition |
+| **Pricing** | Per unit | Per license/subscription |
+
+### 6.2 Digital Product Fields
+
+```python
+# In MarketplaceProduct model
+
+# Digital product classification
+product_type = Column(Enum(ProductType)) # physical, digital, service, subscription
+is_digital = Column(Boolean, default=False)
+
+# Digital delivery
+digital_delivery_method = Column(Enum(DigitalDeliveryMethod))
+# Options: download, email, in_app, streaming, license_key
+
+# Platform/Region restrictions
+platform = Column(String) # steam, playstation, xbox, nintendo, universal
+region_restrictions = Column(JSON) # ["EU", "US"] or null for global
+
+# License information
+license_type = Column(String) # single_use, subscription, lifetime
+license_duration_days = Column(Integer) # For subscriptions
+```
+
+### 6.3 Digital Product Import Detection
+
+```python
+def detect_digital_product(row: dict) -> tuple[bool, str | None, str | None]:
+ """
+ Detect if product is digital and extract delivery info.
+
+ Returns:
+ (is_digital, delivery_method, platform)
+ """
+ title = (row.get("title") or "").lower()
+ category = (row.get("category") or "").lower()
+ description = (row.get("description") or "").lower()
+
+ combined = f"{title} {category} {description}"
+
+ # Platform detection
+ platform_keywords = {
+ "steam": "steam",
+ "playstation": "playstation",
+ "psn": "playstation",
+ "xbox": "xbox",
+ "nintendo": "nintendo",
+ "switch": "nintendo",
+ "pc": "pc",
+ }
+
+ detected_platform = None
+ for keyword, platform in platform_keywords.items():
+ if keyword in combined:
+ detected_platform = platform
+ break
+
+ # Digital product keywords
+ digital_indicators = [
+ ("gift card", "email", None),
+ ("voucher", "email", None),
+ ("e-book", "download", None),
+ ("ebook", "download", None),
+ ("download", "download", None),
+ ("digital", "download", None),
+ ("license", "license_key", None),
+ ("game key", "license_key", detected_platform),
+ ("activation code", "license_key", detected_platform),
+ ("subscription", "in_app", None),
+ ]
+
+ for keyword, delivery_method, platform in digital_indicators:
+ if keyword in combined:
+ return True, delivery_method, platform or detected_platform
+
+ return False, None, None
+```
+
+### 6.4 Inventory Handling for Digital Products
+
+```python
+# app/services/inventory_service.py
+
+def create_inventory_for_product(
+ db: Session,
+ product: Product,
+ quantity: int = None
+) -> Inventory:
+ """
+ Create inventory record with digital product handling.
+ """
+ # Digital products get special handling
+ if product.is_digital:
+ return Inventory(
+ product_id=product.id,
+ vendor_id=product.vendor_id,
+ location="digital", # Special location for digital
+ quantity=999999, # Effectively unlimited
+ reserved_quantity=0,
+ is_digital=True,
+ # Track license keys if applicable
+ license_pool_id=product.license_pool_id,
+ )
+
+ # Physical products
+ return Inventory(
+ product_id=product.id,
+ vendor_id=product.vendor_id,
+ location="warehouse",
+ quantity=quantity or 0,
+ reserved_quantity=0,
+ is_digital=False,
+ )
+```
+
+---
+
+## Migration Plan
+
+### Phase 1: Database Schema Updates
+
+**Migration 1: Add product type and digital fields to marketplace_products**
+
+```python
+# alembic/versions/xxxx_add_product_type_and_digital_fields.py
+
+def upgrade():
+ # Add product type enum
+ product_type_enum = sa.Enum(
+ 'physical', 'digital', 'service', 'subscription',
+ name='product_type_enum'
+ )
+ product_type_enum.create(op.get_bind())
+
+ delivery_method_enum = sa.Enum(
+ 'download', 'email', 'in_app', 'streaming', 'license_key',
+ name='digital_delivery_enum'
+ )
+ delivery_method_enum.create(op.get_bind())
+
+ # Add columns to marketplace_products
+ op.add_column('marketplace_products',
+ sa.Column('product_type', product_type_enum,
+ server_default='physical', nullable=False)
+ )
+ op.add_column('marketplace_products',
+ sa.Column('is_digital', sa.Boolean(), server_default='false', nullable=False)
+ )
+ op.add_column('marketplace_products',
+ sa.Column('digital_delivery_method', delivery_method_enum, nullable=True)
+ )
+ op.add_column('marketplace_products',
+ sa.Column('platform', sa.String(), nullable=True)
+ )
+ op.add_column('marketplace_products',
+ sa.Column('region_restrictions', sa.JSON(), nullable=True)
+ )
+ op.add_column('marketplace_products',
+ sa.Column('license_type', sa.String(), nullable=True)
+ )
+ op.add_column('marketplace_products',
+ sa.Column('attributes', sa.JSON(), nullable=True)
+ )
+
+ # Add indexes
+ op.create_index('idx_mp_product_type', 'marketplace_products',
+ ['product_type', 'is_digital'])
+
+def downgrade():
+ op.drop_index('idx_mp_product_type')
+ op.drop_column('marketplace_products', 'attributes')
+ op.drop_column('marketplace_products', 'license_type')
+ op.drop_column('marketplace_products', 'region_restrictions')
+ op.drop_column('marketplace_products', 'platform')
+ op.drop_column('marketplace_products', 'digital_delivery_method')
+ op.drop_column('marketplace_products', 'is_digital')
+ op.drop_column('marketplace_products', 'product_type')
+
+ sa.Enum(name='digital_delivery_enum').drop(op.get_bind())
+ sa.Enum(name='product_type_enum').drop(op.get_bind())
+```
+
+**Migration 2: Create translation tables**
+
+```python
+# alembic/versions/xxxx_create_translation_tables.py
+
+def upgrade():
+ # Create marketplace_product_translations table
+ op.create_table(
+ 'marketplace_product_translations',
+ sa.Column('id', sa.Integer(), primary_key=True),
+ sa.Column('marketplace_product_id', sa.Integer(),
+ sa.ForeignKey('marketplace_products.id', ondelete='CASCADE'),
+ nullable=False),
+ sa.Column('language', sa.String(5), nullable=False),
+ sa.Column('title', sa.String(), nullable=False),
+ sa.Column('description', sa.Text(), nullable=True),
+ sa.Column('short_description', sa.String(500), nullable=True),
+ sa.Column('meta_title', sa.String(70), nullable=True),
+ sa.Column('meta_description', sa.String(160), nullable=True),
+ sa.Column('url_slug', sa.String(255), nullable=True),
+ sa.Column('source_import_id', sa.Integer(), nullable=True),
+ sa.Column('source_file', sa.String(), nullable=True),
+ sa.Column('created_at', sa.DateTime(), nullable=True),
+ sa.Column('updated_at', sa.DateTime(), nullable=True),
+ sa.UniqueConstraint('marketplace_product_id', 'language',
+ name='uq_marketplace_product_translation'),
+ )
+ op.create_index('idx_mpt_language', 'marketplace_product_translations', ['language'])
+
+ # Create product_translations table
+ op.create_table(
+ 'product_translations',
+ sa.Column('id', sa.Integer(), primary_key=True),
+ sa.Column('product_id', sa.Integer(),
+ sa.ForeignKey('products.id', ondelete='CASCADE'),
+ nullable=False),
+ sa.Column('language', sa.String(5), nullable=False),
+ sa.Column('title', sa.String(), nullable=True),
+ sa.Column('description', sa.Text(), nullable=True),
+ sa.Column('short_description', sa.String(500), nullable=True),
+ sa.Column('meta_title', sa.String(70), nullable=True),
+ sa.Column('meta_description', sa.String(160), nullable=True),
+ sa.Column('url_slug', sa.String(255), nullable=True),
+ sa.Column('created_at', sa.DateTime(), nullable=True),
+ sa.Column('updated_at', sa.DateTime(), nullable=True),
+ sa.UniqueConstraint('product_id', 'language', name='uq_product_translation'),
+ )
+ op.create_index('idx_pt_product_language', 'product_translations',
+ ['product_id', 'language'])
+
+def downgrade():
+ op.drop_table('product_translations')
+ op.drop_table('marketplace_product_translations')
+```
+
+**Migration 3: Add override fields to products table**
+
+```python
+# alembic/versions/xxxx_add_product_override_fields.py
+
+def upgrade():
+ # Rename product_id to vendor_sku for clarity
+ op.alter_column('products', 'product_id', new_column_name='vendor_sku')
+
+ # Add new overridable fields
+ op.add_column('products',
+ sa.Column('brand', sa.String(), nullable=True)
+ )
+ op.add_column('products',
+ sa.Column('primary_image_url', sa.String(), nullable=True)
+ )
+ op.add_column('products',
+ sa.Column('additional_images', sa.JSON(), nullable=True)
+ )
+ op.add_column('products',
+ sa.Column('download_url', sa.String(), nullable=True)
+ )
+ op.add_column('products',
+ sa.Column('license_type', sa.String(), nullable=True)
+ )
+ op.add_column('products',
+ sa.Column('fulfillment_email_template', sa.String(), nullable=True)
+ )
+
+ # Add index for vendor_sku
+ op.create_index('idx_product_vendor_sku', 'products', ['vendor_id', 'vendor_sku'])
+
+def downgrade():
+ op.drop_index('idx_product_vendor_sku')
+ op.drop_column('products', 'fulfillment_email_template')
+ op.drop_column('products', 'license_type')
+ op.drop_column('products', 'download_url')
+ op.drop_column('products', 'additional_images')
+ op.drop_column('products', 'primary_image_url')
+ op.drop_column('products', 'brand')
+ op.alter_column('products', 'vendor_sku', new_column_name='product_id')
+```
+
+**Migration 4: Data migration for existing products**
+
+```python
+# alembic/versions/xxxx_migrate_existing_product_data.py
+
+def upgrade():
+ """Migrate existing title/description to translation tables."""
+
+ # Get connection for raw SQL
+ conn = op.get_bind()
+
+ # Migrate marketplace_products title/description to translations
+ # Default language is 'en' for existing data
+ conn.execute(text("""
+ INSERT INTO marketplace_product_translations
+ (marketplace_product_id, language, title, description, created_at, updated_at)
+ SELECT
+ id,
+ 'en', -- Default language for existing data
+ title,
+ description,
+ created_at,
+ updated_at
+ FROM marketplace_products
+ WHERE title IS NOT NULL
+ ON CONFLICT (marketplace_product_id, language) DO NOTHING
+ """))
+
+def downgrade():
+ # Data migration is one-way, but we keep original columns
+ pass
+```
+
+### Phase 2: Code Updates
+
+1. **Update Models**: Modify `models/database/` files
+2. **Update Schemas**: Modify `models/schema/` files
+3. **Update Services**: Add reset logic to services
+4. **Update Importers**: Refactor CSV processor to use importer pattern
+5. **Update API Endpoints**: Add translation and reset endpoints
+
+### Phase 3: Testing
+
+1. **Unit Tests**: Test override properties and reset methods
+2. **Integration Tests**: Test import with multiple languages
+3. **Migration Tests**: Test data migration on copy of production data
+
+---
+
+## API Endpoints
+
+### New Endpoints Required
+
+```
+# Product Translations
+GET /api/v1/vendor/products/{id}/translations
+POST /api/v1/vendor/products/{id}/translations/{lang}
+PUT /api/v1/vendor/products/{id}/translations/{lang}
+DELETE /api/v1/vendor/products/{id}/translations/{lang}
+
+# Reset Operations
+POST /api/v1/vendor/products/{id}/reset
+POST /api/v1/vendor/products/{id}/translations/{lang}/reset
+
+# Marketplace Import with Language
+POST /api/v1/vendor/marketplace/import
+ Body: { source_url, marketplace, language }
+
+# Admin: Multi-language Import
+POST /api/v1/admin/marketplace/import
+ Body: { vendor_id, source_url, marketplace, language }
+```
+
+---
+
+## Implementation Checklist
+
+### Database Layer
+- [ ] Create `product_type_enum` and `digital_delivery_enum` types
+- [ ] Add digital product fields to `marketplace_products`
+- [ ] Create `marketplace_product_translations` table
+- [ ] Create `product_translations` table
+- [ ] Add override fields to `products` table
+- [ ] Run data migration for existing content
+
+### Model Layer
+- [ ] Update `MarketplaceProduct` model with new fields
+- [ ] Create `MarketplaceProductTranslation` model
+- [ ] Update `Product` model with effective properties
+- [ ] Create `ProductTranslation` model
+- [ ] Add `reset_*` methods to models
+
+### Schema Layer
+- [ ] Update `ProductUpdate` with reset fields
+- [ ] Create `ProductTranslationUpdate` schema
+- [ ] Update `ProductDetailResponse` with override flags
+- [ ] Create `ProductTranslationResponse` schema
+
+### Service Layer
+- [ ] Update `ProductService` with reset logic
+- [ ] Create `ProductTranslationService`
+- [ ] Update import service for multi-language
+
+### Import Layer
+- [ ] Create `BaseMarketplaceImporter` abstract class
+- [ ] Refactor `LetzshopImporter` from CSV processor
+- [ ] Create importer factory
+- [ ] Add digital product detection
+
+### API Layer
+- [ ] Add translation endpoints
+- [ ] Add reset endpoints
+- [ ] Update import endpoint for language parameter
+
+### Testing
+- [ ] Unit tests for override properties
+- [ ] Unit tests for reset methods
+- [ ] Integration tests for multi-language import
+- [ ] API tests for new endpoints
+
+---
+
+## Summary
+
+This architecture provides:
+
+1. **Universal Product Model**: Marketplace-agnostic with flexible attributes
+2. **Multi-Language Support**: Translations at both marketplace and vendor levels
+3. **Override Pattern**: Consistent with existing vendor contact pattern
+4. **Reset Capability**: Individual field or bulk reset to source
+5. **Digital Products**: Full support for games, gift cards, downloads
+6. **Extensibility**: Easy to add Amazon, eBay, or other marketplaces
+7. **Backward Compatibility**: Existing Letzshop imports continue to work
+
+The implementation preserves all existing principles while adding the flexibility needed for a multi-marketplace, multi-language e-commerce platform.
diff --git a/docs/development/migration/product-migration-database-changes.md b/docs/development/migration/product-migration-database-changes.md
new file mode 100644
index 00000000..09bbebc1
--- /dev/null
+++ b/docs/development/migration/product-migration-database-changes.md
@@ -0,0 +1,783 @@
+# Product Migration - Database Changes Plan
+
+## Overview
+
+This document details the database schema changes required for Phase 1 of the Multi-Marketplace Product Architecture. It serves as the implementation guide for the Alembic migrations.
+
+**Related Documents:**
+- [Multi-Marketplace Product Architecture](./multi-marketplace-product-architecture.md) - Full architecture design
+- [Marketplace Integration Architecture](../../architecture/marketplace-integration.md) - System-wide integration plan
+
+---
+
+## Current State Analysis
+
+### Existing Tables
+
+#### `marketplace_products` (Current)
+
+| Column | Type | Constraints | Notes |
+|--------|------|-------------|-------|
+| id | Integer | PK, Index | |
+| marketplace_product_id | String | Unique, Index, NOT NULL | Feed product ID |
+| title | String | NOT NULL | Single language only |
+| description | String | | Single language only |
+| link | String | | |
+| image_link | String | | |
+| availability | String | Index | |
+| price | String | | Raw price string "19.99 EUR" |
+| sale_price | String | | Raw price string |
+| brand | String | Index | |
+| gtin | String | Index | |
+| mpn | String | | |
+| condition | String | | |
+| adult | String | | |
+| multipack | Integer | | |
+| is_bundle | String | | |
+| age_group | String | | |
+| color | String | | |
+| gender | String | | |
+| material | String | | |
+| pattern | String | | |
+| size | String | | |
+| size_type | String | | |
+| size_system | String | | |
+| item_group_id | String | | |
+| google_product_category | String | Index | |
+| product_type | String | | Raw feed value |
+| custom_label_0-4 | String | | |
+| additional_image_link | String | | Single string, not array |
+| unit_pricing_measure | String | | |
+| unit_pricing_base_measure | String | | |
+| identifier_exists | String | | |
+| shipping | String | | |
+| currency | String | | |
+| marketplace | String | Index, Default='Letzshop' | |
+| vendor_name | String | Index | |
+| created_at | DateTime | | TimestampMixin |
+| updated_at | DateTime | | TimestampMixin |
+
+**Indexes:**
+- `idx_marketplace_vendor` (marketplace, vendor_name)
+- `idx_marketplace_brand` (marketplace, brand)
+
+#### `products` (Current)
+
+| Column | Type | Constraints | Notes |
+|--------|------|-------------|-------|
+| id | Integer | PK, Index | |
+| vendor_id | Integer | FK → vendors.id, NOT NULL | |
+| marketplace_product_id | Integer | FK → marketplace_products.id, NOT NULL | |
+| product_id | String | | Vendor's internal SKU |
+| price | Float | | Override |
+| sale_price | Float | | Override |
+| currency | String | | Override |
+| availability | String | | Override |
+| condition | String | | Override |
+| is_featured | Boolean | Default=False | |
+| is_active | Boolean | Default=True | |
+| display_order | Integer | Default=0 | |
+| min_quantity | Integer | Default=1 | |
+| max_quantity | Integer | | |
+| created_at | DateTime | | TimestampMixin |
+| updated_at | DateTime | | TimestampMixin |
+
+**Constraints:**
+- `uq_product` UNIQUE (vendor_id, marketplace_product_id)
+
+**Indexes:**
+- `idx_product_active` (vendor_id, is_active)
+- `idx_product_featured` (vendor_id, is_featured)
+
+### Issues with Current Schema
+
+| Issue | Impact | Solution |
+|-------|--------|----------|
+| No translation support | Cannot support multi-language feeds | Add translation tables |
+| No product type distinction | Cannot differentiate physical/digital | Add product_type enum |
+| No digital product fields | Cannot support game keys, downloads | Add digital-specific columns |
+| Price as String | Harder to filter/sort by price | Add parsed numeric price |
+| Single additional_image_link | Can't store multiple images properly | Add JSON array column |
+| No override pattern properties | No `effective_*` helpers | Add to model layer |
+| One-to-one relationship | Same product can't exist for multiple vendors | Fix to one-to-many |
+
+---
+
+## Target Schema
+
+### Visual Diagram
+
+```
+┌─────────────────────────────────┐
+│ marketplace_products │
+├─────────────────────────────────┤
+│ id (PK) │
+│ marketplace_product_id (UNIQUE) │
+│ marketplace │
+│ vendor_name │
+│ │
+│ # Product Type (NEW) │
+│ product_type (ENUM) │
+│ is_digital │
+│ digital_delivery_method (ENUM) │
+│ platform │
+│ region_restrictions (JSON) │
+│ license_type │
+│ │
+│ # Pricing (ENHANCED) │
+│ price (String) [raw] │
+│ price_numeric (Float) [NEW] │
+│ sale_price (String) [raw] │
+│ sale_price_numeric (Float) [NEW]│
+│ currency │
+│ │
+│ # Media (ENHANCED) │
+│ image_link │
+│ additional_image_link [legacy] │
+│ additional_images (JSON) [NEW] │
+│ │
+│ # Attributes (NEW) │
+│ attributes (JSON) │
+│ │
+│ # Status (NEW) │
+│ is_active │
+│ │
+│ # Renamed │
+│ product_type_raw [was product_type] │
+│ │
+│ # Preserved Google Shopping │
+│ brand, gtin, mpn, condition... │
+│ google_product_category... │
+│ custom_label_0-4... │
+└─────────────────────────────────┘
+ │
+ │ 1:N
+ ▼
+┌─────────────────────────────────┐
+│ marketplace_product_translations │
+├─────────────────────────────────┤
+│ id (PK) │
+│ marketplace_product_id (FK) │
+│ language ('en','fr','de','lb') │
+│ │
+│ # Localized Content │
+│ title (NOT NULL) │
+│ description │
+│ short_description │
+│ │
+│ # SEO │
+│ meta_title │
+│ meta_description │
+│ url_slug │
+│ │
+│ # Source Tracking │
+│ source_import_id │
+│ source_file │
+│ │
+│ created_at, updated_at │
+├─────────────────────────────────┤
+│ UNIQUE(marketplace_product_id, │
+│ language) │
+└─────────────────────────────────┘
+
+┌─────────────────────────────────┐
+│ products │
+├─────────────────────────────────┤
+│ id (PK) │
+│ vendor_id (FK) │
+│ marketplace_product_id (FK) │
+│ │
+│ # Renamed │
+│ vendor_sku [was product_id] │
+│ │
+│ # Existing Overrides │
+│ price │
+│ sale_price │
+│ currency │
+│ availability │
+│ condition │
+│ │
+│ # New Overrides │
+│ brand (NEW) │
+│ primary_image_url (NEW) │
+│ additional_images (JSON) (NEW) │
+│ download_url (NEW) │
+│ license_type (NEW) │
+│ fulfillment_email_template (NEW)│
+│ │
+│ # Vendor-Specific │
+│ is_featured │
+│ is_active │
+│ display_order │
+│ min_quantity │
+│ max_quantity │
+│ │
+│ created_at, updated_at │
+├─────────────────────────────────┤
+│ UNIQUE(vendor_id, │
+│ marketplace_product_id) │
+└─────────────────────────────────┘
+ │
+ │ 1:N
+ ▼
+┌─────────────────────────────────┐
+│ product_translations │
+├─────────────────────────────────┤
+│ id (PK) │
+│ product_id (FK) │
+│ language │
+│ │
+│ # Overridable (NULL = inherit) │
+│ title │
+│ description │
+│ short_description │
+│ meta_title │
+│ meta_description │
+│ url_slug │
+│ │
+│ created_at, updated_at │
+├─────────────────────────────────┤
+│ UNIQUE(product_id, language) │
+└─────────────────────────────────┘
+```
+
+---
+
+## Migration Plan
+
+### Migration 1: Add Product Type and Digital Fields
+
+**File:** `alembic/versions/xxxx_add_product_type_digital_fields.py`
+
+**Changes:**
+
+```sql
+-- Create ENUMs
+CREATE TYPE product_type_enum AS ENUM ('physical', 'digital', 'service', 'subscription');
+CREATE TYPE digital_delivery_enum AS ENUM ('download', 'email', 'in_app', 'streaming', 'license_key');
+
+-- Add columns to marketplace_products
+ALTER TABLE marketplace_products ADD COLUMN product_type product_type_enum NOT NULL DEFAULT 'physical';
+ALTER TABLE marketplace_products ADD COLUMN is_digital BOOLEAN NOT NULL DEFAULT false;
+ALTER TABLE marketplace_products ADD COLUMN digital_delivery_method digital_delivery_enum;
+ALTER TABLE marketplace_products ADD COLUMN platform VARCHAR;
+ALTER TABLE marketplace_products ADD COLUMN region_restrictions JSON;
+ALTER TABLE marketplace_products ADD COLUMN license_type VARCHAR;
+ALTER TABLE marketplace_products ADD COLUMN source_url VARCHAR;
+ALTER TABLE marketplace_products ADD COLUMN attributes JSON;
+ALTER TABLE marketplace_products ADD COLUMN additional_images JSON;
+ALTER TABLE marketplace_products ADD COLUMN is_active BOOLEAN NOT NULL DEFAULT true;
+ALTER TABLE marketplace_products ADD COLUMN price_numeric FLOAT;
+ALTER TABLE marketplace_products ADD COLUMN sale_price_numeric FLOAT;
+
+-- Rename product_type to product_type_raw (keep original feed value)
+ALTER TABLE marketplace_products RENAME COLUMN product_type TO product_type_raw;
+
+-- Add index
+CREATE INDEX idx_mp_product_type ON marketplace_products (product_type, is_digital);
+```
+
+**Rollback:**
+
+```sql
+DROP INDEX idx_mp_product_type;
+ALTER TABLE marketplace_products RENAME COLUMN product_type_raw TO product_type;
+ALTER TABLE marketplace_products DROP COLUMN sale_price_numeric;
+ALTER TABLE marketplace_products DROP COLUMN price_numeric;
+ALTER TABLE marketplace_products DROP COLUMN is_active;
+ALTER TABLE marketplace_products DROP COLUMN additional_images;
+ALTER TABLE marketplace_products DROP COLUMN attributes;
+ALTER TABLE marketplace_products DROP COLUMN source_url;
+ALTER TABLE marketplace_products DROP COLUMN license_type;
+ALTER TABLE marketplace_products DROP COLUMN region_restrictions;
+ALTER TABLE marketplace_products DROP COLUMN platform;
+ALTER TABLE marketplace_products DROP COLUMN digital_delivery_method;
+ALTER TABLE marketplace_products DROP COLUMN is_digital;
+ALTER TABLE marketplace_products DROP COLUMN product_type;
+DROP TYPE digital_delivery_enum;
+DROP TYPE product_type_enum;
+```
+
+---
+
+### Migration 2: Create Translation Tables
+
+**File:** `alembic/versions/xxxx_create_translation_tables.py`
+
+**Changes:**
+
+```sql
+-- Create marketplace_product_translations
+CREATE TABLE marketplace_product_translations (
+ id SERIAL PRIMARY KEY,
+ marketplace_product_id INTEGER NOT NULL REFERENCES marketplace_products(id) ON DELETE CASCADE,
+ language VARCHAR(5) NOT NULL,
+ title VARCHAR NOT NULL,
+ description TEXT,
+ short_description VARCHAR(500),
+ meta_title VARCHAR(70),
+ meta_description VARCHAR(160),
+ url_slug VARCHAR(255),
+ source_import_id INTEGER,
+ source_file VARCHAR,
+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ CONSTRAINT uq_marketplace_product_translation UNIQUE (marketplace_product_id, language)
+);
+
+CREATE INDEX idx_mpt_language ON marketplace_product_translations (language);
+CREATE INDEX idx_mpt_mp_id ON marketplace_product_translations (marketplace_product_id);
+
+-- Create product_translations
+CREATE TABLE product_translations (
+ id SERIAL PRIMARY KEY,
+ product_id INTEGER NOT NULL REFERENCES products(id) ON DELETE CASCADE,
+ language VARCHAR(5) NOT NULL,
+ title VARCHAR,
+ description TEXT,
+ short_description VARCHAR(500),
+ meta_title VARCHAR(70),
+ meta_description VARCHAR(160),
+ url_slug VARCHAR(255),
+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ CONSTRAINT uq_product_translation UNIQUE (product_id, language)
+);
+
+CREATE INDEX idx_pt_product_language ON product_translations (product_id, language);
+```
+
+**Rollback:**
+
+```sql
+DROP TABLE product_translations;
+DROP TABLE marketplace_product_translations;
+```
+
+---
+
+### Migration 3: Add Override Fields to Products
+
+**File:** `alembic/versions/xxxx_add_product_override_fields.py`
+
+**Changes:**
+
+```sql
+-- Rename product_id to vendor_sku
+ALTER TABLE products RENAME COLUMN product_id TO vendor_sku;
+
+-- Add new override columns
+ALTER TABLE products ADD COLUMN brand VARCHAR;
+ALTER TABLE products ADD COLUMN primary_image_url VARCHAR;
+ALTER TABLE products ADD COLUMN additional_images JSON;
+ALTER TABLE products ADD COLUMN download_url VARCHAR;
+ALTER TABLE products ADD COLUMN license_type VARCHAR;
+ALTER TABLE products ADD COLUMN fulfillment_email_template VARCHAR;
+
+-- Add index for vendor_sku
+CREATE INDEX idx_product_vendor_sku ON products (vendor_id, vendor_sku);
+```
+
+**Rollback:**
+
+```sql
+DROP INDEX idx_product_vendor_sku;
+ALTER TABLE products DROP COLUMN fulfillment_email_template;
+ALTER TABLE products DROP COLUMN license_type;
+ALTER TABLE products DROP COLUMN download_url;
+ALTER TABLE products DROP COLUMN additional_images;
+ALTER TABLE products DROP COLUMN primary_image_url;
+ALTER TABLE products DROP COLUMN brand;
+ALTER TABLE products RENAME COLUMN vendor_sku TO product_id;
+```
+
+---
+
+### Migration 4: Data Migration
+
+**File:** `alembic/versions/xxxx_migrate_product_data.py`
+
+**Changes:**
+
+```sql
+-- Migrate existing title/description to translations table (default language: 'en')
+INSERT INTO marketplace_product_translations (
+ marketplace_product_id,
+ language,
+ title,
+ description,
+ created_at,
+ updated_at
+)
+SELECT
+ id,
+ 'en',
+ title,
+ description,
+ created_at,
+ updated_at
+FROM marketplace_products
+WHERE title IS NOT NULL
+ON CONFLICT (marketplace_product_id, language) DO NOTHING;
+
+-- Parse prices to numeric (handled in Python for complex parsing)
+-- This will be done via a Python data migration function
+```
+
+**Python Migration Function:**
+
+```python
+def parse_and_update_prices(connection):
+ """Parse price strings to numeric values."""
+ import re
+
+ # Get all marketplace products
+ result = connection.execute(
+ text("SELECT id, price, sale_price FROM marketplace_products")
+ )
+
+ for row in result:
+ price_numeric = parse_price(row.price)
+ sale_price_numeric = parse_price(row.sale_price)
+
+ connection.execute(
+ text("""
+ UPDATE marketplace_products
+ SET price_numeric = :price, sale_price_numeric = :sale_price
+ WHERE id = :id
+ """),
+ {"id": row.id, "price": price_numeric, "sale_price": sale_price_numeric}
+ )
+
+def parse_price(price_str: str) -> float | None:
+ """Parse price string like '19.99 EUR' to float."""
+ if not price_str:
+ return None
+
+ # Extract numeric value
+ numbers = re.findall(r'[\d.,]+', str(price_str))
+ if numbers:
+ num_str = numbers[0].replace(',', '.')
+ try:
+ return float(num_str)
+ except ValueError:
+ pass
+ return None
+```
+
+**Rollback:**
+
+```sql
+-- Data migration is one-way, original columns preserved
+-- No rollback needed for data
+```
+
+---
+
+## Migration Execution Order
+
+| Order | Migration | Risk Level | Notes |
+|-------|-----------|------------|-------|
+| 1 | Add product type & digital fields | Low | All columns nullable or have defaults |
+| 2 | Create translation tables | Low | New tables, no existing data affected |
+| 3 | Add product override fields | Low | All columns nullable |
+| 4 | Data migration | Medium | Copies data, original preserved |
+
+---
+
+## Model Layer Updates Required
+
+### MarketplaceProduct Model
+
+```python
+# Add to models/database/marketplace_product.py
+
+class ProductType(str, Enum):
+ PHYSICAL = "physical"
+ DIGITAL = "digital"
+ SERVICE = "service"
+ SUBSCRIPTION = "subscription"
+
+class DigitalDeliveryMethod(str, Enum):
+ DOWNLOAD = "download"
+ EMAIL = "email"
+ IN_APP = "in_app"
+ STREAMING = "streaming"
+ LICENSE_KEY = "license_key"
+
+class MarketplaceProduct(Base, TimestampMixin):
+ # ... existing fields ...
+
+ # NEW FIELDS
+ product_type = Column(SQLEnum(ProductType), default=ProductType.PHYSICAL, nullable=False)
+ is_digital = Column(Boolean, default=False, nullable=False)
+ digital_delivery_method = Column(SQLEnum(DigitalDeliveryMethod), nullable=True)
+ platform = Column(String, nullable=True)
+ region_restrictions = Column(JSON, nullable=True)
+ license_type = Column(String, nullable=True)
+ source_url = Column(String, nullable=True)
+ attributes = Column(JSON, nullable=True)
+ additional_images = Column(JSON, nullable=True)
+ is_active = Column(Boolean, default=True, nullable=False)
+ price_numeric = Column(Float, nullable=True)
+ sale_price_numeric = Column(Float, nullable=True)
+ product_type_raw = Column(String) # Renamed from product_type
+
+ # NEW RELATIONSHIP
+ translations = relationship(
+ "MarketplaceProductTranslation",
+ back_populates="marketplace_product",
+ cascade="all, delete-orphan"
+ )
+
+ # Change to one-to-many
+ vendor_products = relationship("Product", back_populates="marketplace_product")
+```
+
+### MarketplaceProductTranslation Model (NEW)
+
+```python
+# models/database/marketplace_product_translation.py
+
+class MarketplaceProductTranslation(Base, TimestampMixin):
+ __tablename__ = "marketplace_product_translations"
+
+ id = Column(Integer, primary_key=True)
+ marketplace_product_id = Column(
+ Integer,
+ ForeignKey("marketplace_products.id", ondelete="CASCADE"),
+ nullable=False
+ )
+ language = Column(String(5), nullable=False)
+ title = Column(String, nullable=False)
+ description = Column(Text, nullable=True)
+ short_description = Column(String(500), nullable=True)
+ meta_title = Column(String(70), nullable=True)
+ meta_description = Column(String(160), nullable=True)
+ url_slug = Column(String(255), nullable=True)
+ source_import_id = Column(Integer, nullable=True)
+ source_file = Column(String, nullable=True)
+
+ marketplace_product = relationship(
+ "MarketplaceProduct",
+ back_populates="translations"
+ )
+
+ __table_args__ = (
+ UniqueConstraint("marketplace_product_id", "language", name="uq_marketplace_product_translation"),
+ Index("idx_mpt_language", "language"),
+ )
+```
+
+### Product Model Updates
+
+```python
+# Update models/database/product.py
+
+class Product(Base, TimestampMixin):
+ # ... existing fields ...
+
+ # RENAMED
+ vendor_sku = Column(String) # Was: product_id
+
+ # NEW OVERRIDE FIELDS
+ brand = Column(String, nullable=True)
+ primary_image_url = Column(String, nullable=True)
+ additional_images = Column(JSON, nullable=True)
+ download_url = Column(String, nullable=True)
+ license_type = Column(String, nullable=True)
+ fulfillment_email_template = Column(String, nullable=True)
+
+ # NEW RELATIONSHIP
+ translations = relationship(
+ "ProductTranslation",
+ back_populates="product",
+ cascade="all, delete-orphan"
+ )
+
+ # OVERRIDABLE FIELDS LIST
+ OVERRIDABLE_FIELDS = [
+ "price", "sale_price", "currency", "brand", "condition",
+ "availability", "primary_image_url", "additional_images",
+ "download_url", "license_type"
+ ]
+
+ # EFFECTIVE PROPERTIES
+ @property
+ def effective_price(self) -> float | None:
+ if self.price is not None:
+ return self.price
+ return self.marketplace_product.price_numeric if self.marketplace_product else None
+
+ @property
+ def effective_brand(self) -> str | None:
+ if self.brand is not None:
+ return self.brand
+ return self.marketplace_product.brand if self.marketplace_product else None
+
+ # ... other effective_* properties ...
+
+ def get_override_info(self) -> dict:
+ """Get all fields with inheritance flags."""
+ mp = self.marketplace_product
+ return {
+ "price": self.effective_price,
+ "price_overridden": self.price is not None,
+ "price_source": mp.price_numeric if mp else None,
+ # ... other fields ...
+ }
+
+ def reset_field_to_source(self, field_name: str) -> bool:
+ """Reset a field to inherit from marketplace product."""
+ if field_name in self.OVERRIDABLE_FIELDS:
+ setattr(self, field_name, None)
+ return True
+ return False
+```
+
+### ProductTranslation Model (NEW)
+
+```python
+# models/database/product_translation.py
+
+class ProductTranslation(Base, TimestampMixin):
+ __tablename__ = "product_translations"
+
+ id = Column(Integer, primary_key=True)
+ product_id = Column(
+ Integer,
+ ForeignKey("products.id", ondelete="CASCADE"),
+ nullable=False
+ )
+ language = Column(String(5), nullable=False)
+ title = Column(String, nullable=True) # NULL = inherit
+ description = Column(Text, nullable=True)
+ short_description = Column(String(500), nullable=True)
+ meta_title = Column(String(70), nullable=True)
+ meta_description = Column(String(160), nullable=True)
+ url_slug = Column(String(255), nullable=True)
+
+ product = relationship("Product", back_populates="translations")
+
+ OVERRIDABLE_FIELDS = [
+ "title", "description", "short_description",
+ "meta_title", "meta_description", "url_slug"
+ ]
+
+ def get_effective_title(self) -> str | None:
+ if self.title is not None:
+ return self.title
+ return self._get_marketplace_translation_field("title")
+
+ def _get_marketplace_translation_field(self, field: str) -> str | None:
+ mp = self.product.marketplace_product
+ if mp:
+ for t in mp.translations:
+ if t.language == self.language:
+ return getattr(t, field, None)
+ return None
+
+ __table_args__ = (
+ UniqueConstraint("product_id", "language", name="uq_product_translation"),
+ Index("idx_pt_product_language", "product_id", "language"),
+ )
+```
+
+---
+
+## Code Updates Required
+
+### Files to Update
+
+| File | Changes |
+|------|---------|
+| `models/database/__init__.py` | Export new models |
+| `models/database/marketplace_product.py` | Add new fields, enums, relationship |
+| `models/database/product.py` | Rename column, add override fields/properties |
+| `models/schema/product.py` | Update Pydantic schemas |
+| `app/services/product_service.py` | Add reset logic, translation support |
+| `app/utils/csv_processor.py` | Update to use translations |
+| `tests/` | Update all product-related tests |
+
+### Import Service Updates
+
+The CSV processor needs to:
+1. Accept language parameter
+2. Create/update translations instead of direct title/description
+3. Parse prices to numeric
+4. Detect digital products
+
+---
+
+## Open Questions (Requires Decision)
+
+### 1. Keep Original Title/Description Columns?
+
+**Option A: Keep as cache/fallback**
+- Pros: Simpler migration, backward compatible
+- Cons: Data duplication
+
+**Option B: Remove after migration**
+- Pros: Cleaner schema
+- Cons: Breaking change, requires code updates
+
+**Recommendation:** Option A for Phase 1, deprecate later
+
+---
+
+### 2. Default Language for Existing Data?
+
+Current data appears to be English. Confirm before migration.
+
+**Default:** `'en'`
+
+---
+
+### 3. Price Parsing Strategy?
+
+**Option A: Parse during migration**
+- Run Python migration to parse all existing prices
+
+**Option B: Parse on next import only**
+- Leave existing data, parse going forward
+
+**Recommendation:** Option A - parse during migration for consistency
+
+---
+
+## Testing Checklist
+
+Before running migrations in production:
+
+- [ ] Run migrations on local database
+- [ ] Run migrations on staging/test database with production data copy
+- [ ] Verify data integrity after migration
+- [ ] Run full test suite
+- [ ] Test import with multi-language CSV
+- [ ] Test override pattern (set override, reset to source)
+- [ ] Test translation inheritance
+- [ ] Performance test with large dataset
+
+---
+
+## Rollback Plan
+
+If issues occur:
+
+1. Migrations are designed to be reversible
+2. Each migration has explicit downgrade function
+3. Original data is preserved (title/description columns kept)
+4. Run `alembic downgrade -1` for each migration in reverse order
+
+---
+
+## Next Steps
+
+1. [ ] Confirm open questions above
+2. [ ] Create Alembic migration files
+3. [ ] Update SQLAlchemy models
+4. [ ] Update Pydantic schemas
+5. [ ] Update services
+6. [ ] Update tests
+7. [ ] Run on staging environment
+8. [ ] Deploy to production
diff --git a/mkdocs.yml b/mkdocs.yml
index 602f46e0..ee2d496e 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -29,6 +29,7 @@ nav:
# ============================================
- Architecture:
- Overview: architecture/overview.md
+ - Marketplace Integration: architecture/marketplace-integration.md
- Architecture Patterns: architecture/architecture-patterns.md
- Company-Vendor Management: architecture/company-vendor-management.md
- Multi-Tenant System: architecture/multi-tenant.md
@@ -127,6 +128,8 @@ nav:
- Makefile Refactoring: development/migration/makefile-refactoring-complete.md
- SVC-006 Migration Plan: development/migration/svc-006-migration-plan.md
- Vendor Contact Inheritance: development/migration/vendor-contact-inheritance.md
+ - Multi-Marketplace Product Architecture: development/migration/multi-marketplace-product-architecture.md
+ - Product Migration Database Changes: development/migration/product-migration-database-changes.md
- Seed Scripts Audit: development/seed-scripts-audit.md
- Database Seeder:
- Documentation: development/database-seeder/database-seeder-documentation.md