27 KiB
27 KiB
Slice 2: Marketplace Product Import
Vendor Imports Products from Letzshop
Status: 📋 NOT STARTED
Timeline: Week 2 (5 days)
Prerequisites: Slice 1 complete
🎯 Slice Objectives
Enable vendors to import product catalogs from Letzshop marketplace via CSV files.
User Stories
- As a Vendor Owner, I can configure my Letzshop CSV URL
- As a Vendor Owner, I can trigger product imports from Letzshop
- As a Vendor Owner, I can view import job status and history
- The system processes CSV data in the background
- As a Vendor Owner, I can see real-time import progress
Success Criteria
- Vendor can configure Letzshop CSV URL (FR, EN, DE)
- Vendor can trigger import jobs manually
- System downloads and processes CSV files
- Import status updates in real-time (Alpine.js)
- Import history is properly tracked
- Error handling for failed imports
- Products stored in staging area (MarketplaceProduct table)
- Large CSV files process without timeout
📋 Backend Implementation
Database Models
MarketplaceProduct Model (models/database/marketplace_product.py)
class MarketplaceProduct(Base, TimestampMixin):
"""
Staging table for imported marketplace products
Products stay here until vendor publishes them to catalog
"""
__tablename__ = "marketplace_products"
id = Column(Integer, primary_key=True, index=True)
vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False)
import_job_id = Column(Integer, ForeignKey("marketplace_import_jobs.id"))
# External identifiers
external_sku = Column(String, nullable=False, index=True)
marketplace = Column(String, default="letzshop") # Future: other marketplaces
# Product information (from CSV)
title = Column(String, nullable=False)
description = Column(Text)
price = Column(Numeric(10, 2))
currency = Column(String(3), default="EUR")
# Categories and attributes
category = Column(String)
brand = Column(String)
attributes = Column(JSON, default=dict) # Store all CSV columns
# Images
image_urls = Column(JSON, default=list) # List of image URLs
# Inventory
stock_quantity = Column(Integer)
is_in_stock = Column(Boolean, default=True)
# Status
is_selected = Column(Boolean, default=False) # Ready to publish?
is_published = Column(Boolean, default=False) # Already in catalog?
published_product_id = Column(Integer, ForeignKey("products.id"), nullable=True)
# Metadata
language = Column(String(2)) # 'fr', 'en', 'de'
raw_data = Column(JSON) # Store complete CSV row
# Relationships
vendor = relationship("Vendor", back_populates="marketplace_products")
import_job = relationship("MarketplaceImportJob", back_populates="products")
published_product = relationship("Product", back_populates="marketplace_source")
# Indexes
__table_args__ = (
Index('ix_marketplace_vendor_sku', 'vendor_id', 'external_sku'),
Index('ix_marketplace_selected', 'vendor_id', 'is_selected'),
)
MarketplaceImportJob Model (models/database/marketplace.py)
class MarketplaceImportJob(Base, TimestampMixin):
"""Track CSV import jobs"""
__tablename__ = "marketplace_import_jobs"
id = Column(Integer, primary_key=True, index=True)
vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False)
# Job details
marketplace = Column(String, default="letzshop")
csv_url = Column(String, nullable=False)
language = Column(String(2)) # 'fr', 'en', 'de'
# Status tracking
status = Column(
String,
default="pending"
) # pending, processing, completed, failed
# Progress
total_rows = Column(Integer, default=0)
processed_rows = Column(Integer, default=0)
imported_count = Column(Integer, default=0)
updated_count = Column(Integer, default=0)
error_count = Column(Integer, default=0)
# Timing
started_at = Column(DateTime, nullable=True)
completed_at = Column(DateTime, nullable=True)
# Error handling
error_message = Column(Text, nullable=True)
error_details = Column(JSON, nullable=True)
# Relationships
vendor = relationship("Vendor", back_populates="import_jobs")
products = relationship("MarketplaceProduct", back_populates="import_job")
Pydantic Schemas
Import Schemas (models/schema/marketplace.py)
from pydantic import BaseModel, HttpUrl
from typing import Optional, List
from datetime import datetime
class MarketplaceImportCreate(BaseModel):
"""Create new import job"""
csv_url: HttpUrl
language: str = Field(..., regex="^(fr|en|de)$")
marketplace: str = "letzshop"
class MarketplaceImportJobResponse(BaseModel):
"""Import job details"""
id: int
vendor_id: int
marketplace: str
csv_url: str
language: str
status: str
total_rows: int
processed_rows: int
imported_count: int
updated_count: int
error_count: int
started_at: Optional[datetime]
completed_at: Optional[datetime]
error_message: Optional[str]
created_at: datetime
class Config:
from_attributes = True
class MarketplaceProductResponse(BaseModel):
"""Marketplace product in staging"""
id: int
vendor_id: int
external_sku: str
title: str
description: Optional[str]
price: float
currency: str
category: Optional[str]
brand: Optional[str]
stock_quantity: Optional[int]
is_in_stock: bool
is_selected: bool
is_published: bool
image_urls: List[str]
language: str
created_at: datetime
class Config:
from_attributes = True
Service Layer
Marketplace Service (app/services/marketplace_service.py)
from typing import List, Dict, Any
import csv
import requests
from io import StringIO
from sqlalchemy.orm import Session
from models.database.marketplace_product import MarketplaceProduct
from models.database.marketplace import MarketplaceImportJob
class MarketplaceService:
"""Handle marketplace product imports"""
async def create_import_job(
self,
vendor_id: int,
csv_url: str,
language: str,
db: Session
) -> MarketplaceImportJob:
"""Create new import job and start processing"""
# Create job record
job = MarketplaceImportJob(
vendor_id=vendor_id,
csv_url=csv_url,
language=language,
marketplace="letzshop",
status="pending"
)
db.add(job)
db.commit()
db.refresh(job)
# Trigger background processing
from tasks.marketplace_import import process_csv_import
process_csv_import.delay(job.id)
return job
def process_csv_import(self, job_id: int, db: Session):
"""
Process CSV import (called by Celery task)
This is a long-running operation
"""
job = db.query(MarketplaceImportJob).get(job_id)
if not job:
return
try:
# Update status
job.status = "processing"
job.started_at = datetime.utcnow()
db.commit()
# Download CSV
response = requests.get(job.csv_url, timeout=30)
response.raise_for_status()
# Parse CSV
csv_content = StringIO(response.text)
reader = csv.DictReader(csv_content)
# Count total rows
rows = list(reader)
job.total_rows = len(rows)
db.commit()
# Process each row
for idx, row in enumerate(rows):
try:
self._process_csv_row(job, row, db)
job.processed_rows = idx + 1
# Commit every 100 rows
if idx % 100 == 0:
db.commit()
except Exception as e:
job.error_count += 1
# Log error but continue
# Final commit
job.status = "completed"
job.completed_at = datetime.utcnow()
db.commit()
except Exception as e:
job.status = "failed"
job.error_message = str(e)
job.completed_at = datetime.utcnow()
db.commit()
def _process_csv_row(
self,
job: MarketplaceImportJob,
row: Dict[str, Any],
db: Session
):
"""Process single CSV row"""
# Extract fields from CSV
external_sku = row.get('SKU') or row.get('sku')
if not external_sku:
raise ValueError("Missing SKU in CSV row")
# Check if product already exists
existing = db.query(MarketplaceProduct).filter(
MarketplaceProduct.vendor_id == job.vendor_id,
MarketplaceProduct.external_sku == external_sku
).first()
# Parse image URLs
image_urls = []
for i in range(1, 6): # Support up to 5 images
img_url = row.get(f'Image{i}') or row.get(f'image_{i}')
if img_url:
image_urls.append(img_url)
if existing:
# Update existing product
existing.title = row.get('Title') or row.get('title')
existing.description = row.get('Description')
existing.price = float(row.get('Price', 0))
existing.stock_quantity = int(row.get('Stock', 0))
existing.is_in_stock = existing.stock_quantity > 0
existing.category = row.get('Category')
existing.brand = row.get('Brand')
existing.image_urls = image_urls
existing.raw_data = row
existing.import_job_id = job.id
job.updated_count += 1
else:
# Create new product
product = MarketplaceProduct(
vendor_id=job.vendor_id,
import_job_id=job.id,
external_sku=external_sku,
marketplace="letzshop",
title=row.get('Title') or row.get('title'),
description=row.get('Description'),
price=float(row.get('Price', 0)),
currency="EUR",
category=row.get('Category'),
brand=row.get('Brand'),
stock_quantity=int(row.get('Stock', 0)),
is_in_stock=int(row.get('Stock', 0)) > 0,
image_urls=image_urls,
language=job.language,
raw_data=row,
is_selected=False,
is_published=False
)
db.add(product)
job.imported_count += 1
def get_import_jobs(
self,
vendor_id: int,
db: Session,
skip: int = 0,
limit: int = 20
) -> List[MarketplaceImportJob]:
"""Get import job history for vendor"""
return db.query(MarketplaceImportJob).filter(
MarketplaceImportJob.vendor_id == vendor_id
).order_by(
MarketplaceImportJob.created_at.desc()
).offset(skip).limit(limit).all()
def get_marketplace_products(
self,
vendor_id: int,
db: Session,
import_job_id: Optional[int] = None,
is_selected: Optional[bool] = None,
skip: int = 0,
limit: int = 100
) -> List[MarketplaceProduct]:
"""Get marketplace products in staging"""
query = db.query(MarketplaceProduct).filter(
MarketplaceProduct.vendor_id == vendor_id,
MarketplaceProduct.is_published == False # Only unpublished
)
if import_job_id:
query = query.filter(MarketplaceProduct.import_job_id == import_job_id)
if is_selected is not None:
query = query.filter(MarketplaceProduct.is_selected == is_selected)
return query.order_by(
MarketplaceProduct.created_at.desc()
).offset(skip).limit(limit).all()
API Endpoints
Marketplace Endpoints (app/api/v1/vendor/marketplace.py)
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List, Optional
router = APIRouter()
@router.post("/import", response_model=MarketplaceImportJobResponse)
async def trigger_import(
import_data: MarketplaceImportCreate,
current_user: User = Depends(get_current_vendor_user),
vendor: Vendor = Depends(get_current_vendor),
db: Session = Depends(get_db)
):
"""Trigger CSV import from marketplace"""
service = MarketplaceService()
job = await service.create_import_job(
vendor_id=vendor.id,
csv_url=str(import_data.csv_url),
language=import_data.language,
db=db
)
return job
@router.get("/jobs", response_model=List[MarketplaceImportJobResponse])
async def get_import_jobs(
skip: int = 0,
limit: int = 20,
current_user: User = Depends(get_current_vendor_user),
vendor: Vendor = Depends(get_current_vendor),
db: Session = Depends(get_db)
):
"""Get import job history"""
service = MarketplaceService()
jobs = service.get_import_jobs(vendor.id, db, skip, limit)
return jobs
@router.get("/jobs/{job_id}", response_model=MarketplaceImportJobResponse)
async def get_import_job(
job_id: int,
current_user: User = Depends(get_current_vendor_user),
vendor: Vendor = Depends(get_current_vendor),
db: Session = Depends(get_db)
):
"""Get specific import job status"""
job = db.query(MarketplaceImportJob).filter(
MarketplaceImportJob.id == job_id,
MarketplaceImportJob.vendor_id == vendor.id
).first()
if not job:
raise HTTPException(status_code=404, detail="Import job not found")
return job
@router.get("/products", response_model=List[MarketplaceProductResponse])
async def get_marketplace_products(
import_job_id: Optional[int] = None,
is_selected: Optional[bool] = None,
skip: int = 0,
limit: int = 100,
current_user: User = Depends(get_current_vendor_user),
vendor: Vendor = Depends(get_current_vendor),
db: Session = Depends(get_db)
):
"""Get products in marketplace staging area"""
service = MarketplaceService()
products = service.get_marketplace_products(
vendor.id, db, import_job_id, is_selected, skip, limit
)
return products
Background Tasks
Celery Task (tasks/marketplace_import.py)
from celery import shared_task
from app.core.database import SessionLocal
from app.services.marketplace_service import MarketplaceService
@shared_task(bind=True, max_retries=3)
def process_csv_import(self, job_id: int):
"""
Process CSV import in background
This can take several minutes for large files
"""
db = SessionLocal()
try:
service = MarketplaceService()
service.process_csv_import(job_id, db)
except Exception as e:
# Retry on failure
raise self.retry(exc=e, countdown=60)
finally:
db.close()
🎨 Frontend Implementation
Templates
Import Dashboard (templates/vendor/marketplace/imports.html)
{% extends "vendor/base_vendor.html" %}
{% block title %}Product Import{% endblock %}
{% block content %}
<div x-data="marketplaceImport()" x-init="loadJobs()">
<!-- Page Header -->
<div class="page-header">
<h1>Marketplace Import</h1>
<button @click="showImportModal = true" class="btn btn-primary">
New Import
</button>
</div>
<!-- Import Configuration Card -->
<div class="card mb-3">
<h3>Letzshop CSV URLs</h3>
<div class="config-grid">
<div>
<label>French (FR)</label>
<input
type="text"
x-model="vendor.letzshop_csv_url_fr"
class="form-control"
readonly
>
</div>
<div>
<label>English (EN)</label>
<input
type="text"
x-model="vendor.letzshop_csv_url_en"
class="form-control"
readonly
>
</div>
<div>
<label>German (DE)</label>
<input
type="text"
x-model="vendor.letzshop_csv_url_de"
class="form-control"
readonly
>
</div>
</div>
<p class="text-muted mt-2">
Configure these URLs in vendor settings
</p>
</div>
<!-- Import Jobs List -->
<div class="card">
<h3>Import History</h3>
<template x-if="jobs.length === 0 && !loading">
<p class="text-muted">No imports yet. Start your first import!</p>
</template>
<template x-if="jobs.length > 0">
<table class="data-table">
<thead>
<tr>
<th>ID</th>
<th>Language</th>
<th>Status</th>
<th>Progress</th>
<th>Results</th>
<th>Started</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
<template x-for="job in jobs" :key="job.id">
<tr>
<td><strong x-text="`#${job.id}`"></strong></td>
<td x-text="job.language.toUpperCase()"></td>
<td>
<span
class="badge"
:class="{
'badge-warning': job.status === 'pending' || job.status === 'processing',
'badge-success': job.status === 'completed',
'badge-danger': job.status === 'failed'
}"
x-text="job.status"
></span>
</td>
<td>
<template x-if="job.status === 'processing'">
<div class="progress-bar">
<div
class="progress-fill"
:style="`width: ${(job.processed_rows / job.total_rows * 100)}%`"
></div>
</div>
<small x-text="`${job.processed_rows} / ${job.total_rows}`"></small>
</template>
<template x-if="job.status === 'completed'">
<span x-text="`${job.total_rows} rows`"></span>
</template>
</td>
<td>
<template x-if="job.status === 'completed'">
<div class="text-sm">
<div>✓ <span x-text="job.imported_count"></span> imported</div>
<div>↻ <span x-text="job.updated_count"></span> updated</div>
<template x-if="job.error_count > 0">
<div class="text-danger">✗ <span x-text="job.error_count"></span> errors</div>
</template>
</div>
</template>
</td>
<td x-text="formatDate(job.started_at)"></td>
<td>
<button
@click="viewProducts(job.id)"
class="btn btn-sm"
:disabled="job.status !== 'completed'"
>
View Products
</button>
</td>
</tr>
</template>
</tbody>
</table>
</template>
</div>
<!-- New Import Modal -->
<div x-show="showImportModal" class="modal-overlay" @click.self="showImportModal = false">
<div class="modal">
<div class="modal-header">
<h3>New Import</h3>
<button @click="showImportModal = false" class="modal-close">×</button>
</div>
<div class="modal-body">
<form @submit.prevent="triggerImport()">
<div class="form-group">
<label>Language</label>
<select x-model="newImport.language" class="form-control" required>
<option value="">Select language</option>
<option value="fr">French (FR)</option>
<option value="en">English (EN)</option>
<option value="de">German (DE)</option>
</select>
</div>
<div class="form-group">
<label>CSV URL</label>
<input
type="url"
x-model="newImport.csv_url"
class="form-control"
placeholder="https://..."
required
>
<div class="form-help">
Or use configured URL:
<button
type="button"
@click="newImport.csv_url = vendor.letzshop_csv_url_fr"
class="btn-link"
x-show="newImport.language === 'fr'"
>
Use FR URL
</button>
</div>
</div>
<div class="modal-footer">
<button type="button" @click="showImportModal = false" class="btn btn-secondary">
Cancel
</button>
<button type="submit" class="btn btn-primary" :disabled="importing">
<span x-show="!importing">Start Import</span>
<span x-show="importing" class="loading-spinner"></span>
</button>
</div>
</form>
</div>
</div>
</div>
</div>
<script>
window.vendorData = {{ vendor|tojson }};
</script>
{% endblock %}
{% block extra_scripts %}
<script>
function marketplaceImport() {
return {
vendor: window.vendorData,
jobs: [],
loading: false,
importing: false,
showImportModal: false,
newImport: {
language: '',
csv_url: ''
},
async loadJobs() {
this.loading = true;
try {
this.jobs = await apiClient.get('/api/v1/vendor/marketplace/jobs');
// Poll for active jobs
const activeJobs = this.jobs.filter(j =>
j.status === 'pending' || j.status === 'processing'
);
if (activeJobs.length > 0) {
setTimeout(() => this.loadJobs(), 3000); // Poll every 3 seconds
}
} catch (error) {
showNotification('Failed to load imports', 'error');
} finally {
this.loading = false;
}
},
async triggerImport() {
this.importing = true;
try {
const job = await apiClient.post('/api/v1/vendor/marketplace/import', {
csv_url: this.newImport.csv_url,
language: this.newImport.language
});
this.jobs.unshift(job);
this.showImportModal = false;
this.newImport = { language: '', csv_url: '' };
showNotification('Import started successfully', 'success');
// Start polling
setTimeout(() => this.loadJobs(), 3000);
} catch (error) {
showNotification(error.message || 'Import failed', 'error');
} finally {
this.importing = false;
}
},
viewProducts(jobId) {
window.location.href = `/vendor/marketplace/products?job_id=${jobId}`;
},
formatDate(dateString) {
if (!dateString) return '-';
return new Date(dateString).toLocaleString();
}
}
}
</script>
{% endblock %}
✅ Testing Checklist
Backend Tests
- CSV download works with valid URL
- CSV parsing handles various formats
- Products created in staging table
- Duplicate SKUs are updated, not duplicated
- Import job status updates correctly
- Progress tracking is accurate
- Error handling works for invalid CSV
- Large CSV files (10,000+ rows) process successfully
- Celery tasks execute correctly
Frontend Tests
- Import page loads correctly
- New import modal works
- Import jobs display in table
- Real-time progress updates (polling)
- Completed imports show results
- Can view products from import
- Error states display correctly
- Loading states work correctly
Integration Tests
- Complete import workflow works end-to-end
- Vendor isolation maintained
- API endpoints require authentication
- Vendor can only see their own imports
🚀 Deployment Checklist
- Celery worker running
- Redis/RabbitMQ configured for Celery
- Database migrations applied
- CSV download timeout configured
- Error logging configured
- Background task monitoring set up
➡️ Next Steps
After completing Slice 2, move to Slice 3: Product Catalog Management to enable vendors to publish imported products to their catalog.
Slice 2 Status: 📋 Not Started
Dependencies: Slice 1 must be complete
Estimated Duration: 5 days