Files
orion/models/database/letzshop.py
Samir Boulahtit ccfbbcb804 feat: add Letzshop vendor directory with sync and admin management
- Add LetzshopVendorCache model to store cached vendor data from Letzshop API
- Create LetzshopVendorSyncService for syncing vendor directory
- Add Celery task for background vendor sync
- Create admin page at /admin/letzshop/vendor-directory with:
  - Stats dashboard (total, claimed, unclaimed vendors)
  - Searchable/filterable vendor list
  - "Sync Now" button to trigger sync
  - Ability to create platform vendors from Letzshop cache
- Add API endpoints for vendor directory management
- Add Pydantic schemas for API responses

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-13 20:35:46 +01:00

380 lines
13 KiB
Python

# models/database/letzshop.py
"""
Database models for Letzshop marketplace integration.
Provides models for:
- VendorLetzshopCredentials: Per-vendor API key storage (encrypted)
- LetzshopFulfillmentQueue: Outbound operation queue with retry
- LetzshopSyncLog: Audit trail for sync operations
- LetzshopHistoricalImportJob: Progress tracking for historical imports
Note: Orders are now stored in the unified `orders` table with channel='letzshop'.
The LetzshopOrder model has been removed in favor of the unified Order model.
"""
from sqlalchemy import (
Boolean,
Column,
DateTime,
ForeignKey,
Index,
Integer,
String,
Text,
)
from sqlalchemy.dialects.sqlite import JSON
from sqlalchemy.orm import relationship
from app.core.database import Base
from models.database.base import TimestampMixin
class VendorLetzshopCredentials(Base, TimestampMixin):
"""
Per-vendor Letzshop API credentials.
Stores encrypted API keys and sync settings for each vendor's
Letzshop integration.
"""
__tablename__ = "vendor_letzshop_credentials"
id = Column(Integer, primary_key=True, index=True)
vendor_id = Column(
Integer, ForeignKey("vendors.id"), unique=True, nullable=False, index=True
)
# Encrypted API credentials
api_key_encrypted = Column(Text, nullable=False)
api_endpoint = Column(String(255), default="https://letzshop.lu/graphql")
# Sync settings
auto_sync_enabled = Column(Boolean, default=False)
sync_interval_minutes = Column(Integer, default=15)
# Test mode (disables API mutations when enabled)
test_mode_enabled = Column(Boolean, default=False)
# Default carrier settings
default_carrier = Column(String(50), nullable=True) # greco, colissimo, xpresslogistics
# Carrier label URL prefixes
carrier_greco_label_url = Column(
String(500), default="https://dispatchweb.fr/Tracky/Home/"
)
carrier_colissimo_label_url = Column(String(500), nullable=True)
carrier_xpresslogistics_label_url = Column(String(500), nullable=True)
# Last sync status
last_sync_at = Column(DateTime(timezone=True), nullable=True)
last_sync_status = Column(String(50), nullable=True) # success, failed, partial
last_sync_error = Column(Text, nullable=True)
# Relationships
vendor = relationship("Vendor", back_populates="letzshop_credentials")
def __repr__(self):
return f"<VendorLetzshopCredentials(vendor_id={self.vendor_id}, auto_sync={self.auto_sync_enabled})>"
class LetzshopFulfillmentQueue(Base, TimestampMixin):
"""
Queue for outbound fulfillment operations to Letzshop.
Supports retry logic for failed operations.
References the unified orders table.
"""
__tablename__ = "letzshop_fulfillment_queue"
id = Column(Integer, primary_key=True, index=True)
vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False, index=True)
order_id = Column(Integer, ForeignKey("orders.id"), nullable=False, index=True)
# Operation type
operation = Column(
String(50), nullable=False
) # confirm_item, decline_item, set_tracking
# Operation payload
payload = Column(JSON, nullable=False)
# Status and retry
status = Column(
String(50), default="pending"
) # pending, processing, completed, failed
attempts = Column(Integer, default=0)
max_attempts = Column(Integer, default=3)
last_attempt_at = Column(DateTime(timezone=True), nullable=True)
next_retry_at = Column(DateTime(timezone=True), nullable=True)
error_message = Column(Text, nullable=True)
completed_at = Column(DateTime(timezone=True), nullable=True)
# Response from Letzshop
response_data = Column(JSON, nullable=True)
# Relationships
vendor = relationship("Vendor")
order = relationship("Order")
__table_args__ = (
Index("idx_fulfillment_queue_status", "status", "vendor_id"),
Index("idx_fulfillment_queue_retry", "status", "next_retry_at"),
Index("idx_fulfillment_queue_order", "order_id"),
)
def __repr__(self):
return f"<LetzshopFulfillmentQueue(id={self.id}, order_id={self.order_id}, operation='{self.operation}', status='{self.status}')>"
class LetzshopSyncLog(Base, TimestampMixin):
"""
Audit log for all Letzshop sync operations.
"""
__tablename__ = "letzshop_sync_logs"
id = Column(Integer, primary_key=True, index=True)
vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False, index=True)
# Operation details
operation_type = Column(
String(50), nullable=False
) # order_import, confirm_inventory, set_tracking, etc.
direction = Column(String(10), nullable=False) # inbound, outbound
# Status
status = Column(String(50), nullable=False) # success, failed, partial
# Details
records_processed = Column(Integer, default=0)
records_succeeded = Column(Integer, default=0)
records_failed = Column(Integer, default=0)
error_details = Column(JSON, nullable=True)
# Timestamps
started_at = Column(DateTime(timezone=True), nullable=False)
completed_at = Column(DateTime(timezone=True), nullable=True)
duration_seconds = Column(Integer, nullable=True)
# Triggered by
triggered_by = Column(String(100), nullable=True) # user_id, scheduler, webhook
# Relationships
vendor = relationship("Vendor")
__table_args__ = (
Index("idx_sync_log_vendor_type", "vendor_id", "operation_type"),
Index("idx_sync_log_vendor_date", "vendor_id", "started_at"),
)
def __repr__(self):
return f"<LetzshopSyncLog(id={self.id}, type='{self.operation_type}', status='{self.status}')>"
class LetzshopVendorCache(Base, TimestampMixin):
"""
Cache of Letzshop marketplace vendor directory.
This table stores vendor data fetched from Letzshop's public GraphQL API,
allowing users to browse and claim existing Letzshop shops during signup.
Data is periodically synced from Letzshop (e.g., daily via Celery task).
"""
__tablename__ = "letzshop_vendor_cache"
id = Column(Integer, primary_key=True, index=True)
# Letzshop identifiers
letzshop_id = Column(String(50), unique=True, nullable=False, index=True)
"""Unique ID from Letzshop (e.g., 'lpkedYMRup')."""
slug = Column(String(200), unique=True, nullable=False, index=True)
"""URL slug (e.g., 'nicks-diecast-corner')."""
# Basic info
name = Column(String(255), nullable=False)
"""Vendor display name."""
company_name = Column(String(255), nullable=True)
"""Legal company name."""
is_active = Column(Boolean, default=True)
"""Whether vendor is active on Letzshop."""
# Descriptions (multilingual)
description_en = Column(Text, nullable=True)
description_fr = Column(Text, nullable=True)
description_de = Column(Text, nullable=True)
# Contact information
email = Column(String(255), nullable=True)
phone = Column(String(50), nullable=True)
fax = Column(String(50), nullable=True)
website = Column(String(500), nullable=True)
# Location
street = Column(String(255), nullable=True)
street_number = Column(String(50), nullable=True)
city = Column(String(100), nullable=True)
zipcode = Column(String(20), nullable=True)
country_iso = Column(String(5), default="LU")
latitude = Column(String(20), nullable=True)
longitude = Column(String(20), nullable=True)
# Categories (stored as JSON array of names)
categories = Column(JSON, default=list)
"""List of category names, e.g., ['Fashion', 'Shoes']."""
# Images
background_image_url = Column(String(500), nullable=True)
# Social media (stored as JSON array of URLs)
social_media_links = Column(JSON, default=list)
"""List of social media URLs."""
# Opening hours (multilingual text)
opening_hours_en = Column(Text, nullable=True)
opening_hours_fr = Column(Text, nullable=True)
opening_hours_de = Column(Text, nullable=True)
# Representative
representative_name = Column(String(255), nullable=True)
representative_title = Column(String(100), nullable=True)
# Claiming status (linked to our platform)
claimed_by_vendor_id = Column(
Integer, ForeignKey("vendors.id"), nullable=True, index=True
)
"""If claimed, links to our Vendor record."""
claimed_at = Column(DateTime(timezone=True), nullable=True)
"""When the vendor was claimed on our platform."""
# Sync metadata
last_synced_at = Column(DateTime(timezone=True), nullable=False)
"""When this record was last updated from Letzshop."""
raw_data = Column(JSON, nullable=True)
"""Full raw response from Letzshop API for reference."""
# Relationship to claimed vendor
claimed_vendor = relationship("Vendor", foreign_keys=[claimed_by_vendor_id])
__table_args__ = (
Index("idx_vendor_cache_city", "city"),
Index("idx_vendor_cache_claimed", "claimed_by_vendor_id"),
Index("idx_vendor_cache_active", "is_active"),
)
def __repr__(self):
return f"<LetzshopVendorCache(id={self.id}, slug='{self.slug}', name='{self.name}')>"
@property
def is_claimed(self) -> bool:
"""Check if this vendor has been claimed on our platform."""
return self.claimed_by_vendor_id is not None
@property
def letzshop_url(self) -> str:
"""Get the Letzshop profile URL."""
return f"https://letzshop.lu/vendors/{self.slug}"
def get_description(self, lang: str = "en") -> str | None:
"""Get description in specified language with fallback."""
descriptions = {
"en": self.description_en,
"fr": self.description_fr,
"de": self.description_de,
}
# Try requested language, then fallback order
for try_lang in [lang, "en", "fr", "de"]:
if descriptions.get(try_lang):
return descriptions[try_lang]
return None
def get_opening_hours(self, lang: str = "en") -> str | None:
"""Get opening hours in specified language with fallback."""
hours = {
"en": self.opening_hours_en,
"fr": self.opening_hours_fr,
"de": self.opening_hours_de,
}
for try_lang in [lang, "en", "fr", "de"]:
if hours.get(try_lang):
return hours[try_lang]
return None
def get_full_address(self) -> str | None:
"""Get formatted full address."""
parts = []
if self.street:
addr = self.street
if self.street_number:
addr += f" {self.street_number}"
parts.append(addr)
if self.zipcode or self.city:
parts.append(f"{self.zipcode or ''} {self.city or ''}".strip())
return ", ".join(parts) if parts else None
class LetzshopHistoricalImportJob(Base, TimestampMixin):
"""
Track progress of historical order imports from Letzshop.
Enables real-time progress tracking via polling for long-running imports.
"""
__tablename__ = "letzshop_historical_import_jobs"
id = Column(Integer, primary_key=True, index=True)
vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False, index=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
# Status: pending | fetching | processing | completed | failed
status = Column(String(50), default="pending", nullable=False)
# Current phase: "confirmed" | "declined"
current_phase = Column(String(20), nullable=True)
# Fetch progress
current_page = Column(Integer, default=0)
total_pages = Column(Integer, nullable=True) # null = unknown yet
shipments_fetched = Column(Integer, default=0)
# Processing progress
orders_processed = Column(Integer, default=0)
orders_imported = Column(Integer, default=0)
orders_updated = Column(Integer, default=0)
orders_skipped = Column(Integer, default=0)
# EAN matching stats
products_matched = Column(Integer, default=0)
products_not_found = Column(Integer, default=0)
# Phase-specific stats (stored as JSON for combining confirmed + declined)
confirmed_stats = Column(JSON, nullable=True)
declined_stats = Column(JSON, nullable=True)
# Error handling
error_message = Column(Text, nullable=True)
# Celery task tracking (optional - for USE_CELERY=true)
celery_task_id = Column(String(255), nullable=True, index=True)
# Timing
started_at = Column(DateTime(timezone=True), nullable=True)
completed_at = Column(DateTime(timezone=True), nullable=True)
# Relationships
vendor = relationship("Vendor")
user = relationship("User")
__table_args__ = (
Index("idx_historical_import_vendor", "vendor_id", "status"),
)
def __repr__(self):
return f"<LetzshopHistoricalImportJob(id={self.id}, status='{self.status}', phase='{self.current_phase}')>"