Files
orion/models/database/letzshop.py
Samir Boulahtit 2792414395 feat: add Celery/Redis task queue with feature flag support
Migrate background tasks from FastAPI BackgroundTasks to Celery with Redis
for persistent task queuing, retries, and scheduled jobs.

Key changes:
- Add Celery configuration with Redis broker/backend
- Create task dispatcher with USE_CELERY feature flag for gradual rollout
- Add Celery task wrappers for all background operations:
  - Marketplace imports
  - Letzshop historical imports
  - Product exports
  - Code quality scans
  - Test runs
  - Subscription scheduled tasks (via Celery Beat)
- Add celery_task_id column to job tables for Flower integration
- Add Flower dashboard link to admin background tasks page
- Update docker-compose.yml with worker, beat, and flower services
- Add Makefile targets: celery-worker, celery-beat, celery-dev, flower

When USE_CELERY=false (default), system falls back to FastAPI BackgroundTasks
for development without Redis dependency.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 17:35:16 +01:00

233 lines
7.7 KiB
Python

# models/database/letzshop.py
"""
Database models for Letzshop marketplace integration.
Provides models for:
- VendorLetzshopCredentials: Per-vendor API key storage (encrypted)
- LetzshopFulfillmentQueue: Outbound operation queue with retry
- LetzshopSyncLog: Audit trail for sync operations
- LetzshopHistoricalImportJob: Progress tracking for historical imports
Note: Orders are now stored in the unified `orders` table with channel='letzshop'.
The LetzshopOrder model has been removed in favor of the unified Order model.
"""
from sqlalchemy import (
Boolean,
Column,
DateTime,
ForeignKey,
Index,
Integer,
String,
Text,
)
from sqlalchemy.dialects.sqlite import JSON
from sqlalchemy.orm import relationship
from app.core.database import Base
from models.database.base import TimestampMixin
class VendorLetzshopCredentials(Base, TimestampMixin):
"""
Per-vendor Letzshop API credentials.
Stores encrypted API keys and sync settings for each vendor's
Letzshop integration.
"""
__tablename__ = "vendor_letzshop_credentials"
id = Column(Integer, primary_key=True, index=True)
vendor_id = Column(
Integer, ForeignKey("vendors.id"), unique=True, nullable=False, index=True
)
# Encrypted API credentials
api_key_encrypted = Column(Text, nullable=False)
api_endpoint = Column(String(255), default="https://letzshop.lu/graphql")
# Sync settings
auto_sync_enabled = Column(Boolean, default=False)
sync_interval_minutes = Column(Integer, default=15)
# Test mode (disables API mutations when enabled)
test_mode_enabled = Column(Boolean, default=False)
# Default carrier settings
default_carrier = Column(String(50), nullable=True) # greco, colissimo, xpresslogistics
# Carrier label URL prefixes
carrier_greco_label_url = Column(
String(500), default="https://dispatchweb.fr/Tracky/Home/"
)
carrier_colissimo_label_url = Column(String(500), nullable=True)
carrier_xpresslogistics_label_url = Column(String(500), nullable=True)
# Last sync status
last_sync_at = Column(DateTime(timezone=True), nullable=True)
last_sync_status = Column(String(50), nullable=True) # success, failed, partial
last_sync_error = Column(Text, nullable=True)
# Relationships
vendor = relationship("Vendor", back_populates="letzshop_credentials")
def __repr__(self):
return f"<VendorLetzshopCredentials(vendor_id={self.vendor_id}, auto_sync={self.auto_sync_enabled})>"
class LetzshopFulfillmentQueue(Base, TimestampMixin):
"""
Queue for outbound fulfillment operations to Letzshop.
Supports retry logic for failed operations.
References the unified orders table.
"""
__tablename__ = "letzshop_fulfillment_queue"
id = Column(Integer, primary_key=True, index=True)
vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False, index=True)
order_id = Column(Integer, ForeignKey("orders.id"), nullable=False, index=True)
# Operation type
operation = Column(
String(50), nullable=False
) # confirm_item, decline_item, set_tracking
# Operation payload
payload = Column(JSON, nullable=False)
# Status and retry
status = Column(
String(50), default="pending"
) # pending, processing, completed, failed
attempts = Column(Integer, default=0)
max_attempts = Column(Integer, default=3)
last_attempt_at = Column(DateTime(timezone=True), nullable=True)
next_retry_at = Column(DateTime(timezone=True), nullable=True)
error_message = Column(Text, nullable=True)
completed_at = Column(DateTime(timezone=True), nullable=True)
# Response from Letzshop
response_data = Column(JSON, nullable=True)
# Relationships
vendor = relationship("Vendor")
order = relationship("Order")
__table_args__ = (
Index("idx_fulfillment_queue_status", "status", "vendor_id"),
Index("idx_fulfillment_queue_retry", "status", "next_retry_at"),
Index("idx_fulfillment_queue_order", "order_id"),
)
def __repr__(self):
return f"<LetzshopFulfillmentQueue(id={self.id}, order_id={self.order_id}, operation='{self.operation}', status='{self.status}')>"
class LetzshopSyncLog(Base, TimestampMixin):
"""
Audit log for all Letzshop sync operations.
"""
__tablename__ = "letzshop_sync_logs"
id = Column(Integer, primary_key=True, index=True)
vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False, index=True)
# Operation details
operation_type = Column(
String(50), nullable=False
) # order_import, confirm_inventory, set_tracking, etc.
direction = Column(String(10), nullable=False) # inbound, outbound
# Status
status = Column(String(50), nullable=False) # success, failed, partial
# Details
records_processed = Column(Integer, default=0)
records_succeeded = Column(Integer, default=0)
records_failed = Column(Integer, default=0)
error_details = Column(JSON, nullable=True)
# Timestamps
started_at = Column(DateTime(timezone=True), nullable=False)
completed_at = Column(DateTime(timezone=True), nullable=True)
duration_seconds = Column(Integer, nullable=True)
# Triggered by
triggered_by = Column(String(100), nullable=True) # user_id, scheduler, webhook
# Relationships
vendor = relationship("Vendor")
__table_args__ = (
Index("idx_sync_log_vendor_type", "vendor_id", "operation_type"),
Index("idx_sync_log_vendor_date", "vendor_id", "started_at"),
)
def __repr__(self):
return f"<LetzshopSyncLog(id={self.id}, type='{self.operation_type}', status='{self.status}')>"
class LetzshopHistoricalImportJob(Base, TimestampMixin):
"""
Track progress of historical order imports from Letzshop.
Enables real-time progress tracking via polling for long-running imports.
"""
__tablename__ = "letzshop_historical_import_jobs"
id = Column(Integer, primary_key=True, index=True)
vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False, index=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
# Status: pending | fetching | processing | completed | failed
status = Column(String(50), default="pending", nullable=False)
# Current phase: "confirmed" | "declined"
current_phase = Column(String(20), nullable=True)
# Fetch progress
current_page = Column(Integer, default=0)
total_pages = Column(Integer, nullable=True) # null = unknown yet
shipments_fetched = Column(Integer, default=0)
# Processing progress
orders_processed = Column(Integer, default=0)
orders_imported = Column(Integer, default=0)
orders_updated = Column(Integer, default=0)
orders_skipped = Column(Integer, default=0)
# EAN matching stats
products_matched = Column(Integer, default=0)
products_not_found = Column(Integer, default=0)
# Phase-specific stats (stored as JSON for combining confirmed + declined)
confirmed_stats = Column(JSON, nullable=True)
declined_stats = Column(JSON, nullable=True)
# Error handling
error_message = Column(Text, nullable=True)
# Celery task tracking (optional - for USE_CELERY=true)
celery_task_id = Column(String(255), nullable=True, index=True)
# Timing
started_at = Column(DateTime(timezone=True), nullable=True)
completed_at = Column(DateTime(timezone=True), nullable=True)
# Relationships
vendor = relationship("Vendor")
user = relationship("User")
__table_args__ = (
Index("idx_historical_import_vendor", "vendor_id", "status"),
)
def __repr__(self):
return f"<LetzshopHistoricalImportJob(id={self.id}, status='{self.status}', phase='{self.current_phase}')>"