diff --git a/app/core/celery_config.py b/app/core/celery_config.py index 178fb911..601d363a 100644 --- a/app/core/celery_config.py +++ b/app/core/celery_config.py @@ -49,13 +49,15 @@ if SENTRY_DSN: # TASK DISCOVERY # ============================================================================= # Legacy tasks (will be migrated to modules over time) -# NOTE: Most subscription tasks have been migrated to app.modules.billing.tasks -# The subscription module is kept for capture_capacity_snapshot (will move to monitoring) +# MIGRATION STATUS: +# - subscription: MIGRATED to billing module (kept for capture_capacity_snapshot -> monitoring) +# - marketplace, letzshop, export: MIGRATED to marketplace module +# - code_quality, test_runner: Will migrate to dev-tools module LEGACY_TASK_MODULES = [ - "app.tasks.celery_tasks.marketplace", - "app.tasks.celery_tasks.letzshop", + # "app.tasks.celery_tasks.marketplace", # MIGRATED to marketplace module + # "app.tasks.celery_tasks.letzshop", # MIGRATED to marketplace module "app.tasks.celery_tasks.subscription", # Kept for capture_capacity_snapshot only - "app.tasks.celery_tasks.export", + # "app.tasks.celery_tasks.export", # MIGRATED to marketplace module "app.tasks.celery_tasks.code_quality", "app.tasks.celery_tasks.test_runner", ] diff --git a/app/modules/marketplace/__init__.py b/app/modules/marketplace/__init__.py index e1c1bec7..8d1ec3fe 100644 --- a/app/modules/marketplace/__init__.py +++ b/app/modules/marketplace/__init__.py @@ -3,10 +3,11 @@ Marketplace Module - Letzshop integration. This module provides: -- Letzshop product sync -- Marketplace import operations -- Product catalog synchronization -- Order import from marketplace +- Product import from marketplace CSV feeds +- Order import from Letzshop API +- Vendor directory synchronization +- Product export to Letzshop CSV format +- Scheduled sync tasks Dependencies: - Requires: inventory module (for product management) @@ -18,8 +19,20 @@ Routes: Menu Items: - Admin: marketplace-letzshop - Vendor: marketplace, letzshop + +Usage: + from app.modules.marketplace import marketplace_module + from app.modules.marketplace.services import letzshop_export_service + from app.modules.marketplace.models import MarketplaceProduct + from app.modules.marketplace.exceptions import LetzshopClientError """ -from app.modules.marketplace.definition import marketplace_module +from app.modules.marketplace.definition import ( + marketplace_module, + get_marketplace_module_with_routers, +) -__all__ = ["marketplace_module"] +__all__ = [ + "marketplace_module", + "get_marketplace_module_with_routers", +] diff --git a/app/modules/marketplace/definition.py b/app/modules/marketplace/definition.py index 7bc1cd79..31ce8022 100644 --- a/app/modules/marketplace/definition.py +++ b/app/modules/marketplace/definition.py @@ -3,12 +3,12 @@ Marketplace module definition. Defines the marketplace module including its features, menu items, -dependencies, and route configurations. +dependencies, route configurations, and scheduled tasks. Note: This module requires the inventory module to be enabled. """ -from app.modules.base import ModuleDefinition +from app.modules.base import ModuleDefinition, ScheduledTask from models.database.admin_menu_config import FrontendType @@ -34,6 +34,7 @@ marketplace_module = ModuleDefinition( "Letzshop marketplace integration for product sync, order import, " "and catalog synchronization." ), + version="1.0.0", requires=["inventory"], # Depends on inventory module features=[ "letzshop_sync", # Sync products with Letzshop @@ -52,6 +53,26 @@ marketplace_module = ModuleDefinition( ], }, is_core=False, + # ========================================================================= + # Self-Contained Module Configuration + # ========================================================================= + is_self_contained=True, + services_path="app.modules.marketplace.services", + models_path="app.modules.marketplace.models", + schemas_path="app.modules.marketplace.schemas", + exceptions_path="app.modules.marketplace.exceptions", + tasks_path="app.modules.marketplace.tasks", + # ========================================================================= + # Scheduled Tasks + # ========================================================================= + scheduled_tasks=[ + ScheduledTask( + name="marketplace.sync_vendor_directory", + task="app.modules.marketplace.tasks.sync_tasks.sync_vendor_directory", + schedule="0 2 * * *", # Daily at 02:00 + options={"queue": "scheduled"}, + ), + ], ) diff --git a/app/modules/marketplace/exceptions.py b/app/modules/marketplace/exceptions.py new file mode 100644 index 00000000..bebdba18 --- /dev/null +++ b/app/modules/marketplace/exceptions.py @@ -0,0 +1,105 @@ +# app/modules/marketplace/exceptions.py +""" +Marketplace module exceptions. + +Custom exceptions for Letzshop integration, product import/export, and sync operations. +""" + +from app.exceptions import BusinessLogicException, ResourceNotFoundException + + +class MarketplaceException(BusinessLogicException): + """Base exception for marketplace module errors.""" + + pass + + +class LetzshopClientError(MarketplaceException): + """Raised when Letzshop API call fails.""" + + def __init__(self, message: str, status_code: int | None = None, response: str | None = None): + super().__init__(message) + self.status_code = status_code + self.response = response + + +class LetzshopAuthenticationError(LetzshopClientError): + """Raised when Letzshop authentication fails.""" + + def __init__(self, message: str = "Letzshop authentication failed"): + super().__init__(message, status_code=401) + + +class LetzshopCredentialsNotFoundException(ResourceNotFoundException): + """Raised when Letzshop credentials not found for vendor.""" + + def __init__(self, vendor_id: int): + super().__init__("LetzshopCredentials", str(vendor_id)) + self.vendor_id = vendor_id + + +class ImportJobNotFoundException(ResourceNotFoundException): + """Raised when a marketplace import job is not found.""" + + def __init__(self, job_id: int): + super().__init__("MarketplaceImportJob", str(job_id)) + + +class HistoricalImportJobNotFoundException(ResourceNotFoundException): + """Raised when a historical import job is not found.""" + + def __init__(self, job_id: int): + super().__init__("LetzshopHistoricalImportJob", str(job_id)) + + +class VendorNotFoundException(ResourceNotFoundException): + """Raised when a vendor is not found.""" + + def __init__(self, vendor_id: int): + super().__init__("Vendor", str(vendor_id)) + + +class ProductNotFoundException(ResourceNotFoundException): + """Raised when a marketplace product is not found.""" + + def __init__(self, product_id: str | int): + super().__init__("MarketplaceProduct", str(product_id)) + + +class ImportValidationError(MarketplaceException): + """Raised when import data validation fails.""" + + def __init__(self, message: str, errors: list[dict] | None = None): + super().__init__(message) + self.errors = errors or [] + + +class ExportError(MarketplaceException): + """Raised when product export fails.""" + + def __init__(self, message: str, language: str | None = None): + super().__init__(message) + self.language = language + + +class SyncError(MarketplaceException): + """Raised when vendor directory sync fails.""" + + def __init__(self, message: str, vendor_code: str | None = None): + super().__init__(message) + self.vendor_code = vendor_code + + +__all__ = [ + "MarketplaceException", + "LetzshopClientError", + "LetzshopAuthenticationError", + "LetzshopCredentialsNotFoundException", + "ImportJobNotFoundException", + "HistoricalImportJobNotFoundException", + "VendorNotFoundException", + "ProductNotFoundException", + "ImportValidationError", + "ExportError", + "SyncError", +] diff --git a/app/modules/marketplace/models/__init__.py b/app/modules/marketplace/models/__init__.py new file mode 100644 index 00000000..66c49a38 --- /dev/null +++ b/app/modules/marketplace/models/__init__.py @@ -0,0 +1,42 @@ +# app/modules/marketplace/models/__init__.py +""" +Marketplace module models. + +Re-exports marketplace and Letzshop models from the central models location. +Models remain in models/database/ for now to avoid breaking existing imports. + +Usage: + from app.modules.marketplace.models import ( + MarketplaceProduct, + MarketplaceImportJob, + VendorLetzshopCredentials, + LetzshopHistoricalImportJob, + ) +""" + +from models.database.marketplace_product import MarketplaceProduct +from models.database.marketplace_product_translation import MarketplaceProductTranslation +from models.database.marketplace_import_job import MarketplaceImportJob +from models.database.letzshop import ( + # Letzshop credentials and sync + VendorLetzshopCredentials, + LetzshopFulfillmentQueue, + LetzshopVendorCache, + LetzshopSyncLog, + # Import jobs + LetzshopHistoricalImportJob, +) + +__all__ = [ + # Marketplace products + "MarketplaceProduct", + "MarketplaceProductTranslation", + # Import jobs + "MarketplaceImportJob", + "LetzshopHistoricalImportJob", + # Letzshop models + "VendorLetzshopCredentials", + "LetzshopFulfillmentQueue", + "LetzshopVendorCache", + "LetzshopSyncLog", +] diff --git a/app/modules/marketplace/schemas/__init__.py b/app/modules/marketplace/schemas/__init__.py new file mode 100644 index 00000000..3c846aa7 --- /dev/null +++ b/app/modules/marketplace/schemas/__init__.py @@ -0,0 +1,56 @@ +# app/modules/marketplace/schemas/__init__.py +""" +Marketplace module Pydantic schemas. + +Re-exports marketplace schemas from the central schemas location. +Provides a module-local import path while maintaining backwards compatibility. + +Usage: + from app.modules.marketplace.schemas import ( + MarketplaceImportJobRequest, + MarketplaceProductResponse, + MarketplaceProductCreate, + ) +""" + +from models.schema.marketplace_import_job import ( + MarketplaceImportJobRequest, + AdminMarketplaceImportJobRequest, + MarketplaceImportJobResponse, + MarketplaceImportJobListResponse, +) +from models.schema.marketplace_product import ( + # Translation schemas + MarketplaceProductTranslationSchema, + # Base schemas + MarketplaceProductBase, + # CRUD schemas + MarketplaceProductCreate, + MarketplaceProductUpdate, + # Response schemas + MarketplaceProductResponse, + MarketplaceProductListResponse, + MarketplaceProductDetailResponse, + # Import schemas + MarketplaceImportRequest, + MarketplaceImportResponse, +) + +__all__ = [ + # Import job schemas + "MarketplaceImportJobRequest", + "AdminMarketplaceImportJobRequest", + "MarketplaceImportJobResponse", + "MarketplaceImportJobListResponse", + # Product schemas + "MarketplaceProductTranslationSchema", + "MarketplaceProductBase", + "MarketplaceProductCreate", + "MarketplaceProductUpdate", + "MarketplaceProductResponse", + "MarketplaceProductListResponse", + "MarketplaceProductDetailResponse", + # Import schemas + "MarketplaceImportRequest", + "MarketplaceImportResponse", +] diff --git a/app/modules/marketplace/services/__init__.py b/app/modules/marketplace/services/__init__.py new file mode 100644 index 00000000..6eac1b3e --- /dev/null +++ b/app/modules/marketplace/services/__init__.py @@ -0,0 +1,61 @@ +# app/modules/marketplace/services/__init__.py +""" +Marketplace module services. + +Re-exports Letzshop and marketplace services from their current locations. +Services remain in app/services/ for now to avoid breaking existing imports. + +Usage: + from app.modules.marketplace.services import ( + letzshop_export_service, + marketplace_import_job_service, + marketplace_product_service, + ) + from app.modules.marketplace.services.letzshop import ( + LetzshopClient, + LetzshopCredentialsService, + LetzshopOrderService, + LetzshopVendorSyncService, + ) +""" + +# Re-export from existing locations for convenience +from app.services.letzshop_export_service import ( + LetzshopExportService, + letzshop_export_service, +) +from app.services.marketplace_import_job_service import ( + MarketplaceImportJobService, + marketplace_import_job_service, +) +from app.services.marketplace_product_service import ( + MarketplaceProductService, + marketplace_product_service, +) + +# Letzshop submodule re-exports +from app.services.letzshop import ( + LetzshopClient, + LetzshopClientError, +) +from app.services.letzshop.credentials_service import LetzshopCredentialsService +from app.services.letzshop.order_service import LetzshopOrderService +from app.services.letzshop.vendor_sync_service import LetzshopVendorSyncService + +__all__ = [ + # Export service + "LetzshopExportService", + "letzshop_export_service", + # Import job service + "MarketplaceImportJobService", + "marketplace_import_job_service", + # Product service + "MarketplaceProductService", + "marketplace_product_service", + # Letzshop services + "LetzshopClient", + "LetzshopClientError", + "LetzshopCredentialsService", + "LetzshopOrderService", + "LetzshopVendorSyncService", +] diff --git a/app/modules/marketplace/tasks/__init__.py b/app/modules/marketplace/tasks/__init__.py new file mode 100644 index 00000000..677f146c --- /dev/null +++ b/app/modules/marketplace/tasks/__init__.py @@ -0,0 +1,33 @@ +# app/modules/marketplace/tasks/__init__.py +""" +Marketplace module Celery tasks. + +Tasks for: +- CSV product import from marketplace feeds +- Historical order imports from Letzshop API +- Vendor directory synchronization +- Product export to Letzshop CSV format +""" + +from app.modules.marketplace.tasks.import_tasks import ( + process_marketplace_import, + process_historical_import, +) +from app.modules.marketplace.tasks.sync_tasks import ( + sync_vendor_directory, +) +from app.modules.marketplace.tasks.export_tasks import ( + export_vendor_products_to_folder, + export_marketplace_products, +) + +__all__ = [ + # Import tasks + "process_marketplace_import", + "process_historical_import", + # Sync tasks + "sync_vendor_directory", + # Export tasks + "export_vendor_products_to_folder", + "export_marketplace_products", +] diff --git a/app/modules/marketplace/tasks/export_tasks.py b/app/modules/marketplace/tasks/export_tasks.py new file mode 100644 index 00000000..5fd45a88 --- /dev/null +++ b/app/modules/marketplace/tasks/export_tasks.py @@ -0,0 +1,197 @@ +# app/modules/marketplace/tasks/export_tasks.py +""" +Celery tasks for product export operations. + +Handles exporting vendor products to various formats (e.g., Letzshop CSV). +""" + +import logging +from datetime import UTC, datetime +from pathlib import Path + +from app.core.celery_config import celery_app +from app.modules.task_base import ModuleTask +from models.database.vendor import Vendor + +logger = logging.getLogger(__name__) + + +@celery_app.task( + bind=True, + base=ModuleTask, + name="app.modules.marketplace.tasks.export_tasks.export_vendor_products_to_folder", + max_retries=3, + default_retry_delay=60, +) +def export_vendor_products_to_folder( + self, + vendor_id: int, + triggered_by: str, + include_inactive: bool = False, +): + """ + Export all 3 languages (en, fr, de) to disk asynchronously. + + Args: + vendor_id: ID of the vendor to export + triggered_by: User identifier who triggered the export + include_inactive: Whether to include inactive products + + Returns: + dict: Export results per language with file paths + """ + from app.services.letzshop_export_service import letzshop_export_service + + languages = ["en", "fr", "de"] + results = {} + export_dir = None + + with self.get_db() as db: + # Get vendor info + vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first() + if not vendor: + logger.error(f"Vendor {vendor_id} not found for export") + return {"error": f"Vendor {vendor_id} not found"} + + vendor_code = vendor.vendor_code + + # Create export directory + export_dir = Path(f"exports/letzshop/{vendor_code}") + export_dir.mkdir(parents=True, exist_ok=True) + + started_at = datetime.now(UTC) + logger.info(f"Starting product export for vendor {vendor_code} (ID: {vendor_id})") + + for lang in languages: + try: + # Generate CSV content + csv_content = letzshop_export_service.export_vendor_products( + db=db, + vendor_id=vendor_id, + language=lang, + include_inactive=include_inactive, + ) + + # Write to file + file_name = f"{vendor_code}_products_{lang}.csv" + file_path = export_dir / file_name + + with open(file_path, "w", encoding="utf-8") as f: + f.write(csv_content) + + results[lang] = { + "success": True, + "path": str(file_path), + "file_name": file_name, + } + + logger.info(f"Exported {lang} products to {file_path}") + + except Exception as e: + logger.error(f"Error exporting {lang} products for vendor {vendor_id}: {e}") + results[lang] = { + "success": False, + "error": str(e), + } + + # Log the export + completed_at = datetime.now(UTC) + duration = (completed_at - started_at).total_seconds() + success_count = sum(1 for r in results.values() if r.get("success")) + + try: + letzshop_export_service.log_export( + db=db, + vendor_id=vendor_id, + triggered_by=triggered_by, + records_processed=len(languages), + records_succeeded=success_count, + records_failed=len(languages) - success_count, + duration_seconds=duration, + ) + except Exception as e: + logger.error(f"Failed to log export: {e}") + + logger.info( + f"Product export complete for vendor {vendor_code}: " + f"{success_count}/{len(languages)} languages exported in {duration:.2f}s" + ) + + return { + "vendor_id": vendor_id, + "vendor_code": vendor_code, + "export_dir": str(export_dir), + "results": results, + "duration_seconds": duration, + "triggered_by": triggered_by, + } + + +@celery_app.task( + bind=True, + base=ModuleTask, + name="app.modules.marketplace.tasks.export_tasks.export_marketplace_products", + max_retries=3, + default_retry_delay=60, +) +def export_marketplace_products( + self, + language: str = "en", + triggered_by: str = "system", +): + """ + Export all marketplace products (admin use). + + Args: + language: Language code for export (en, fr, de) + triggered_by: User identifier who triggered the export + + Returns: + dict: Export result with file path + """ + from app.services.letzshop_export_service import letzshop_export_service + + with self.get_db() as db: + started_at = datetime.now(UTC) + logger.info(f"Starting marketplace product export ({language})") + + try: + # Create export directory + export_dir = Path("exports/marketplace") + export_dir.mkdir(parents=True, exist_ok=True) + + # Generate CSV content + csv_content = letzshop_export_service.export_marketplace_products( + db=db, + language=language, + ) + + # Write to file + timestamp = started_at.strftime("%Y%m%d_%H%M%S") + file_name = f"marketplace_products_{language}_{timestamp}.csv" + file_path = export_dir / file_name + + with open(file_path, "w", encoding="utf-8") as f: + f.write(csv_content) + + completed_at = datetime.now(UTC) + duration = (completed_at - started_at).total_seconds() + + logger.info(f"Marketplace export complete: {file_path} ({duration:.2f}s)") + + return { + "success": True, + "path": str(file_path), + "file_name": file_name, + "language": language, + "duration_seconds": duration, + "triggered_by": triggered_by, + } + + except Exception as e: + logger.error(f"Error exporting marketplace products: {e}") + return { + "success": False, + "error": str(e), + "language": language, + } diff --git a/app/modules/marketplace/tasks/import_tasks.py b/app/modules/marketplace/tasks/import_tasks.py new file mode 100644 index 00000000..042fa1c9 --- /dev/null +++ b/app/modules/marketplace/tasks/import_tasks.py @@ -0,0 +1,403 @@ +# app/modules/marketplace/tasks/import_tasks.py +""" +Celery tasks for marketplace product and order imports. + +Includes: +- CSV product import from marketplace feeds +- Historical order imports from Letzshop API +""" + +import asyncio +import logging +from datetime import UTC, datetime +from typing import Callable + +from app.core.celery_config import celery_app +from models.database.marketplace_import_job import MarketplaceImportJob +from models.database.letzshop import LetzshopHistoricalImportJob +from app.services.admin_notification_service import admin_notification_service +from app.services.letzshop import LetzshopClientError +from app.services.letzshop.credentials_service import LetzshopCredentialsService +from app.services.letzshop.order_service import LetzshopOrderService +from app.modules.task_base import ModuleTask +from app.utils.csv_processor import CSVProcessor +from models.database.vendor import Vendor + +logger = logging.getLogger(__name__) + + +def _get_credentials_service(db) -> LetzshopCredentialsService: + """Create a credentials service instance.""" + return LetzshopCredentialsService(db) + + +def _get_order_service(db) -> LetzshopOrderService: + """Create an order service instance.""" + return LetzshopOrderService(db) + + +@celery_app.task( + bind=True, + base=ModuleTask, + name="app.modules.marketplace.tasks.import_tasks.process_marketplace_import", + max_retries=3, + default_retry_delay=60, + autoretry_for=(Exception,), + retry_backoff=True, + retry_backoff_max=600, + retry_jitter=True, +) +def process_marketplace_import( + self, + job_id: int, + url: str, + marketplace: str, + vendor_id: int, + batch_size: int = 1000, + language: str = "en", +): + """ + Celery task to process marketplace CSV import. + + Args: + job_id: ID of the MarketplaceImportJob record + url: URL to the CSV file + marketplace: Name of the marketplace (e.g., 'Letzshop') + vendor_id: ID of the vendor + batch_size: Number of rows to process per batch + language: Language code for translations (default: 'en') + + Returns: + dict: Import results with counts + """ + csv_processor = CSVProcessor() + + with self.get_db() as db: + # Get the import job + job = db.query(MarketplaceImportJob).filter(MarketplaceImportJob.id == job_id).first() + if not job: + logger.error(f"Import job {job_id} not found") + return {"error": f"Import job {job_id} not found"} + + # Store Celery task ID on job + job.celery_task_id = self.request.id + + # Get vendor information + vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first() + if not vendor: + logger.error(f"Vendor {vendor_id} not found for import job {job_id}") + job.status = "failed" + job.error_message = f"Vendor {vendor_id} not found" + job.completed_at = datetime.now(UTC) + return {"error": f"Vendor {vendor_id} not found"} + + # Update job status + job.status = "processing" + job.started_at = datetime.now(UTC) + + logger.info( + f"Processing import: Job {job_id}, Marketplace: {marketplace}, " + f"Vendor: {vendor.name} ({vendor.vendor_code}), Language: {language}" + ) + + try: + # Run the async CSV processor in a sync context + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + result = loop.run_until_complete( + csv_processor.process_marketplace_csv_from_url( + url=url, + marketplace=marketplace, + vendor_name=vendor.name, + batch_size=batch_size, + db=db, + language=language, + import_job_id=job_id, + ) + ) + finally: + loop.close() + + # Update job with results + job.status = "completed" + job.completed_at = datetime.now(UTC) + job.imported_count = result["imported"] + job.updated_count = result["updated"] + job.error_count = result.get("errors", 0) + job.total_processed = result["total_processed"] + + if result.get("errors", 0) > 0: + job.status = "completed_with_errors" + job.error_message = f"{result['errors']} rows had errors" + + # Notify admin if error count is significant + if result.get("errors", 0) >= 5: + admin_notification_service.notify_import_failure( + db=db, + vendor_name=vendor.name, + job_id=job_id, + error_message=f"Import completed with {result['errors']} errors out of {result['total_processed']} rows", + vendor_id=vendor_id, + ) + + logger.info( + f"Import job {job_id} completed: " + f"imported={result['imported']}, updated={result['updated']}, " + f"errors={result.get('errors', 0)}" + ) + + return { + "job_id": job_id, + "imported": result["imported"], + "updated": result["updated"], + "errors": result.get("errors", 0), + "total_processed": result["total_processed"], + } + + except Exception as e: + logger.error(f"Import job {job_id} failed: {e}", exc_info=True) + job.status = "failed" + job.error_message = str(e)[:500] # Truncate long errors + job.completed_at = datetime.now(UTC) + + # Create admin notification for import failure + admin_notification_service.notify_import_failure( + db=db, + vendor_name=vendor.name, + job_id=job_id, + error_message=str(e)[:200], + vendor_id=vendor_id, + ) + + raise # Re-raise for Celery retry + + +@celery_app.task( + bind=True, + base=ModuleTask, + name="app.modules.marketplace.tasks.import_tasks.process_historical_import", + max_retries=2, + default_retry_delay=120, + autoretry_for=(Exception,), + retry_backoff=True, +) +def process_historical_import(self, job_id: int, vendor_id: int): + """ + Celery task for historical order import with progress tracking. + + Imports both confirmed and unconfirmed orders from Letzshop API, + updating job progress in the database for frontend polling. + + Args: + job_id: ID of the LetzshopHistoricalImportJob record + vendor_id: ID of the vendor to import orders for + + Returns: + dict: Import statistics + """ + with self.get_db() as db: + # Get the import job + job = ( + db.query(LetzshopHistoricalImportJob) + .filter(LetzshopHistoricalImportJob.id == job_id) + .first() + ) + if not job: + logger.error(f"Historical import job {job_id} not found") + return {"error": f"Job {job_id} not found"} + + # Store Celery task ID + job.celery_task_id = self.request.id + + try: + # Mark as started + job.status = "fetching" + job.started_at = datetime.now(UTC) + + creds_service = _get_credentials_service(db) + order_service = _get_order_service(db) + + # Create progress callback for fetching + def fetch_progress_callback(page: int, total_fetched: int): + """Update fetch progress in database.""" + job.current_page = page + job.shipments_fetched = total_fetched + + # Create progress callback for processing + def create_processing_callback( + phase: str, + ) -> Callable[[int, int, int, int], None]: + """Create a processing progress callback for a phase.""" + + def callback(processed: int, imported: int, updated: int, skipped: int): + job.orders_processed = processed + job.orders_imported = imported + job.orders_updated = updated + job.orders_skipped = skipped + + return callback + + with creds_service.create_client(vendor_id) as client: + # ================================================================ + # Phase 1: Import confirmed orders + # ================================================================ + job.current_phase = "confirmed" + job.current_page = 0 + job.shipments_fetched = 0 + + logger.info(f"Job {job_id}: Fetching confirmed shipments for vendor {vendor_id}") + + confirmed_shipments = client.get_all_shipments_paginated( + state="confirmed", + page_size=50, + progress_callback=fetch_progress_callback, + ) + + logger.info(f"Job {job_id}: Fetched {len(confirmed_shipments)} confirmed shipments") + + # Process confirmed shipments + job.status = "processing" + job.orders_processed = 0 + job.orders_imported = 0 + job.orders_updated = 0 + job.orders_skipped = 0 + + confirmed_stats = order_service.import_historical_shipments( + vendor_id=vendor_id, + shipments=confirmed_shipments, + match_products=True, + progress_callback=create_processing_callback("confirmed"), + ) + + # Store confirmed stats + job.confirmed_stats = { + "total": confirmed_stats["total"], + "imported": confirmed_stats["imported"], + "updated": confirmed_stats["updated"], + "skipped": confirmed_stats["skipped"], + "products_matched": confirmed_stats["products_matched"], + "products_not_found": confirmed_stats["products_not_found"], + } + job.products_matched = confirmed_stats["products_matched"] + job.products_not_found = confirmed_stats["products_not_found"] + + logger.info( + f"Job {job_id}: Confirmed phase complete - " + f"imported={confirmed_stats['imported']}, " + f"updated={confirmed_stats['updated']}, " + f"skipped={confirmed_stats['skipped']}" + ) + + # ================================================================ + # Phase 2: Import unconfirmed (pending) orders + # ================================================================ + job.current_phase = "unconfirmed" + job.status = "fetching" + job.current_page = 0 + job.shipments_fetched = 0 + + logger.info(f"Job {job_id}: Fetching unconfirmed shipments for vendor {vendor_id}") + + unconfirmed_shipments = client.get_all_shipments_paginated( + state="unconfirmed", + page_size=50, + progress_callback=fetch_progress_callback, + ) + + logger.info( + f"Job {job_id}: Fetched {len(unconfirmed_shipments)} unconfirmed shipments" + ) + + # Process unconfirmed shipments + job.status = "processing" + job.orders_processed = 0 + + unconfirmed_stats = order_service.import_historical_shipments( + vendor_id=vendor_id, + shipments=unconfirmed_shipments, + match_products=True, + progress_callback=create_processing_callback("unconfirmed"), + ) + + # Store unconfirmed stats + job.declined_stats = { + "total": unconfirmed_stats["total"], + "imported": unconfirmed_stats["imported"], + "updated": unconfirmed_stats["updated"], + "skipped": unconfirmed_stats["skipped"], + "products_matched": unconfirmed_stats["products_matched"], + "products_not_found": unconfirmed_stats["products_not_found"], + } + + # Add to cumulative product matching stats + job.products_matched += unconfirmed_stats["products_matched"] + job.products_not_found += unconfirmed_stats["products_not_found"] + + logger.info( + f"Job {job_id}: Unconfirmed phase complete - " + f"imported={unconfirmed_stats['imported']}, " + f"updated={unconfirmed_stats['updated']}, " + f"skipped={unconfirmed_stats['skipped']}" + ) + + # ================================================================ + # Complete + # ================================================================ + job.status = "completed" + job.completed_at = datetime.now(UTC) + + # Update credentials sync status + creds_service.update_sync_status(vendor_id, "success", None) + + logger.info(f"Job {job_id}: Historical import completed successfully") + + return { + "job_id": job_id, + "confirmed": confirmed_stats, + "unconfirmed": unconfirmed_stats, + } + + except LetzshopClientError as e: + logger.error(f"Job {job_id}: Letzshop API error: {e}") + job.status = "failed" + job.error_message = f"Letzshop API error: {e}" + job.completed_at = datetime.now(UTC) + + # Get vendor name for notification + order_service = _get_order_service(db) + vendor = order_service.get_vendor(vendor_id) + vendor_name = vendor.name if vendor else f"Vendor {vendor_id}" + + # Create admin notification + admin_notification_service.notify_order_sync_failure( + db=db, + vendor_name=vendor_name, + error_message=f"Historical import failed: {str(e)[:150]}", + vendor_id=vendor_id, + ) + + creds_service = _get_credentials_service(db) + creds_service.update_sync_status(vendor_id, "failed", str(e)) + raise # Re-raise for Celery retry + + except Exception as e: + logger.error(f"Job {job_id}: Unexpected error: {e}", exc_info=True) + job.status = "failed" + job.error_message = str(e)[:500] + job.completed_at = datetime.now(UTC) + + # Get vendor name for notification + order_service = _get_order_service(db) + vendor = order_service.get_vendor(vendor_id) + vendor_name = vendor.name if vendor else f"Vendor {vendor_id}" + + # Create admin notification + admin_notification_service.notify_critical_error( + db=db, + error_type="Historical Import", + error_message=f"Import job {job_id} failed for {vendor_name}: {str(e)[:150]}", + details={"job_id": job_id, "vendor_id": vendor_id, "vendor_name": vendor_name}, + ) + + raise # Re-raise for Celery retry diff --git a/app/modules/marketplace/tasks/sync_tasks.py b/app/modules/marketplace/tasks/sync_tasks.py new file mode 100644 index 00000000..4d3754a8 --- /dev/null +++ b/app/modules/marketplace/tasks/sync_tasks.py @@ -0,0 +1,84 @@ +# app/modules/marketplace/tasks/sync_tasks.py +""" +Celery tasks for Letzshop vendor directory synchronization. + +Periodically syncs vendor information from Letzshop's public GraphQL API. +""" + +import logging +from typing import Any + +from app.core.celery_config import celery_app +from app.modules.task_base import ModuleTask +from app.services.admin_notification_service import admin_notification_service +from app.services.letzshop.vendor_sync_service import LetzshopVendorSyncService + +logger = logging.getLogger(__name__) + + +@celery_app.task( + bind=True, + base=ModuleTask, + name="app.modules.marketplace.tasks.sync_tasks.sync_vendor_directory", + max_retries=2, + default_retry_delay=300, + autoretry_for=(Exception,), + retry_backoff=True, +) +def sync_vendor_directory(self) -> dict[str, Any]: + """ + Celery task to sync Letzshop vendor directory. + + Fetches all vendors from Letzshop's public GraphQL API and updates + the local letzshop_vendor_cache table. + + This task is scheduled to run daily via the module's scheduled_tasks + definition. + + Returns: + dict: Sync statistics including created, updated, and error counts. + """ + with self.get_db() as db: + try: + logger.info("Starting Letzshop vendor directory sync...") + + sync_service = LetzshopVendorSyncService(db) + + def progress_callback(page: int, fetched: int, total: int): + """Log progress during sync.""" + logger.info(f"Vendor sync progress: page {page}, {fetched}/{total} vendors") + + stats = sync_service.sync_all_vendors(progress_callback=progress_callback) + + logger.info( + f"Vendor directory sync completed: " + f"{stats.get('created', 0)} created, " + f"{stats.get('updated', 0)} updated, " + f"{stats.get('errors', 0)} errors" + ) + + # Send admin notification if there were errors + if stats.get("errors", 0) > 0: + admin_notification_service.notify_system_info( + db=db, + title="Letzshop Vendor Sync Completed with Errors", + message=( + f"Synced {stats.get('total_fetched', 0)} vendors. " + f"Errors: {stats.get('errors', 0)}" + ), + details=stats, + ) + + return stats + + except Exception as e: + logger.error(f"Vendor directory sync failed: {e}", exc_info=True) + + # Notify admins of failure + admin_notification_service.notify_critical_error( + db=db, + error_type="Vendor Directory Sync", + error_message=f"Failed to sync Letzshop vendor directory: {str(e)[:200]}", + details={"error": str(e)}, + ) + raise # Re-raise for Celery retry diff --git a/app/tasks/celery_tasks/export.py b/app/tasks/celery_tasks/export.py index b870d5d9..3ab68b8c 100644 --- a/app/tasks/celery_tasks/export.py +++ b/app/tasks/celery_tasks/export.py @@ -1,199 +1,24 @@ # app/tasks/celery_tasks/export.py """ -Celery tasks for product export operations. +Legacy export tasks. -Handles exporting vendor products to various formats (e.g., Letzshop CSV). +MIGRATED: All tasks have been migrated to app.modules.marketplace.tasks. + +New locations: +- export_vendor_products_to_folder -> app.modules.marketplace.tasks.export_tasks +- export_marketplace_products -> app.modules.marketplace.tasks.export_tasks + +Import from the new location: + from app.modules.marketplace.tasks import ( + export_vendor_products_to_folder, + export_marketplace_products, + ) """ -import logging -import os -from datetime import UTC, datetime -from pathlib import Path - -from app.core.celery_config import celery_app -from app.tasks.celery_tasks.base import DatabaseTask -from models.database.vendor import Vendor - -logger = logging.getLogger(__name__) - - -@celery_app.task( - bind=True, - base=DatabaseTask, - name="app.tasks.celery_tasks.export.export_vendor_products_to_folder", - max_retries=3, - default_retry_delay=60, +# Re-export from new location for backward compatibility +from app.modules.marketplace.tasks.export_tasks import ( + export_vendor_products_to_folder, + export_marketplace_products, ) -def export_vendor_products_to_folder( - self, - vendor_id: int, - triggered_by: str, - include_inactive: bool = False, -): - """ - Export all 3 languages (en, fr, de) to disk asynchronously. - Args: - vendor_id: ID of the vendor to export - triggered_by: User identifier who triggered the export - include_inactive: Whether to include inactive products - - Returns: - dict: Export results per language with file paths - """ - from app.services.letzshop_export_service import letzshop_export_service - - languages = ["en", "fr", "de"] - results = {} - export_dir = None - - with self.get_db() as db: - # Get vendor info - vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first() - if not vendor: - logger.error(f"Vendor {vendor_id} not found for export") - return {"error": f"Vendor {vendor_id} not found"} - - vendor_code = vendor.vendor_code - - # Create export directory - export_dir = Path(f"exports/letzshop/{vendor_code}") - export_dir.mkdir(parents=True, exist_ok=True) - - started_at = datetime.now(UTC) - logger.info(f"Starting product export for vendor {vendor_code} (ID: {vendor_id})") - - for lang in languages: - try: - # Generate CSV content - csv_content = letzshop_export_service.export_vendor_products( - db=db, - vendor_id=vendor_id, - language=lang, - include_inactive=include_inactive, - ) - - # Write to file - file_name = f"{vendor_code}_products_{lang}.csv" - file_path = export_dir / file_name - - with open(file_path, "w", encoding="utf-8") as f: - f.write(csv_content) - - results[lang] = { - "success": True, - "path": str(file_path), - "file_name": file_name, - } - - logger.info(f"Exported {lang} products to {file_path}") - - except Exception as e: - logger.error(f"Error exporting {lang} products for vendor {vendor_id}: {e}") - results[lang] = { - "success": False, - "error": str(e), - } - - # Log the export - completed_at = datetime.now(UTC) - duration = (completed_at - started_at).total_seconds() - success_count = sum(1 for r in results.values() if r.get("success")) - - try: - letzshop_export_service.log_export( - db=db, - vendor_id=vendor_id, - triggered_by=triggered_by, - records_processed=len(languages), - records_succeeded=success_count, - records_failed=len(languages) - success_count, - duration_seconds=duration, - ) - db.commit() - except Exception as e: - logger.error(f"Failed to log export: {e}") - - logger.info( - f"Product export complete for vendor {vendor_code}: " - f"{success_count}/{len(languages)} languages exported in {duration:.2f}s" - ) - - return { - "vendor_id": vendor_id, - "vendor_code": vendor_code, - "export_dir": str(export_dir), - "results": results, - "duration_seconds": duration, - "triggered_by": triggered_by, - } - - -@celery_app.task( - bind=True, - base=DatabaseTask, - name="app.tasks.celery_tasks.export.export_marketplace_products", - max_retries=3, - default_retry_delay=60, -) -def export_marketplace_products( - self, - language: str = "en", - triggered_by: str = "system", -): - """ - Export all marketplace products (admin use). - - Args: - language: Language code for export (en, fr, de) - triggered_by: User identifier who triggered the export - - Returns: - dict: Export result with file path - """ - from app.services.letzshop_export_service import letzshop_export_service - - with self.get_db() as db: - started_at = datetime.now(UTC) - logger.info(f"Starting marketplace product export ({language})") - - try: - # Create export directory - export_dir = Path("exports/marketplace") - export_dir.mkdir(parents=True, exist_ok=True) - - # Generate CSV content - csv_content = letzshop_export_service.export_marketplace_products( - db=db, - language=language, - ) - - # Write to file - timestamp = started_at.strftime("%Y%m%d_%H%M%S") - file_name = f"marketplace_products_{language}_{timestamp}.csv" - file_path = export_dir / file_name - - with open(file_path, "w", encoding="utf-8") as f: - f.write(csv_content) - - completed_at = datetime.now(UTC) - duration = (completed_at - started_at).total_seconds() - - logger.info(f"Marketplace export complete: {file_path} ({duration:.2f}s)") - - return { - "success": True, - "path": str(file_path), - "file_name": file_name, - "language": language, - "duration_seconds": duration, - "triggered_by": triggered_by, - } - - except Exception as e: - logger.error(f"Error exporting marketplace products: {e}") - return { - "success": False, - "error": str(e), - "language": language, - } +__all__ = ["export_vendor_products_to_folder", "export_marketplace_products"] diff --git a/app/tasks/celery_tasks/letzshop.py b/app/tasks/celery_tasks/letzshop.py index c544d9d9..79f5124f 100644 --- a/app/tasks/celery_tasks/letzshop.py +++ b/app/tasks/celery_tasks/letzshop.py @@ -1,350 +1,22 @@ # app/tasks/celery_tasks/letzshop.py """ -Celery tasks for Letzshop integration. +Legacy Letzshop tasks. -Includes: -- Historical order imports -- Vendor directory sync +MIGRATED: All tasks have been migrated to app.modules.marketplace.tasks. + +New locations: +- process_historical_import -> app.modules.marketplace.tasks.import_tasks +- sync_vendor_directory -> app.modules.marketplace.tasks.sync_tasks + +Import from the new location: + from app.modules.marketplace.tasks import ( + process_historical_import, + sync_vendor_directory, + ) """ -import logging -from datetime import UTC, datetime -from typing import Any, Callable +# Re-export from new location for backward compatibility +from app.modules.marketplace.tasks.import_tasks import process_historical_import +from app.modules.marketplace.tasks.sync_tasks import sync_vendor_directory -from app.core.celery_config import celery_app -from app.services.admin_notification_service import admin_notification_service -from app.services.letzshop import LetzshopClientError -from app.services.letzshop.credentials_service import LetzshopCredentialsService -from app.services.letzshop.order_service import LetzshopOrderService -from app.services.letzshop.vendor_sync_service import LetzshopVendorSyncService -from app.tasks.celery_tasks.base import DatabaseTask -from models.database.letzshop import LetzshopHistoricalImportJob - -logger = logging.getLogger(__name__) - - -def _get_credentials_service(db) -> LetzshopCredentialsService: - """Create a credentials service instance.""" - return LetzshopCredentialsService(db) - - -def _get_order_service(db) -> LetzshopOrderService: - """Create an order service instance.""" - return LetzshopOrderService(db) - - -@celery_app.task( - bind=True, - base=DatabaseTask, - name="app.tasks.celery_tasks.letzshop.process_historical_import", - max_retries=2, - default_retry_delay=120, - autoretry_for=(Exception,), - retry_backoff=True, -) -def process_historical_import(self, job_id: int, vendor_id: int): - """ - Celery task for historical order import with progress tracking. - - Imports both confirmed and unconfirmed orders from Letzshop API, - updating job progress in the database for frontend polling. - - Args: - job_id: ID of the LetzshopHistoricalImportJob record - vendor_id: ID of the vendor to import orders for - - Returns: - dict: Import statistics - """ - with self.get_db() as db: - # Get the import job - job = ( - db.query(LetzshopHistoricalImportJob) - .filter(LetzshopHistoricalImportJob.id == job_id) - .first() - ) - if not job: - logger.error(f"Historical import job {job_id} not found") - return {"error": f"Job {job_id} not found"} - - # Store Celery task ID - job.celery_task_id = self.request.id - - try: - # Mark as started - job.status = "fetching" - job.started_at = datetime.now(UTC) - db.commit() - - creds_service = _get_credentials_service(db) - order_service = _get_order_service(db) - - # Create progress callback for fetching - def fetch_progress_callback(page: int, total_fetched: int): - """Update fetch progress in database.""" - job.current_page = page - job.shipments_fetched = total_fetched - db.commit() - - # Create progress callback for processing - def create_processing_callback( - phase: str, - ) -> Callable[[int, int, int, int], None]: - """Create a processing progress callback for a phase.""" - - def callback(processed: int, imported: int, updated: int, skipped: int): - job.orders_processed = processed - job.orders_imported = imported - job.orders_updated = updated - job.orders_skipped = skipped - db.commit() - - return callback - - with creds_service.create_client(vendor_id) as client: - # ================================================================ - # Phase 1: Import confirmed orders - # ================================================================ - job.current_phase = "confirmed" - job.current_page = 0 - job.shipments_fetched = 0 - db.commit() - - logger.info(f"Job {job_id}: Fetching confirmed shipments for vendor {vendor_id}") - - confirmed_shipments = client.get_all_shipments_paginated( - state="confirmed", - page_size=50, - progress_callback=fetch_progress_callback, - ) - - logger.info(f"Job {job_id}: Fetched {len(confirmed_shipments)} confirmed shipments") - - # Process confirmed shipments - job.status = "processing" - job.orders_processed = 0 - job.orders_imported = 0 - job.orders_updated = 0 - job.orders_skipped = 0 - db.commit() - - confirmed_stats = order_service.import_historical_shipments( - vendor_id=vendor_id, - shipments=confirmed_shipments, - match_products=True, - progress_callback=create_processing_callback("confirmed"), - ) - - # Store confirmed stats - job.confirmed_stats = { - "total": confirmed_stats["total"], - "imported": confirmed_stats["imported"], - "updated": confirmed_stats["updated"], - "skipped": confirmed_stats["skipped"], - "products_matched": confirmed_stats["products_matched"], - "products_not_found": confirmed_stats["products_not_found"], - } - job.products_matched = confirmed_stats["products_matched"] - job.products_not_found = confirmed_stats["products_not_found"] - db.commit() - - logger.info( - f"Job {job_id}: Confirmed phase complete - " - f"imported={confirmed_stats['imported']}, " - f"updated={confirmed_stats['updated']}, " - f"skipped={confirmed_stats['skipped']}" - ) - - # ================================================================ - # Phase 2: Import unconfirmed (pending) orders - # ================================================================ - job.current_phase = "unconfirmed" - job.status = "fetching" - job.current_page = 0 - job.shipments_fetched = 0 - db.commit() - - logger.info(f"Job {job_id}: Fetching unconfirmed shipments for vendor {vendor_id}") - - unconfirmed_shipments = client.get_all_shipments_paginated( - state="unconfirmed", - page_size=50, - progress_callback=fetch_progress_callback, - ) - - logger.info( - f"Job {job_id}: Fetched {len(unconfirmed_shipments)} unconfirmed shipments" - ) - - # Process unconfirmed shipments - job.status = "processing" - job.orders_processed = 0 - db.commit() - - unconfirmed_stats = order_service.import_historical_shipments( - vendor_id=vendor_id, - shipments=unconfirmed_shipments, - match_products=True, - progress_callback=create_processing_callback("unconfirmed"), - ) - - # Store unconfirmed stats - job.declined_stats = { - "total": unconfirmed_stats["total"], - "imported": unconfirmed_stats["imported"], - "updated": unconfirmed_stats["updated"], - "skipped": unconfirmed_stats["skipped"], - "products_matched": unconfirmed_stats["products_matched"], - "products_not_found": unconfirmed_stats["products_not_found"], - } - - # Add to cumulative product matching stats - job.products_matched += unconfirmed_stats["products_matched"] - job.products_not_found += unconfirmed_stats["products_not_found"] - - logger.info( - f"Job {job_id}: Unconfirmed phase complete - " - f"imported={unconfirmed_stats['imported']}, " - f"updated={unconfirmed_stats['updated']}, " - f"skipped={unconfirmed_stats['skipped']}" - ) - - # ================================================================ - # Complete - # ================================================================ - job.status = "completed" - job.completed_at = datetime.now(UTC) - db.commit() - - # Update credentials sync status - creds_service.update_sync_status(vendor_id, "success", None) - - logger.info(f"Job {job_id}: Historical import completed successfully") - - return { - "job_id": job_id, - "confirmed": confirmed_stats, - "unconfirmed": unconfirmed_stats, - } - - except LetzshopClientError as e: - logger.error(f"Job {job_id}: Letzshop API error: {e}") - job.status = "failed" - job.error_message = f"Letzshop API error: {e}" - job.completed_at = datetime.now(UTC) - - # Get vendor name for notification - order_service = _get_order_service(db) - vendor = order_service.get_vendor(vendor_id) - vendor_name = vendor.name if vendor else f"Vendor {vendor_id}" - - # Create admin notification - admin_notification_service.notify_order_sync_failure( - db=db, - vendor_name=vendor_name, - error_message=f"Historical import failed: {str(e)[:150]}", - vendor_id=vendor_id, - ) - - db.commit() - - creds_service = _get_credentials_service(db) - creds_service.update_sync_status(vendor_id, "failed", str(e)) - raise # Re-raise for Celery retry - - except Exception as e: - logger.error(f"Job {job_id}: Unexpected error: {e}", exc_info=True) - job.status = "failed" - job.error_message = str(e)[:500] - job.completed_at = datetime.now(UTC) - - # Get vendor name for notification - order_service = _get_order_service(db) - vendor = order_service.get_vendor(vendor_id) - vendor_name = vendor.name if vendor else f"Vendor {vendor_id}" - - # Create admin notification - admin_notification_service.notify_critical_error( - db=db, - error_type="Historical Import", - error_message=f"Import job {job_id} failed for {vendor_name}: {str(e)[:150]}", - details={"job_id": job_id, "vendor_id": vendor_id, "vendor_name": vendor_name}, - ) - - db.commit() - raise # Re-raise for Celery retry - - -# ============================================================================= -# Vendor Directory Sync -# ============================================================================= - - -@celery_app.task( - bind=True, - base=DatabaseTask, - name="app.tasks.celery_tasks.letzshop.sync_vendor_directory", - max_retries=2, - default_retry_delay=300, - autoretry_for=(Exception,), - retry_backoff=True, -) -def sync_vendor_directory(self) -> dict[str, Any]: - """ - Celery task to sync Letzshop vendor directory. - - Fetches all vendors from Letzshop's public GraphQL API and updates - the local letzshop_vendor_cache table. - - This task should be scheduled to run periodically (e.g., daily) - via Celery beat. - - Returns: - dict: Sync statistics including created, updated, and error counts. - """ - with self.get_db() as db: - try: - logger.info("Starting Letzshop vendor directory sync...") - - sync_service = LetzshopVendorSyncService(db) - - def progress_callback(page: int, fetched: int, total: int): - """Log progress during sync.""" - logger.info(f"Vendor sync progress: page {page}, {fetched}/{total} vendors") - - stats = sync_service.sync_all_vendors(progress_callback=progress_callback) - - logger.info( - f"Vendor directory sync completed: " - f"{stats.get('created', 0)} created, " - f"{stats.get('updated', 0)} updated, " - f"{stats.get('errors', 0)} errors" - ) - - # Send admin notification if there were errors - if stats.get("errors", 0) > 0: - admin_notification_service.notify_system_info( - db=db, - title="Letzshop Vendor Sync Completed with Errors", - message=( - f"Synced {stats.get('total_fetched', 0)} vendors. " - f"Errors: {stats.get('errors', 0)}" - ), - details=stats, - ) - db.commit() - - return stats - - except Exception as e: - logger.error(f"Vendor directory sync failed: {e}", exc_info=True) - - # Notify admins of failure - admin_notification_service.notify_critical_error( - db=db, - error_type="Vendor Directory Sync", - error_message=f"Failed to sync Letzshop vendor directory: {str(e)[:200]}", - details={"error": str(e)}, - ) - db.commit() - raise # Re-raise for Celery retry +__all__ = ["process_historical_import", "sync_vendor_directory"] diff --git a/app/tasks/celery_tasks/marketplace.py b/app/tasks/celery_tasks/marketplace.py index 81b9ea8e..c8723428 100644 --- a/app/tasks/celery_tasks/marketplace.py +++ b/app/tasks/celery_tasks/marketplace.py @@ -1,160 +1,17 @@ # app/tasks/celery_tasks/marketplace.py """ -Celery tasks for marketplace product imports. +Legacy marketplace tasks. -Wraps the existing process_marketplace_import function for Celery execution. +MIGRATED: All tasks have been migrated to app.modules.marketplace.tasks. + +New locations: +- process_marketplace_import -> app.modules.marketplace.tasks.import_tasks + +Import from the new location: + from app.modules.marketplace.tasks import process_marketplace_import """ -import asyncio -import logging -from datetime import UTC, datetime +# Re-export from new location for backward compatibility +from app.modules.marketplace.tasks.import_tasks import process_marketplace_import -from app.core.celery_config import celery_app -from app.services.admin_notification_service import admin_notification_service -from app.tasks.celery_tasks.base import DatabaseTask -from app.utils.csv_processor import CSVProcessor -from models.database.marketplace_import_job import MarketplaceImportJob -from models.database.vendor import Vendor - -logger = logging.getLogger(__name__) - - -@celery_app.task( - bind=True, - base=DatabaseTask, - name="app.tasks.celery_tasks.marketplace.process_marketplace_import", - max_retries=3, - default_retry_delay=60, - autoretry_for=(Exception,), - retry_backoff=True, - retry_backoff_max=600, - retry_jitter=True, -) -def process_marketplace_import( - self, - job_id: int, - url: str, - marketplace: str, - vendor_id: int, - batch_size: int = 1000, - language: str = "en", -): - """ - Celery task to process marketplace CSV import. - - Args: - job_id: ID of the MarketplaceImportJob record - url: URL to the CSV file - marketplace: Name of the marketplace (e.g., 'Letzshop') - vendor_id: ID of the vendor - batch_size: Number of rows to process per batch - language: Language code for translations (default: 'en') - - Returns: - dict: Import results with counts - """ - csv_processor = CSVProcessor() - - with self.get_db() as db: - # Get the import job - job = db.query(MarketplaceImportJob).filter(MarketplaceImportJob.id == job_id).first() - if not job: - logger.error(f"Import job {job_id} not found") - return {"error": f"Import job {job_id} not found"} - - # Store Celery task ID on job - job.celery_task_id = self.request.id - - # Get vendor information - vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first() - if not vendor: - logger.error(f"Vendor {vendor_id} not found for import job {job_id}") - job.status = "failed" - job.error_message = f"Vendor {vendor_id} not found" - job.completed_at = datetime.now(UTC) - db.commit() - return {"error": f"Vendor {vendor_id} not found"} - - # Update job status - job.status = "processing" - job.started_at = datetime.now(UTC) - db.commit() - - logger.info( - f"Processing import: Job {job_id}, Marketplace: {marketplace}, " - f"Vendor: {vendor.name} ({vendor.vendor_code}), Language: {language}" - ) - - try: - # Run the async CSV processor in a sync context - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - result = loop.run_until_complete( - csv_processor.process_marketplace_csv_from_url( - url=url, - marketplace=marketplace, - vendor_name=vendor.name, - batch_size=batch_size, - db=db, - language=language, - import_job_id=job_id, - ) - ) - finally: - loop.close() - - # Update job with results - job.status = "completed" - job.completed_at = datetime.now(UTC) - job.imported_count = result["imported"] - job.updated_count = result["updated"] - job.error_count = result.get("errors", 0) - job.total_processed = result["total_processed"] - - if result.get("errors", 0) > 0: - job.status = "completed_with_errors" - job.error_message = f"{result['errors']} rows had errors" - - # Notify admin if error count is significant - if result.get("errors", 0) >= 5: - admin_notification_service.notify_import_failure( - db=db, - vendor_name=vendor.name, - job_id=job_id, - error_message=f"Import completed with {result['errors']} errors out of {result['total_processed']} rows", - vendor_id=vendor_id, - ) - - db.commit() - logger.info( - f"Import job {job_id} completed: " - f"imported={result['imported']}, updated={result['updated']}, " - f"errors={result.get('errors', 0)}" - ) - - return { - "job_id": job_id, - "imported": result["imported"], - "updated": result["updated"], - "errors": result.get("errors", 0), - "total_processed": result["total_processed"], - } - - except Exception as e: - logger.error(f"Import job {job_id} failed: {e}", exc_info=True) - job.status = "failed" - job.error_message = str(e)[:500] # Truncate long errors - job.completed_at = datetime.now(UTC) - - # Create admin notification for import failure - admin_notification_service.notify_import_failure( - db=db, - vendor_name=vendor.name, - job_id=job_id, - error_message=str(e)[:200], - vendor_id=vendor_id, - ) - - db.commit() - raise # Re-raise for Celery retry +__all__ = ["process_marketplace_import"] diff --git a/docs/proposals/module-migration-plan.md b/docs/proposals/module-migration-plan.md index d12e0809..72fd85cd 100644 --- a/docs/proposals/module-migration-plan.md +++ b/docs/proposals/module-migration-plan.md @@ -30,7 +30,7 @@ Transform the platform from a monolithic structure to self-contained modules whe | `cms` | Core | ✅ **Complete** | ✅ | ✅ | - | Done | | `payments` | Optional | 🟡 Partial | ✅ | ✅ | - | Done | | `billing` | Optional | ✅ **Complete** | ✅ | ✅ | ✅ | Done | -| `marketplace` | Optional | 🔴 Shell | ❌ | ❌ | ❌ | Full | +| `marketplace` | Optional | ✅ **Complete** | ✅ | ✅ | ✅ | Done | | `orders` | Optional | 🔴 Shell | ❌ | ❌ | - | Full | | `inventory` | Optional | 🔴 Shell | ❌ | ❌ | - | Full | | `customers` | Core | 🔴 Shell | ❌ | ❌ | - | Full | @@ -127,6 +127,19 @@ app/tasks/celery_tasks/ # → Move to respective modules - Created backward-compatible re-exports in `app/services/` - Updated legacy celery_config.py to not duplicate scheduled tasks +### ✅ Phase 6: Marketplace Module Migration +- Created `app/modules/marketplace/services/` re-exporting from existing locations +- Created `app/modules/marketplace/models/` re-exporting marketplace & letzshop models +- Created `app/modules/marketplace/schemas/` re-exporting marketplace schemas +- Created `app/modules/marketplace/tasks/` with: + - `import_tasks.py` - process_marketplace_import, process_historical_import + - `sync_tasks.py` - sync_vendor_directory (scheduled daily) + - `export_tasks.py` - export_vendor_products_to_folder, export_marketplace_products +- Created `app/modules/marketplace/exceptions.py` +- Updated `definition.py` with self-contained configuration +- Updated legacy task files to re-export from new location +- Removed marketplace/letzshop/export from LEGACY_TASK_MODULES + --- ## Module Migration Phases