feat: complete marketplace module migration (Phase 6)

Migrates marketplace module to self-contained structure:
- Create app/modules/marketplace/services/ re-exporting from existing locations
- Create app/modules/marketplace/models/ with marketplace & letzshop models
- Create app/modules/marketplace/schemas/ with product & import schemas
- Create app/modules/marketplace/tasks/ with 5 Celery tasks:
  - process_marketplace_import - CSV product import
  - process_historical_import - Letzshop order import
  - sync_vendor_directory - Scheduled daily vendor sync
  - export_vendor_products_to_folder - Multi-language export
  - export_marketplace_products - Admin export
- Create app/modules/marketplace/exceptions.py
- Update definition.py with is_self_contained=True and scheduled_tasks

Celery task migration:
- process_marketplace_import, process_historical_import -> import_tasks.py
- sync_vendor_directory -> sync_tasks.py (scheduled daily at 02:00)
- export_vendor_products_to_folder, export_marketplace_products -> export_tasks.py

Backward compatibility:
- Legacy task files now re-export from new locations
- Remove marketplace/letzshop/export from LEGACY_TASK_MODULES
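
The legacy-shim pattern described above can be sketched in a self-contained way. This is a minimal stand-in (module names `new_location` and `legacy_location` are invented for the demo; the real pair is `app.modules.marketplace.tasks.export_tasks` and `app.tasks.celery_tasks.export`) showing why old call sites keep resolving to the same objects:

```python
# Minimal demonstration of the re-export shim pattern used for backward
# compatibility. Module names here are stand-ins, not the real project paths.
import sys
import types

# "New" module location holding the real implementation.
new_mod = types.ModuleType("new_location")

def export_marketplace_products():
    return "exported"

new_mod.export_marketplace_products = export_marketplace_products
sys.modules["new_location"] = new_mod

# "Legacy" module whose entire body is a re-export, so old imports keep working.
legacy_src = "from new_location import export_marketplace_products"
legacy_mod = types.ModuleType("legacy_location")
exec(legacy_src, legacy_mod.__dict__)
sys.modules["legacy_location"] = legacy_mod

# An old call site imports from the legacy path and gets the same function object.
from legacy_location import export_marketplace_products as legacy_fn
assert legacy_fn is new_mod.export_marketplace_products
print(legacy_fn())  # -> exported
```

Because the shim re-exports the same object rather than wrapping it, Celery task registration and identity checks are unaffected by which import path a caller uses.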

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Date:   2026-01-27 23:19:31 +01:00
Parent: 4f379b472b
Commit: eb47daec8b

15 changed files with 1088 additions and 704 deletions


@@ -49,13 +49,15 @@ if SENTRY_DSN:
 # TASK DISCOVERY
 # =============================================================================
 # Legacy tasks (will be migrated to modules over time)
-# NOTE: Most subscription tasks have been migrated to app.modules.billing.tasks
-# The subscription module is kept for capture_capacity_snapshot (will move to monitoring)
+# MIGRATION STATUS:
+# - subscription: MIGRATED to billing module (kept for capture_capacity_snapshot -> monitoring)
+# - marketplace, letzshop, export: MIGRATED to marketplace module
+# - code_quality, test_runner: Will migrate to dev-tools module
 LEGACY_TASK_MODULES = [
-    "app.tasks.celery_tasks.marketplace",
-    "app.tasks.celery_tasks.letzshop",
+    # "app.tasks.celery_tasks.marketplace",  # MIGRATED to marketplace module
+    # "app.tasks.celery_tasks.letzshop",  # MIGRATED to marketplace module
     "app.tasks.celery_tasks.subscription",  # Kept for capture_capacity_snapshot only
-    "app.tasks.celery_tasks.export",
+    # "app.tasks.celery_tasks.export",  # MIGRATED to marketplace module
     "app.tasks.celery_tasks.code_quality",
     "app.tasks.celery_tasks.test_runner",
 ]


@@ -3,10 +3,11 @@
 Marketplace Module - Letzshop integration.
 This module provides:
-- Letzshop product sync
-- Marketplace import operations
-- Product catalog synchronization
-- Order import from marketplace
+- Product import from marketplace CSV feeds
+- Order import from Letzshop API
+- Vendor directory synchronization
+- Product export to Letzshop CSV format
+- Scheduled sync tasks
 Dependencies:
 - Requires: inventory module (for product management)
@@ -18,8 +19,20 @@ Routes:
 Menu Items:
 - Admin: marketplace-letzshop
 - Vendor: marketplace, letzshop
+
+Usage:
+    from app.modules.marketplace import marketplace_module
+    from app.modules.marketplace.services import letzshop_export_service
+    from app.modules.marketplace.models import MarketplaceProduct
+    from app.modules.marketplace.exceptions import LetzshopClientError
 """
-from app.modules.marketplace.definition import marketplace_module
+from app.modules.marketplace.definition import (
+    marketplace_module,
+    get_marketplace_module_with_routers,
+)
-__all__ = ["marketplace_module"]
+__all__ = [
+    "marketplace_module",
+    "get_marketplace_module_with_routers",
+]


@@ -3,12 +3,12 @@
 Marketplace module definition.
 Defines the marketplace module including its features, menu items,
-dependencies, and route configurations.
+dependencies, route configurations, and scheduled tasks.
 Note: This module requires the inventory module to be enabled.
 """
-from app.modules.base import ModuleDefinition
+from app.modules.base import ModuleDefinition, ScheduledTask
 from models.database.admin_menu_config import FrontendType
@@ -34,6 +34,7 @@ marketplace_module = ModuleDefinition(
         "Letzshop marketplace integration for product sync, order import, "
         "and catalog synchronization."
     ),
+    version="1.0.0",
     requires=["inventory"],  # Depends on inventory module
     features=[
         "letzshop_sync",  # Sync products with Letzshop
@@ -52,6 +53,26 @@ marketplace_module = ModuleDefinition(
         ],
     },
     is_core=False,
+    # =========================================================================
+    # Self-Contained Module Configuration
+    # =========================================================================
+    is_self_contained=True,
+    services_path="app.modules.marketplace.services",
+    models_path="app.modules.marketplace.models",
+    schemas_path="app.modules.marketplace.schemas",
+    exceptions_path="app.modules.marketplace.exceptions",
+    tasks_path="app.modules.marketplace.tasks",
+    # =========================================================================
+    # Scheduled Tasks
+    # =========================================================================
+    scheduled_tasks=[
+        ScheduledTask(
+            name="marketplace.sync_vendor_directory",
+            task="app.modules.marketplace.tasks.sync_tasks.sync_vendor_directory",
+            schedule="0 2 * * *",  # Daily at 02:00
+            options={"queue": "scheduled"},
+        ),
+    ],
 )
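
The `ScheduledTask` entry above uses a 5-field cron expression (`"0 2 * * *"` = daily at 02:00). How the loader in `app.modules.base` turns that string into a Celery beat schedule is not shown in this commit; the sketch below is one plausible way to split the fields into the keyword arguments Celery's `crontab()` expects (the `parse_cron` helper is invented for illustration):

```python
# Hedged sketch: splitting a 5-field cron string into the minute/hour/day
# fields that Celery's crontab() schedule accepts. The real ScheduledTask
# loader may do this differently.
def parse_cron(expr: str) -> dict:
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 cron fields, got {len(fields)}")
    keys = ["minute", "hour", "day_of_month", "month_of_year", "day_of_week"]
    return dict(zip(keys, fields))

schedule = parse_cron("0 2 * * *")
print(schedule["minute"], schedule["hour"])  # -> 0 2

# With Celery installed this would typically become:
#   from celery.schedules import crontab
#   crontab(**parse_cron("0 2 * * *"))
```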


@@ -0,0 +1,105 @@
# app/modules/marketplace/exceptions.py
"""
Marketplace module exceptions.

Custom exceptions for Letzshop integration, product import/export, and sync operations.
"""
from app.exceptions import BusinessLogicException, ResourceNotFoundException


class MarketplaceException(BusinessLogicException):
    """Base exception for marketplace module errors."""

    pass


class LetzshopClientError(MarketplaceException):
    """Raised when Letzshop API call fails."""

    def __init__(self, message: str, status_code: int | None = None, response: str | None = None):
        super().__init__(message)
        self.status_code = status_code
        self.response = response


class LetzshopAuthenticationError(LetzshopClientError):
    """Raised when Letzshop authentication fails."""

    def __init__(self, message: str = "Letzshop authentication failed"):
        super().__init__(message, status_code=401)


class LetzshopCredentialsNotFoundException(ResourceNotFoundException):
    """Raised when Letzshop credentials not found for vendor."""

    def __init__(self, vendor_id: int):
        super().__init__("LetzshopCredentials", str(vendor_id))
        self.vendor_id = vendor_id


class ImportJobNotFoundException(ResourceNotFoundException):
    """Raised when a marketplace import job is not found."""

    def __init__(self, job_id: int):
        super().__init__("MarketplaceImportJob", str(job_id))


class HistoricalImportJobNotFoundException(ResourceNotFoundException):
    """Raised when a historical import job is not found."""

    def __init__(self, job_id: int):
        super().__init__("LetzshopHistoricalImportJob", str(job_id))


class VendorNotFoundException(ResourceNotFoundException):
    """Raised when a vendor is not found."""

    def __init__(self, vendor_id: int):
        super().__init__("Vendor", str(vendor_id))


class ProductNotFoundException(ResourceNotFoundException):
    """Raised when a marketplace product is not found."""

    def __init__(self, product_id: str | int):
        super().__init__("MarketplaceProduct", str(product_id))


class ImportValidationError(MarketplaceException):
    """Raised when import data validation fails."""

    def __init__(self, message: str, errors: list[dict] | None = None):
        super().__init__(message)
        self.errors = errors or []


class ExportError(MarketplaceException):
    """Raised when product export fails."""

    def __init__(self, message: str, language: str | None = None):
        super().__init__(message)
        self.language = language


class SyncError(MarketplaceException):
    """Raised when vendor directory sync fails."""

    def __init__(self, message: str, vendor_code: str | None = None):
        super().__init__(message)
        self.vendor_code = vendor_code


__all__ = [
    "MarketplaceException",
    "LetzshopClientError",
    "LetzshopAuthenticationError",
    "LetzshopCredentialsNotFoundException",
    "ImportJobNotFoundException",
    "HistoricalImportJobNotFoundException",
    "VendorNotFoundException",
    "ProductNotFoundException",
    "ImportValidationError",
    "ExportError",
    "SyncError",
]
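
The point of this hierarchy is that callers can catch broadly (`MarketplaceException`) or narrowly (`LetzshopClientError`) as needed. A self-contained sketch, using stand-in base classes since the real `BusinessLogicException` lives in `app.exceptions`:

```python
# Stand-in sketch of the exception hierarchy above. BusinessLogicException
# here is a placeholder for the real base class in app.exceptions.
class BusinessLogicException(Exception):
    pass


class MarketplaceException(BusinessLogicException):
    pass


class LetzshopClientError(MarketplaceException):
    def __init__(self, message, status_code=None, response=None):
        super().__init__(message)
        self.status_code = status_code
        self.response = response


class LetzshopAuthenticationError(LetzshopClientError):
    def __init__(self, message="Letzshop authentication failed"):
        super().__init__(message, status_code=401)


# An auth failure is caught by the broader client-error handler,
# and still carries the HTTP status of the failed call.
caught_status = None
try:
    raise LetzshopAuthenticationError()
except LetzshopClientError as e:
    caught_status = e.status_code
print(caught_status)  # -> 401
```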


@@ -0,0 +1,42 @@
# app/modules/marketplace/models/__init__.py
"""
Marketplace module models.

Re-exports marketplace and Letzshop models from the central models location.
Models remain in models/database/ for now to avoid breaking existing imports.

Usage:
    from app.modules.marketplace.models import (
        MarketplaceProduct,
        MarketplaceImportJob,
        VendorLetzshopCredentials,
        LetzshopHistoricalImportJob,
    )
"""
from models.database.marketplace_product import MarketplaceProduct
from models.database.marketplace_product_translation import MarketplaceProductTranslation
from models.database.marketplace_import_job import MarketplaceImportJob
from models.database.letzshop import (
    # Letzshop credentials and sync
    VendorLetzshopCredentials,
    LetzshopFulfillmentQueue,
    LetzshopVendorCache,
    LetzshopSyncLog,
    # Import jobs
    LetzshopHistoricalImportJob,
)

__all__ = [
    # Marketplace products
    "MarketplaceProduct",
    "MarketplaceProductTranslation",
    # Import jobs
    "MarketplaceImportJob",
    "LetzshopHistoricalImportJob",
    # Letzshop models
    "VendorLetzshopCredentials",
    "LetzshopFulfillmentQueue",
    "LetzshopVendorCache",
    "LetzshopSyncLog",
]


@@ -0,0 +1,56 @@
# app/modules/marketplace/schemas/__init__.py
"""
Marketplace module Pydantic schemas.

Re-exports marketplace schemas from the central schemas location.
Provides a module-local import path while maintaining backwards compatibility.

Usage:
    from app.modules.marketplace.schemas import (
        MarketplaceImportJobRequest,
        MarketplaceProductResponse,
        MarketplaceProductCreate,
    )
"""
from models.schema.marketplace_import_job import (
    MarketplaceImportJobRequest,
    AdminMarketplaceImportJobRequest,
    MarketplaceImportJobResponse,
    MarketplaceImportJobListResponse,
)
from models.schema.marketplace_product import (
    # Translation schemas
    MarketplaceProductTranslationSchema,
    # Base schemas
    MarketplaceProductBase,
    # CRUD schemas
    MarketplaceProductCreate,
    MarketplaceProductUpdate,
    # Response schemas
    MarketplaceProductResponse,
    MarketplaceProductListResponse,
    MarketplaceProductDetailResponse,
    # Import schemas
    MarketplaceImportRequest,
    MarketplaceImportResponse,
)

__all__ = [
    # Import job schemas
    "MarketplaceImportJobRequest",
    "AdminMarketplaceImportJobRequest",
    "MarketplaceImportJobResponse",
    "MarketplaceImportJobListResponse",
    # Product schemas
    "MarketplaceProductTranslationSchema",
    "MarketplaceProductBase",
    "MarketplaceProductCreate",
    "MarketplaceProductUpdate",
    "MarketplaceProductResponse",
    "MarketplaceProductListResponse",
    "MarketplaceProductDetailResponse",
    # Import schemas
    "MarketplaceImportRequest",
    "MarketplaceImportResponse",
]


@@ -0,0 +1,61 @@
# app/modules/marketplace/services/__init__.py
"""
Marketplace module services.

Re-exports Letzshop and marketplace services from their current locations.
Services remain in app/services/ for now to avoid breaking existing imports.

Usage:
    from app.modules.marketplace.services import (
        letzshop_export_service,
        marketplace_import_job_service,
        marketplace_product_service,
    )
    from app.modules.marketplace.services.letzshop import (
        LetzshopClient,
        LetzshopCredentialsService,
        LetzshopOrderService,
        LetzshopVendorSyncService,
    )
"""
# Re-export from existing locations for convenience
from app.services.letzshop_export_service import (
    LetzshopExportService,
    letzshop_export_service,
)
from app.services.marketplace_import_job_service import (
    MarketplaceImportJobService,
    marketplace_import_job_service,
)
from app.services.marketplace_product_service import (
    MarketplaceProductService,
    marketplace_product_service,
)

# Letzshop submodule re-exports
from app.services.letzshop import (
    LetzshopClient,
    LetzshopClientError,
)
from app.services.letzshop.credentials_service import LetzshopCredentialsService
from app.services.letzshop.order_service import LetzshopOrderService
from app.services.letzshop.vendor_sync_service import LetzshopVendorSyncService

__all__ = [
    # Export service
    "LetzshopExportService",
    "letzshop_export_service",
    # Import job service
    "MarketplaceImportJobService",
    "marketplace_import_job_service",
    # Product service
    "MarketplaceProductService",
    "marketplace_product_service",
    # Letzshop services
    "LetzshopClient",
    "LetzshopClientError",
    "LetzshopCredentialsService",
    "LetzshopOrderService",
    "LetzshopVendorSyncService",
]


@@ -0,0 +1,33 @@
# app/modules/marketplace/tasks/__init__.py
"""
Marketplace module Celery tasks.

Tasks for:
- CSV product import from marketplace feeds
- Historical order imports from Letzshop API
- Vendor directory synchronization
- Product export to Letzshop CSV format
"""
from app.modules.marketplace.tasks.import_tasks import (
    process_marketplace_import,
    process_historical_import,
)
from app.modules.marketplace.tasks.sync_tasks import (
    sync_vendor_directory,
)
from app.modules.marketplace.tasks.export_tasks import (
    export_vendor_products_to_folder,
    export_marketplace_products,
)

__all__ = [
    # Import tasks
    "process_marketplace_import",
    "process_historical_import",
    # Sync tasks
    "sync_vendor_directory",
    # Export tasks
    "export_vendor_products_to_folder",
    "export_marketplace_products",
]


@@ -0,0 +1,197 @@
# app/modules/marketplace/tasks/export_tasks.py
"""
Celery tasks for product export operations.

Handles exporting vendor products to various formats (e.g., Letzshop CSV).
"""
import logging
from datetime import UTC, datetime
from pathlib import Path

from app.core.celery_config import celery_app
from app.modules.task_base import ModuleTask
from models.database.vendor import Vendor

logger = logging.getLogger(__name__)


@celery_app.task(
    bind=True,
    base=ModuleTask,
    name="app.modules.marketplace.tasks.export_tasks.export_vendor_products_to_folder",
    max_retries=3,
    default_retry_delay=60,
)
def export_vendor_products_to_folder(
    self,
    vendor_id: int,
    triggered_by: str,
    include_inactive: bool = False,
):
    """
    Export all 3 languages (en, fr, de) to disk asynchronously.

    Args:
        vendor_id: ID of the vendor to export
        triggered_by: User identifier who triggered the export
        include_inactive: Whether to include inactive products

    Returns:
        dict: Export results per language with file paths
    """
    from app.services.letzshop_export_service import letzshop_export_service

    languages = ["en", "fr", "de"]
    results = {}
    export_dir = None

    with self.get_db() as db:
        # Get vendor info
        vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first()
        if not vendor:
            logger.error(f"Vendor {vendor_id} not found for export")
            return {"error": f"Vendor {vendor_id} not found"}

        vendor_code = vendor.vendor_code

        # Create export directory
        export_dir = Path(f"exports/letzshop/{vendor_code}")
        export_dir.mkdir(parents=True, exist_ok=True)

        started_at = datetime.now(UTC)
        logger.info(f"Starting product export for vendor {vendor_code} (ID: {vendor_id})")

        for lang in languages:
            try:
                # Generate CSV content
                csv_content = letzshop_export_service.export_vendor_products(
                    db=db,
                    vendor_id=vendor_id,
                    language=lang,
                    include_inactive=include_inactive,
                )

                # Write to file
                file_name = f"{vendor_code}_products_{lang}.csv"
                file_path = export_dir / file_name
                with open(file_path, "w", encoding="utf-8") as f:
                    f.write(csv_content)

                results[lang] = {
                    "success": True,
                    "path": str(file_path),
                    "file_name": file_name,
                }
                logger.info(f"Exported {lang} products to {file_path}")
            except Exception as e:
                logger.error(f"Error exporting {lang} products for vendor {vendor_id}: {e}")
                results[lang] = {
                    "success": False,
                    "error": str(e),
                }

        # Log the export
        completed_at = datetime.now(UTC)
        duration = (completed_at - started_at).total_seconds()
        success_count = sum(1 for r in results.values() if r.get("success"))

        try:
            letzshop_export_service.log_export(
                db=db,
                vendor_id=vendor_id,
                triggered_by=triggered_by,
                records_processed=len(languages),
                records_succeeded=success_count,
                records_failed=len(languages) - success_count,
                duration_seconds=duration,
            )
        except Exception as e:
            logger.error(f"Failed to log export: {e}")

        logger.info(
            f"Product export complete for vendor {vendor_code}: "
            f"{success_count}/{len(languages)} languages exported in {duration:.2f}s"
        )

        return {
            "vendor_id": vendor_id,
            "vendor_code": vendor_code,
            "export_dir": str(export_dir),
            "results": results,
            "duration_seconds": duration,
            "triggered_by": triggered_by,
        }


@celery_app.task(
    bind=True,
    base=ModuleTask,
    name="app.modules.marketplace.tasks.export_tasks.export_marketplace_products",
    max_retries=3,
    default_retry_delay=60,
)
def export_marketplace_products(
    self,
    language: str = "en",
    triggered_by: str = "system",
):
    """
    Export all marketplace products (admin use).

    Args:
        language: Language code for export (en, fr, de)
        triggered_by: User identifier who triggered the export

    Returns:
        dict: Export result with file path
    """
    from app.services.letzshop_export_service import letzshop_export_service

    with self.get_db() as db:
        started_at = datetime.now(UTC)
        logger.info(f"Starting marketplace product export ({language})")

        try:
            # Create export directory
            export_dir = Path("exports/marketplace")
            export_dir.mkdir(parents=True, exist_ok=True)

            # Generate CSV content
            csv_content = letzshop_export_service.export_marketplace_products(
                db=db,
                language=language,
            )

            # Write to file
            timestamp = started_at.strftime("%Y%m%d_%H%M%S")
            file_name = f"marketplace_products_{language}_{timestamp}.csv"
            file_path = export_dir / file_name
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(csv_content)

            completed_at = datetime.now(UTC)
            duration = (completed_at - started_at).total_seconds()
            logger.info(f"Marketplace export complete: {file_path} ({duration:.2f}s)")

            return {
                "success": True,
                "path": str(file_path),
                "file_name": file_name,
                "language": language,
                "duration_seconds": duration,
                "triggered_by": triggered_by,
            }
        except Exception as e:
            logger.error(f"Error exporting marketplace products: {e}")
            return {
                "success": False,
                "error": str(e),
                "language": language,
            }
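
The per-vendor task above writes one CSV per language under `exports/letzshop/<vendor_code>/`. A self-contained sketch of that file layout, using a temporary directory and a hypothetical vendor code `ACME` (the real CSV content comes from `letzshop_export_service`, stubbed here with a header line):

```python
# Sketch of the per-language export layout produced by
# export_vendor_products_to_folder. "ACME" is a hypothetical vendor code.
import tempfile
from pathlib import Path

vendor_code = "ACME"
with tempfile.TemporaryDirectory() as tmp:
    export_dir = Path(tmp) / "exports" / "letzshop" / vendor_code
    export_dir.mkdir(parents=True, exist_ok=True)  # same mkdir call as the task
    for lang in ["en", "fr", "de"]:
        # Stand-in for the generated CSV content.
        (export_dir / f"{vendor_code}_products_{lang}.csv").write_text(
            "sku,name\n", encoding="utf-8"
        )
    files = sorted(p.name for p in export_dir.iterdir())

print(files)  # -> ['ACME_products_de.csv', 'ACME_products_en.csv', 'ACME_products_fr.csv']
```

`mkdir(parents=True, exist_ok=True)` makes repeated exports for the same vendor idempotent with respect to the directory tree; each run simply overwrites the three language files.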


@@ -0,0 +1,403 @@
# app/modules/marketplace/tasks/import_tasks.py
"""
Celery tasks for marketplace product and order imports.

Includes:
- CSV product import from marketplace feeds
- Historical order imports from Letzshop API
"""
import asyncio
import logging
from datetime import UTC, datetime
from typing import Callable

from app.core.celery_config import celery_app
from models.database.marketplace_import_job import MarketplaceImportJob
from models.database.letzshop import LetzshopHistoricalImportJob
from app.services.admin_notification_service import admin_notification_service
from app.services.letzshop import LetzshopClientError
from app.services.letzshop.credentials_service import LetzshopCredentialsService
from app.services.letzshop.order_service import LetzshopOrderService
from app.modules.task_base import ModuleTask
from app.utils.csv_processor import CSVProcessor
from models.database.vendor import Vendor

logger = logging.getLogger(__name__)


def _get_credentials_service(db) -> LetzshopCredentialsService:
    """Create a credentials service instance."""
    return LetzshopCredentialsService(db)


def _get_order_service(db) -> LetzshopOrderService:
    """Create an order service instance."""
    return LetzshopOrderService(db)


@celery_app.task(
    bind=True,
    base=ModuleTask,
    name="app.modules.marketplace.tasks.import_tasks.process_marketplace_import",
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
)
def process_marketplace_import(
    self,
    job_id: int,
    url: str,
    marketplace: str,
    vendor_id: int,
    batch_size: int = 1000,
    language: str = "en",
):
    """
    Celery task to process marketplace CSV import.

    Args:
        job_id: ID of the MarketplaceImportJob record
        url: URL to the CSV file
        marketplace: Name of the marketplace (e.g., 'Letzshop')
        vendor_id: ID of the vendor
        batch_size: Number of rows to process per batch
        language: Language code for translations (default: 'en')

    Returns:
        dict: Import results with counts
    """
    csv_processor = CSVProcessor()

    with self.get_db() as db:
        # Get the import job
        job = db.query(MarketplaceImportJob).filter(MarketplaceImportJob.id == job_id).first()
        if not job:
            logger.error(f"Import job {job_id} not found")
            return {"error": f"Import job {job_id} not found"}

        # Store Celery task ID on job
        job.celery_task_id = self.request.id

        # Get vendor information
        vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first()
        if not vendor:
            logger.error(f"Vendor {vendor_id} not found for import job {job_id}")
            job.status = "failed"
            job.error_message = f"Vendor {vendor_id} not found"
            job.completed_at = datetime.now(UTC)
            return {"error": f"Vendor {vendor_id} not found"}

        # Update job status
        job.status = "processing"
        job.started_at = datetime.now(UTC)

        logger.info(
            f"Processing import: Job {job_id}, Marketplace: {marketplace}, "
            f"Vendor: {vendor.name} ({vendor.vendor_code}), Language: {language}"
        )

        try:
            # Run the async CSV processor in a sync context
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                result = loop.run_until_complete(
                    csv_processor.process_marketplace_csv_from_url(
                        url=url,
                        marketplace=marketplace,
                        vendor_name=vendor.name,
                        batch_size=batch_size,
                        db=db,
                        language=language,
                        import_job_id=job_id,
                    )
                )
            finally:
                loop.close()

            # Update job with results
            job.status = "completed"
            job.completed_at = datetime.now(UTC)
            job.imported_count = result["imported"]
            job.updated_count = result["updated"]
            job.error_count = result.get("errors", 0)
            job.total_processed = result["total_processed"]

            if result.get("errors", 0) > 0:
                job.status = "completed_with_errors"
                job.error_message = f"{result['errors']} rows had errors"

            # Notify admin if error count is significant
            if result.get("errors", 0) >= 5:
                admin_notification_service.notify_import_failure(
                    db=db,
                    vendor_name=vendor.name,
                    job_id=job_id,
                    error_message=f"Import completed with {result['errors']} errors out of {result['total_processed']} rows",
                    vendor_id=vendor_id,
                )

            logger.info(
                f"Import job {job_id} completed: "
                f"imported={result['imported']}, updated={result['updated']}, "
                f"errors={result.get('errors', 0)}"
            )

            return {
                "job_id": job_id,
                "imported": result["imported"],
                "updated": result["updated"],
                "errors": result.get("errors", 0),
                "total_processed": result["total_processed"],
            }
        except Exception as e:
            logger.error(f"Import job {job_id} failed: {e}", exc_info=True)
            job.status = "failed"
            job.error_message = str(e)[:500]  # Truncate long errors
            job.completed_at = datetime.now(UTC)

            # Create admin notification for import failure
            admin_notification_service.notify_import_failure(
                db=db,
                vendor_name=vendor.name,
                job_id=job_id,
                error_message=str(e)[:200],
                vendor_id=vendor_id,
            )
            raise  # Re-raise for Celery retry


@celery_app.task(
    bind=True,
    base=ModuleTask,
    name="app.modules.marketplace.tasks.import_tasks.process_historical_import",
    max_retries=2,
    default_retry_delay=120,
    autoretry_for=(Exception,),
    retry_backoff=True,
)
def process_historical_import(self, job_id: int, vendor_id: int):
    """
    Celery task for historical order import with progress tracking.

    Imports both confirmed and unconfirmed orders from Letzshop API,
    updating job progress in the database for frontend polling.

    Args:
        job_id: ID of the LetzshopHistoricalImportJob record
        vendor_id: ID of the vendor to import orders for

    Returns:
        dict: Import statistics
    """
    with self.get_db() as db:
        # Get the import job
        job = (
            db.query(LetzshopHistoricalImportJob)
            .filter(LetzshopHistoricalImportJob.id == job_id)
            .first()
        )
        if not job:
            logger.error(f"Historical import job {job_id} not found")
            return {"error": f"Job {job_id} not found"}

        # Store Celery task ID
        job.celery_task_id = self.request.id

        try:
            # Mark as started
            job.status = "fetching"
            job.started_at = datetime.now(UTC)

            creds_service = _get_credentials_service(db)
            order_service = _get_order_service(db)

            # Create progress callback for fetching
            def fetch_progress_callback(page: int, total_fetched: int):
                """Update fetch progress in database."""
                job.current_page = page
                job.shipments_fetched = total_fetched

            # Create progress callback for processing
            def create_processing_callback(
                phase: str,
            ) -> Callable[[int, int, int, int], None]:
                """Create a processing progress callback for a phase."""

                def callback(processed: int, imported: int, updated: int, skipped: int):
                    job.orders_processed = processed
                    job.orders_imported = imported
                    job.orders_updated = updated
                    job.orders_skipped = skipped

                return callback

            with creds_service.create_client(vendor_id) as client:
                # ================================================================
                # Phase 1: Import confirmed orders
                # ================================================================
                job.current_phase = "confirmed"
                job.current_page = 0
                job.shipments_fetched = 0

                logger.info(f"Job {job_id}: Fetching confirmed shipments for vendor {vendor_id}")
                confirmed_shipments = client.get_all_shipments_paginated(
                    state="confirmed",
                    page_size=50,
                    progress_callback=fetch_progress_callback,
                )
                logger.info(f"Job {job_id}: Fetched {len(confirmed_shipments)} confirmed shipments")

                # Process confirmed shipments
                job.status = "processing"
                job.orders_processed = 0
                job.orders_imported = 0
                job.orders_updated = 0
                job.orders_skipped = 0

                confirmed_stats = order_service.import_historical_shipments(
                    vendor_id=vendor_id,
                    shipments=confirmed_shipments,
                    match_products=True,
                    progress_callback=create_processing_callback("confirmed"),
                )

                # Store confirmed stats
                job.confirmed_stats = {
                    "total": confirmed_stats["total"],
                    "imported": confirmed_stats["imported"],
                    "updated": confirmed_stats["updated"],
                    "skipped": confirmed_stats["skipped"],
                    "products_matched": confirmed_stats["products_matched"],
                    "products_not_found": confirmed_stats["products_not_found"],
                }
                job.products_matched = confirmed_stats["products_matched"]
                job.products_not_found = confirmed_stats["products_not_found"]

                logger.info(
                    f"Job {job_id}: Confirmed phase complete - "
                    f"imported={confirmed_stats['imported']}, "
                    f"updated={confirmed_stats['updated']}, "
                    f"skipped={confirmed_stats['skipped']}"
                )

                # ================================================================
                # Phase 2: Import unconfirmed (pending) orders
                # ================================================================
                job.current_phase = "unconfirmed"
                job.status = "fetching"
                job.current_page = 0
                job.shipments_fetched = 0

                logger.info(f"Job {job_id}: Fetching unconfirmed shipments for vendor {vendor_id}")
                unconfirmed_shipments = client.get_all_shipments_paginated(
                    state="unconfirmed",
                    page_size=50,
                    progress_callback=fetch_progress_callback,
                )
                logger.info(
                    f"Job {job_id}: Fetched {len(unconfirmed_shipments)} unconfirmed shipments"
                )

                # Process unconfirmed shipments
                job.status = "processing"
                job.orders_processed = 0

                unconfirmed_stats = order_service.import_historical_shipments(
                    vendor_id=vendor_id,
                    shipments=unconfirmed_shipments,
                    match_products=True,
                    progress_callback=create_processing_callback("unconfirmed"),
                )

                # Store unconfirmed stats
                job.declined_stats = {
                    "total": unconfirmed_stats["total"],
                    "imported": unconfirmed_stats["imported"],
                    "updated": unconfirmed_stats["updated"],
                    "skipped": unconfirmed_stats["skipped"],
                    "products_matched": unconfirmed_stats["products_matched"],
                    "products_not_found": unconfirmed_stats["products_not_found"],
                }

                # Add to cumulative product matching stats
                job.products_matched += unconfirmed_stats["products_matched"]
                job.products_not_found += unconfirmed_stats["products_not_found"]

                logger.info(
                    f"Job {job_id}: Unconfirmed phase complete - "
                    f"imported={unconfirmed_stats['imported']}, "
                    f"updated={unconfirmed_stats['updated']}, "
                    f"skipped={unconfirmed_stats['skipped']}"
                )

                # ================================================================
                # Complete
                # ================================================================
                job.status = "completed"
                job.completed_at = datetime.now(UTC)

                # Update credentials sync status
                creds_service.update_sync_status(vendor_id, "success", None)

                logger.info(f"Job {job_id}: Historical import completed successfully")

                return {
                    "job_id": job_id,
                    "confirmed": confirmed_stats,
                    "unconfirmed": unconfirmed_stats,
                }
        except LetzshopClientError as e:
            logger.error(f"Job {job_id}: Letzshop API error: {e}")
            job.status = "failed"
            job.error_message = f"Letzshop API error: {e}"
            job.completed_at = datetime.now(UTC)

            # Get vendor name for notification
            order_service = _get_order_service(db)
            vendor = order_service.get_vendor(vendor_id)
            vendor_name = vendor.name if vendor else f"Vendor {vendor_id}"

            # Create admin notification
            admin_notification_service.notify_order_sync_failure(
                db=db,
                vendor_name=vendor_name,
                error_message=f"Historical import failed: {str(e)[:150]}",
                vendor_id=vendor_id,
            )

            creds_service = _get_credentials_service(db)
            creds_service.update_sync_status(vendor_id, "failed", str(e))
            raise  # Re-raise for Celery retry
        except Exception as e:
            logger.error(f"Job {job_id}: Unexpected error: {e}", exc_info=True)
            job.status = "failed"
            job.error_message = str(e)[:500]
            job.completed_at = datetime.now(UTC)

            # Get vendor name for notification
            order_service = _get_order_service(db)
            vendor = order_service.get_vendor(vendor_id)
            vendor_name = vendor.name if vendor else f"Vendor {vendor_id}"

            # Create admin notification
            admin_notification_service.notify_critical_error(
                db=db,
                error_type="Historical Import",
                error_message=f"Import job {job_id} failed for {vendor_name}: {str(e)[:150]}",
                details={"job_id": job_id, "vendor_id": vendor_id, "vendor_name": vendor_name},
            )
            raise  # Re-raise for Celery retry
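
`process_marketplace_import` bridges sync and async worlds: the Celery task body is synchronous, so it spins up a fresh event loop, runs the async CSV processor to completion, and closes the loop. A self-contained sketch of that pattern (the coroutine here is a stand-in for `CSVProcessor.process_marketplace_csv_from_url`):

```python
# Self-contained sketch of running an async processor inside a synchronous
# Celery task body, mirroring the loop handling in process_marketplace_import.
import asyncio


async def process_csv(url: str) -> dict:
    # Stand-in for the real async CSV processor.
    await asyncio.sleep(0)
    return {"imported": 3, "updated": 1, "total_processed": 4}


def sync_task_body(url: str) -> dict:
    # Celery workers run sync code with no running loop in this thread,
    # so create one explicitly and guarantee it is closed afterwards.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(process_csv(url))
    finally:
        loop.close()


result = sync_task_body("https://example.com/feed.csv")
print(result["imported"])  # -> 3
```

Closing the loop in a `finally` block matters in a long-lived worker process: a leaked loop per task invocation would accumulate file descriptors over time.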


@@ -0,0 +1,84 @@
# app/modules/marketplace/tasks/sync_tasks.py
"""
Celery tasks for Letzshop vendor directory synchronization.

Periodically syncs vendor information from Letzshop's public GraphQL API.
"""
import logging
from typing import Any

from app.core.celery_config import celery_app
from app.modules.task_base import ModuleTask
from app.services.admin_notification_service import admin_notification_service
from app.services.letzshop.vendor_sync_service import LetzshopVendorSyncService

logger = logging.getLogger(__name__)


@celery_app.task(
    bind=True,
    base=ModuleTask,
    name="app.modules.marketplace.tasks.sync_tasks.sync_vendor_directory",
    max_retries=2,
    default_retry_delay=300,
    autoretry_for=(Exception,),
    retry_backoff=True,
)
def sync_vendor_directory(self) -> dict[str, Any]:
    """
    Celery task to sync Letzshop vendor directory.

    Fetches all vendors from Letzshop's public GraphQL API and updates
    the local letzshop_vendor_cache table.

    This task is scheduled to run daily via the module's scheduled_tasks
    definition.

    Returns:
        dict: Sync statistics including created, updated, and error counts.
    """
    with self.get_db() as db:
        try:
            logger.info("Starting Letzshop vendor directory sync...")
            sync_service = LetzshopVendorSyncService(db)

            def progress_callback(page: int, fetched: int, total: int):
                """Log progress during sync."""
                logger.info(f"Vendor sync progress: page {page}, {fetched}/{total} vendors")

            stats = sync_service.sync_all_vendors(progress_callback=progress_callback)
            logger.info(
                f"Vendor directory sync completed: "
                f"{stats.get('created', 0)} created, "
                f"{stats.get('updated', 0)} updated, "
                f"{stats.get('errors', 0)} errors"
            )

            # Send admin notification if there were errors
            if stats.get("errors", 0) > 0:
                admin_notification_service.notify_system_info(
                    db=db,
                    title="Letzshop Vendor Sync Completed with Errors",
                    message=(
                        f"Synced {stats.get('total_fetched', 0)} vendors. "
                        f"Errors: {stats.get('errors', 0)}"
                    ),
                    details=stats,
                )

            return stats
        except Exception as e:
            logger.error(f"Vendor directory sync failed: {e}", exc_info=True)
            # Notify admins of failure
            admin_notification_service.notify_critical_error(
                db=db,
                error_type="Vendor Directory Sync",
                error_message=f"Failed to sync Letzshop vendor directory: {str(e)[:200]}",
                details={"error": str(e)},
            )
            raise  # Re-raise for Celery retry
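The decorator above combines `autoretry_for=(Exception,)` with `retry_backoff=True` and `max_retries=2`. A minimal sketch of the exponential-backoff countdown Celery computes in that mode, assuming Celery's documented formula (a 1-second factor doubling per retry, capped at `retry_backoff_max`, 600s by default); `exponential_backoff` here is an illustrative stand-in, not a Celery API:

```python
import random


def exponential_backoff(factor: int, retries: int, maximum: int, full_jitter: bool = False) -> int:
    """Mirror of Celery's documented exponential backoff (assumed behavior)."""
    # Double the base factor for each prior retry, never exceeding the cap.
    countdown = min(maximum, factor * (2 ** retries))
    if full_jitter:
        # Celery's retry_jitter=True picks a random delay in [0, countdown].
        countdown = random.randrange(countdown + 1)
    return max(0, countdown)


# With retry_backoff=True the factor is 1 second, so the two allowed
# retries would wait roughly 1s and 2s (before jitter):
print([exponential_backoff(1, r, 600) for r in range(2)])  # [1, 2]
```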


@@ -1,199 +1,24 @@
# app/tasks/celery_tasks/export.py  (new content)
"""
Legacy export tasks.

MIGRATED: All tasks have been migrated to app.modules.marketplace.tasks.

New locations:
- export_vendor_products_to_folder -> app.modules.marketplace.tasks.export_tasks
- export_marketplace_products -> app.modules.marketplace.tasks.export_tasks

Import from the new location:
    from app.modules.marketplace.tasks import (
        export_vendor_products_to_folder,
        export_marketplace_products,
    )
"""
# Re-export from new location for backward compatibility
from app.modules.marketplace.tasks.export_tasks import (
    export_vendor_products_to_folder,
    export_marketplace_products,
)

__all__ = ["export_vendor_products_to_folder", "export_marketplace_products"]

# app/tasks/celery_tasks/export.py  (removed content)
"""
Celery tasks for product export operations.
Handles exporting vendor products to various formats (e.g., Letzshop CSV).
"""
import logging
import os
from datetime import UTC, datetime
from pathlib import Path
from app.core.celery_config import celery_app
from app.tasks.celery_tasks.base import DatabaseTask
from models.database.vendor import Vendor
logger = logging.getLogger(__name__)
@celery_app.task(
bind=True,
base=DatabaseTask,
name="app.tasks.celery_tasks.export.export_vendor_products_to_folder",
max_retries=3,
default_retry_delay=60,
)
def export_vendor_products_to_folder(
self,
vendor_id: int,
triggered_by: str,
include_inactive: bool = False,
):
"""
Export all 3 languages (en, fr, de) to disk asynchronously.
Args:
vendor_id: ID of the vendor to export
triggered_by: User identifier who triggered the export
include_inactive: Whether to include inactive products
Returns:
dict: Export results per language with file paths
"""
from app.services.letzshop_export_service import letzshop_export_service
languages = ["en", "fr", "de"]
results = {}
export_dir = None
with self.get_db() as db:
# Get vendor info
vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first()
if not vendor:
logger.error(f"Vendor {vendor_id} not found for export")
return {"error": f"Vendor {vendor_id} not found"}
vendor_code = vendor.vendor_code
# Create export directory
export_dir = Path(f"exports/letzshop/{vendor_code}")
export_dir.mkdir(parents=True, exist_ok=True)
started_at = datetime.now(UTC)
logger.info(f"Starting product export for vendor {vendor_code} (ID: {vendor_id})")
for lang in languages:
try:
# Generate CSV content
csv_content = letzshop_export_service.export_vendor_products(
db=db,
vendor_id=vendor_id,
language=lang,
include_inactive=include_inactive,
)
# Write to file
file_name = f"{vendor_code}_products_{lang}.csv"
file_path = export_dir / file_name
with open(file_path, "w", encoding="utf-8") as f:
f.write(csv_content)
results[lang] = {
"success": True,
"path": str(file_path),
"file_name": file_name,
}
logger.info(f"Exported {lang} products to {file_path}")
except Exception as e:
logger.error(f"Error exporting {lang} products for vendor {vendor_id}: {e}")
results[lang] = {
"success": False,
"error": str(e),
}
# Log the export
completed_at = datetime.now(UTC)
duration = (completed_at - started_at).total_seconds()
success_count = sum(1 for r in results.values() if r.get("success"))
try:
letzshop_export_service.log_export(
db=db,
vendor_id=vendor_id,
triggered_by=triggered_by,
records_processed=len(languages),
records_succeeded=success_count,
records_failed=len(languages) - success_count,
duration_seconds=duration,
)
db.commit()
except Exception as e:
logger.error(f"Failed to log export: {e}")
logger.info(
f"Product export complete for vendor {vendor_code}: "
f"{success_count}/{len(languages)} languages exported in {duration:.2f}s"
)
return {
"vendor_id": vendor_id,
"vendor_code": vendor_code,
"export_dir": str(export_dir),
"results": results,
"duration_seconds": duration,
"triggered_by": triggered_by,
}
@celery_app.task(
bind=True,
base=DatabaseTask,
name="app.tasks.celery_tasks.export.export_marketplace_products",
max_retries=3,
default_retry_delay=60,
)
def export_marketplace_products(
self,
language: str = "en",
triggered_by: str = "system",
):
"""
Export all marketplace products (admin use).
Args:
language: Language code for export (en, fr, de)
triggered_by: User identifier who triggered the export
Returns:
dict: Export result with file path
"""
from app.services.letzshop_export_service import letzshop_export_service
with self.get_db() as db:
started_at = datetime.now(UTC)
logger.info(f"Starting marketplace product export ({language})")
try:
# Create export directory
export_dir = Path("exports/marketplace")
export_dir.mkdir(parents=True, exist_ok=True)
# Generate CSV content
csv_content = letzshop_export_service.export_marketplace_products(
db=db,
language=language,
)
# Write to file
timestamp = started_at.strftime("%Y%m%d_%H%M%S")
file_name = f"marketplace_products_{language}_{timestamp}.csv"
file_path = export_dir / file_name
with open(file_path, "w", encoding="utf-8") as f:
f.write(csv_content)
completed_at = datetime.now(UTC)
duration = (completed_at - started_at).total_seconds()
logger.info(f"Marketplace export complete: {file_path} ({duration:.2f}s)")
return {
"success": True,
"path": str(file_path),
"file_name": file_name,
"language": language,
"duration_seconds": duration,
"triggered_by": triggered_by,
}
except Exception as e:
logger.error(f"Error exporting marketplace products: {e}")
return {
"success": False,
"error": str(e),
"language": language,
}


@@ -1,350 +1,22 @@
# app/tasks/celery_tasks/letzshop.py  (new content)
"""
Legacy Letzshop tasks.

MIGRATED: All tasks have been migrated to app.modules.marketplace.tasks.

New locations:
- process_historical_import -> app.modules.marketplace.tasks.import_tasks
- sync_vendor_directory -> app.modules.marketplace.tasks.sync_tasks

Import from the new location:
    from app.modules.marketplace.tasks import (
        process_historical_import,
        sync_vendor_directory,
    )
"""
# Re-export from new location for backward compatibility
from app.modules.marketplace.tasks.import_tasks import process_historical_import
from app.modules.marketplace.tasks.sync_tasks import sync_vendor_directory

__all__ = ["process_historical_import", "sync_vendor_directory"]

# app/tasks/celery_tasks/letzshop.py  (removed content)
"""
Celery tasks for Letzshop integration.
Includes:
- Historical order imports
- Vendor directory sync
"""
import logging
from datetime import UTC, datetime
from typing import Any, Callable
from app.core.celery_config import celery_app
from app.services.admin_notification_service import admin_notification_service
from app.services.letzshop import LetzshopClientError
from app.services.letzshop.credentials_service import LetzshopCredentialsService
from app.services.letzshop.order_service import LetzshopOrderService
from app.services.letzshop.vendor_sync_service import LetzshopVendorSyncService
from app.tasks.celery_tasks.base import DatabaseTask
from models.database.letzshop import LetzshopHistoricalImportJob
logger = logging.getLogger(__name__)
def _get_credentials_service(db) -> LetzshopCredentialsService:
"""Create a credentials service instance."""
return LetzshopCredentialsService(db)
def _get_order_service(db) -> LetzshopOrderService:
"""Create an order service instance."""
return LetzshopOrderService(db)
@celery_app.task(
bind=True,
base=DatabaseTask,
name="app.tasks.celery_tasks.letzshop.process_historical_import",
max_retries=2,
default_retry_delay=120,
autoretry_for=(Exception,),
retry_backoff=True,
)
def process_historical_import(self, job_id: int, vendor_id: int):
"""
Celery task for historical order import with progress tracking.
Imports both confirmed and unconfirmed orders from Letzshop API,
updating job progress in the database for frontend polling.
Args:
job_id: ID of the LetzshopHistoricalImportJob record
vendor_id: ID of the vendor to import orders for
Returns:
dict: Import statistics
"""
with self.get_db() as db:
# Get the import job
job = (
db.query(LetzshopHistoricalImportJob)
.filter(LetzshopHistoricalImportJob.id == job_id)
.first()
)
if not job:
logger.error(f"Historical import job {job_id} not found")
return {"error": f"Job {job_id} not found"}
# Store Celery task ID
job.celery_task_id = self.request.id
try:
# Mark as started
job.status = "fetching"
job.started_at = datetime.now(UTC)
db.commit()
creds_service = _get_credentials_service(db)
order_service = _get_order_service(db)
# Create progress callback for fetching
def fetch_progress_callback(page: int, total_fetched: int):
"""Update fetch progress in database."""
job.current_page = page
job.shipments_fetched = total_fetched
db.commit()
# Create progress callback for processing
def create_processing_callback(
phase: str,
) -> Callable[[int, int, int, int], None]:
"""Create a processing progress callback for a phase."""
def callback(processed: int, imported: int, updated: int, skipped: int):
job.orders_processed = processed
job.orders_imported = imported
job.orders_updated = updated
job.orders_skipped = skipped
db.commit()
return callback
with creds_service.create_client(vendor_id) as client:
# ================================================================
# Phase 1: Import confirmed orders
# ================================================================
job.current_phase = "confirmed"
job.current_page = 0
job.shipments_fetched = 0
db.commit()
logger.info(f"Job {job_id}: Fetching confirmed shipments for vendor {vendor_id}")
confirmed_shipments = client.get_all_shipments_paginated(
state="confirmed",
page_size=50,
progress_callback=fetch_progress_callback,
)
logger.info(f"Job {job_id}: Fetched {len(confirmed_shipments)} confirmed shipments")
# Process confirmed shipments
job.status = "processing"
job.orders_processed = 0
job.orders_imported = 0
job.orders_updated = 0
job.orders_skipped = 0
db.commit()
confirmed_stats = order_service.import_historical_shipments(
vendor_id=vendor_id,
shipments=confirmed_shipments,
match_products=True,
progress_callback=create_processing_callback("confirmed"),
)
# Store confirmed stats
job.confirmed_stats = {
"total": confirmed_stats["total"],
"imported": confirmed_stats["imported"],
"updated": confirmed_stats["updated"],
"skipped": confirmed_stats["skipped"],
"products_matched": confirmed_stats["products_matched"],
"products_not_found": confirmed_stats["products_not_found"],
}
job.products_matched = confirmed_stats["products_matched"]
job.products_not_found = confirmed_stats["products_not_found"]
db.commit()
logger.info(
f"Job {job_id}: Confirmed phase complete - "
f"imported={confirmed_stats['imported']}, "
f"updated={confirmed_stats['updated']}, "
f"skipped={confirmed_stats['skipped']}"
)
# ================================================================
# Phase 2: Import unconfirmed (pending) orders
# ================================================================
job.current_phase = "unconfirmed"
job.status = "fetching"
job.current_page = 0
job.shipments_fetched = 0
db.commit()
logger.info(f"Job {job_id}: Fetching unconfirmed shipments for vendor {vendor_id}")
unconfirmed_shipments = client.get_all_shipments_paginated(
state="unconfirmed",
page_size=50,
progress_callback=fetch_progress_callback,
)
logger.info(
f"Job {job_id}: Fetched {len(unconfirmed_shipments)} unconfirmed shipments"
)
# Process unconfirmed shipments
job.status = "processing"
job.orders_processed = 0
db.commit()
unconfirmed_stats = order_service.import_historical_shipments(
vendor_id=vendor_id,
shipments=unconfirmed_shipments,
match_products=True,
progress_callback=create_processing_callback("unconfirmed"),
)
# Store unconfirmed stats
job.declined_stats = {
"total": unconfirmed_stats["total"],
"imported": unconfirmed_stats["imported"],
"updated": unconfirmed_stats["updated"],
"skipped": unconfirmed_stats["skipped"],
"products_matched": unconfirmed_stats["products_matched"],
"products_not_found": unconfirmed_stats["products_not_found"],
}
# Add to cumulative product matching stats
job.products_matched += unconfirmed_stats["products_matched"]
job.products_not_found += unconfirmed_stats["products_not_found"]
logger.info(
f"Job {job_id}: Unconfirmed phase complete - "
f"imported={unconfirmed_stats['imported']}, "
f"updated={unconfirmed_stats['updated']}, "
f"skipped={unconfirmed_stats['skipped']}"
)
# ================================================================
# Complete
# ================================================================
job.status = "completed"
job.completed_at = datetime.now(UTC)
db.commit()
# Update credentials sync status
creds_service.update_sync_status(vendor_id, "success", None)
logger.info(f"Job {job_id}: Historical import completed successfully")
return {
"job_id": job_id,
"confirmed": confirmed_stats,
"unconfirmed": unconfirmed_stats,
}
except LetzshopClientError as e:
logger.error(f"Job {job_id}: Letzshop API error: {e}")
job.status = "failed"
job.error_message = f"Letzshop API error: {e}"
job.completed_at = datetime.now(UTC)
# Get vendor name for notification
order_service = _get_order_service(db)
vendor = order_service.get_vendor(vendor_id)
vendor_name = vendor.name if vendor else f"Vendor {vendor_id}"
# Create admin notification
admin_notification_service.notify_order_sync_failure(
db=db,
vendor_name=vendor_name,
error_message=f"Historical import failed: {str(e)[:150]}",
vendor_id=vendor_id,
)
db.commit()
creds_service = _get_credentials_service(db)
creds_service.update_sync_status(vendor_id, "failed", str(e))
raise # Re-raise for Celery retry
except Exception as e:
logger.error(f"Job {job_id}: Unexpected error: {e}", exc_info=True)
job.status = "failed"
job.error_message = str(e)[:500]
job.completed_at = datetime.now(UTC)
# Get vendor name for notification
order_service = _get_order_service(db)
vendor = order_service.get_vendor(vendor_id)
vendor_name = vendor.name if vendor else f"Vendor {vendor_id}"
# Create admin notification
admin_notification_service.notify_critical_error(
db=db,
error_type="Historical Import",
error_message=f"Import job {job_id} failed for {vendor_name}: {str(e)[:150]}",
details={"job_id": job_id, "vendor_id": vendor_id, "vendor_name": vendor_name},
)
db.commit()
raise # Re-raise for Celery retry
# =============================================================================
# Vendor Directory Sync
# =============================================================================
@celery_app.task(
bind=True,
base=DatabaseTask,
name="app.tasks.celery_tasks.letzshop.sync_vendor_directory",
max_retries=2,
default_retry_delay=300,
autoretry_for=(Exception,),
retry_backoff=True,
)
def sync_vendor_directory(self) -> dict[str, Any]:
"""
Celery task to sync Letzshop vendor directory.
Fetches all vendors from Letzshop's public GraphQL API and updates
the local letzshop_vendor_cache table.
This task should be scheduled to run periodically (e.g., daily)
via Celery beat.
Returns:
dict: Sync statistics including created, updated, and error counts.
"""
with self.get_db() as db:
try:
logger.info("Starting Letzshop vendor directory sync...")
sync_service = LetzshopVendorSyncService(db)
def progress_callback(page: int, fetched: int, total: int):
"""Log progress during sync."""
logger.info(f"Vendor sync progress: page {page}, {fetched}/{total} vendors")
stats = sync_service.sync_all_vendors(progress_callback=progress_callback)
logger.info(
f"Vendor directory sync completed: "
f"{stats.get('created', 0)} created, "
f"{stats.get('updated', 0)} updated, "
f"{stats.get('errors', 0)} errors"
)
# Send admin notification if there were errors
if stats.get("errors", 0) > 0:
admin_notification_service.notify_system_info(
db=db,
title="Letzshop Vendor Sync Completed with Errors",
message=(
f"Synced {stats.get('total_fetched', 0)} vendors. "
f"Errors: {stats.get('errors', 0)}"
),
details=stats,
)
db.commit()
return stats
except Exception as e:
logger.error(f"Vendor directory sync failed: {e}", exc_info=True)
# Notify admins of failure
admin_notification_service.notify_critical_error(
db=db,
error_type="Vendor Directory Sync",
error_message=f"Failed to sync Letzshop vendor directory: {str(e)[:200]}",
details={"error": str(e)},
)
db.commit()
raise # Re-raise for Celery retry
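The removed implementation above leans on closures that capture the job row and commit after each page or batch, so the frontend can poll live progress. A self-contained sketch of that pattern (`FakeJob` and the commit hook are illustrative stand-ins for the SQLAlchemy job record and session):

```python
class FakeJob:
    """Stand-in for the LetzshopHistoricalImportJob ORM record."""
    def __init__(self):
        self.current_page = 0
        self.shipments_fetched = 0


def make_fetch_callback(job, commit):
    """Build a closure that persists fetch progress after every page."""
    def fetch_progress_callback(page: int, total_fetched: int):
        job.current_page = page
        job.shipments_fetched = total_fetched
        commit()  # flush progress so pollers see it immediately
    return fetch_progress_callback


job = FakeJob()
commits = []
callback = make_fetch_callback(job, lambda: commits.append(True))

# Simulate the paginated fetch invoking the callback per page
for page, fetched in [(1, 50), (2, 100)]:
    callback(page, fetched)

print(job.current_page, job.shipments_fetched, len(commits))  # 2 100 2
```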


@@ -1,160 +1,17 @@
# app/tasks/celery_tasks/marketplace.py  (new content)
"""
Legacy marketplace tasks.

MIGRATED: All tasks have been migrated to app.modules.marketplace.tasks.

New locations:
- process_marketplace_import -> app.modules.marketplace.tasks.import_tasks

Import from the new location:
    from app.modules.marketplace.tasks import process_marketplace_import
"""
# Re-export from new location for backward compatibility
from app.modules.marketplace.tasks.import_tasks import process_marketplace_import

__all__ = ["process_marketplace_import"]

# app/tasks/celery_tasks/marketplace.py  (removed content)
"""
Celery tasks for marketplace product imports.
Wraps the existing process_marketplace_import function for Celery execution.
"""
import asyncio
import logging
from datetime import UTC, datetime
from app.core.celery_config import celery_app
from app.services.admin_notification_service import admin_notification_service
from app.tasks.celery_tasks.base import DatabaseTask
from app.utils.csv_processor import CSVProcessor
from models.database.marketplace_import_job import MarketplaceImportJob
from models.database.vendor import Vendor
logger = logging.getLogger(__name__)
@celery_app.task(
bind=True,
base=DatabaseTask,
name="app.tasks.celery_tasks.marketplace.process_marketplace_import",
max_retries=3,
default_retry_delay=60,
autoretry_for=(Exception,),
retry_backoff=True,
retry_backoff_max=600,
retry_jitter=True,
)
def process_marketplace_import(
self,
job_id: int,
url: str,
marketplace: str,
vendor_id: int,
batch_size: int = 1000,
language: str = "en",
):
"""
Celery task to process marketplace CSV import.
Args:
job_id: ID of the MarketplaceImportJob record
url: URL to the CSV file
marketplace: Name of the marketplace (e.g., 'Letzshop')
vendor_id: ID of the vendor
batch_size: Number of rows to process per batch
language: Language code for translations (default: 'en')
Returns:
dict: Import results with counts
"""
csv_processor = CSVProcessor()
with self.get_db() as db:
# Get the import job
job = db.query(MarketplaceImportJob).filter(MarketplaceImportJob.id == job_id).first()
if not job:
logger.error(f"Import job {job_id} not found")
return {"error": f"Import job {job_id} not found"}
# Store Celery task ID on job
job.celery_task_id = self.request.id
# Get vendor information
vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first()
if not vendor:
logger.error(f"Vendor {vendor_id} not found for import job {job_id}")
job.status = "failed"
job.error_message = f"Vendor {vendor_id} not found"
job.completed_at = datetime.now(UTC)
db.commit()
return {"error": f"Vendor {vendor_id} not found"}
# Update job status
job.status = "processing"
job.started_at = datetime.now(UTC)
db.commit()
logger.info(
f"Processing import: Job {job_id}, Marketplace: {marketplace}, "
f"Vendor: {vendor.name} ({vendor.vendor_code}), Language: {language}"
)
try:
# Run the async CSV processor in a sync context
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(
csv_processor.process_marketplace_csv_from_url(
url=url,
marketplace=marketplace,
vendor_name=vendor.name,
batch_size=batch_size,
db=db,
language=language,
import_job_id=job_id,
)
)
finally:
loop.close()
# Update job with results
job.status = "completed"
job.completed_at = datetime.now(UTC)
job.imported_count = result["imported"]
job.updated_count = result["updated"]
job.error_count = result.get("errors", 0)
job.total_processed = result["total_processed"]
if result.get("errors", 0) > 0:
job.status = "completed_with_errors"
job.error_message = f"{result['errors']} rows had errors"
# Notify admin if error count is significant
if result.get("errors", 0) >= 5:
admin_notification_service.notify_import_failure(
db=db,
vendor_name=vendor.name,
job_id=job_id,
error_message=f"Import completed with {result['errors']} errors out of {result['total_processed']} rows",
vendor_id=vendor_id,
)
db.commit()
logger.info(
f"Import job {job_id} completed: "
f"imported={result['imported']}, updated={result['updated']}, "
f"errors={result.get('errors', 0)}"
)
return {
"job_id": job_id,
"imported": result["imported"],
"updated": result["updated"],
"errors": result.get("errors", 0),
"total_processed": result["total_processed"],
}
except Exception as e:
logger.error(f"Import job {job_id} failed: {e}", exc_info=True)
job.status = "failed"
job.error_message = str(e)[:500] # Truncate long errors
job.completed_at = datetime.now(UTC)
# Create admin notification for import failure
admin_notification_service.notify_import_failure(
db=db,
vendor_name=vendor.name,
job_id=job_id,
error_message=str(e)[:200],
vendor_id=vendor_id,
)
db.commit()
raise # Re-raise for Celery retry
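The removed task above bridges Celery's synchronous execution model and the async CSV processor by spinning up a private event loop. A minimal, self-contained sketch of that bridge (`process_rows` is an illustrative stand-in for `csv_processor.process_marketplace_csv_from_url`):

```python
import asyncio


async def process_rows(rows):
    # Stand-in for the async CSV processor coroutine
    await asyncio.sleep(0)
    return {"imported": len(rows), "updated": 0}


def run_sync(coro):
    """Run a coroutine from synchronous code on a private event loop."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(coro)
    finally:
        # Always close the loop so worker processes don't leak resources
        loop.close()


result = run_sync(process_rows(["a", "b", "c"]))
print(result)  # {'imported': 3, 'updated': 0}
```

On modern Python, `asyncio.run()` achieves the same effect; the explicit loop management shown here matches the deleted code's approach.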


@@ -30,7 +30,7 @@ Transform the platform from a monolithic structure to self-contained modules whe
| `cms` | Core | ✅ **Complete** | ✅ | ✅ | - | Done |
| `payments` | Optional | 🟡 Partial | ✅ | ✅ | - | Done |
| `billing` | Optional | ✅ **Complete** | ✅ | ✅ | ✅ | Done |
| `marketplace` | Optional | ✅ **Complete** | ✅ | ✅ | ✅ | Done |
| `orders` | Optional | 🔴 Shell | ❌ | ❌ | - | Full |
| `inventory` | Optional | 🔴 Shell | ❌ | ❌ | - | Full |
| `customers` | Core | 🔴 Shell | ❌ | ❌ | - | Full |
@@ -127,6 +127,19 @@ app/tasks/celery_tasks/ # → Move to respective modules
- Created backward-compatible re-exports in `app/services/`
- Updated legacy celery_config.py to not duplicate scheduled tasks
### ✅ Phase 6: Marketplace Module Migration
- Created `app/modules/marketplace/services/` re-exporting from existing locations
- Created `app/modules/marketplace/models/` re-exporting marketplace & letzshop models
- Created `app/modules/marketplace/schemas/` re-exporting marketplace schemas
- Created `app/modules/marketplace/tasks/` with:
- `import_tasks.py` - process_marketplace_import, process_historical_import
- `sync_tasks.py` - sync_vendor_directory (scheduled daily)
- `export_tasks.py` - export_vendor_products_to_folder, export_marketplace_products
- Created `app/modules/marketplace/exceptions.py`
- Updated `definition.py` with self-contained configuration
- Updated legacy task files to re-export from new location
- Removed marketplace/letzshop/export from LEGACY_TASK_MODULES
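
The backward-compatibility shims listed above all follow one pattern: the legacy module re-binds names from the new location, so both import paths resolve to the identical task object. A self-contained sketch (module names are illustrative, not the real `app.*` paths):

```python
import sys
import types

# "New" module location (illustrative stand-in for app.modules.marketplace.tasks)
new_location = types.ModuleType("marketplace_tasks")


def sync_vendor_directory():
    return "synced"


new_location.sync_vendor_directory = sync_vendor_directory
sys.modules["marketplace_tasks"] = new_location

# A legacy module would contain only this re-export line; callers importing
# from the old path get the very same object registered at the new path.
from marketplace_tasks import sync_vendor_directory as legacy_sync_vendor_directory

print(legacy_sync_vendor_directory is new_location.sync_vendor_directory)  # True
```

Because Celery resolves tasks by their registered `name=` string rather than by import path, the re-export keeps old imports working while the new `name="app.modules.marketplace.tasks..."` identifiers take over in the registry.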
---
## Module Migration Phases