From d0bb87f66de7d5388d6afbfbea42d167c67c155d Mon Sep 17 00:00:00 2001 From: Samir Boulahtit Date: Sun, 19 Oct 2025 16:05:04 +0200 Subject: [PATCH] marketplace import task --- tasks/marketplace_import.py | 100 ++++++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/tasks/marketplace_import.py b/tasks/marketplace_import.py index 552e1a7e..6c318097 100644 --- a/tasks/marketplace_import.py +++ b/tasks/marketplace_import.py @@ -1 +1,101 @@ # Marketplace CSV import tasks +# app/tasks/background_tasks.py +import logging +from datetime import datetime, timezone + +from app.core.database import SessionLocal +from models.database.marketplace_import_job import MarketplaceImportJob +from models.database.vendor import Vendor +from app.utils.csv_processor import CSVProcessor + +logger = logging.getLogger(__name__) + + +async def process_marketplace_import( + job_id: int, + url: str, + marketplace: str, + vendor_id: int, # FIXED: Changed from vendor_name to vendor_id + batch_size: int = 1000 +): + """Background task to process marketplace CSV import.""" + db = SessionLocal() + csv_processor = CSVProcessor() + job = None + + try: + # Get the import job + job = ( + db.query(MarketplaceImportJob) + .filter(MarketplaceImportJob.id == job_id) + .first() + ) + if not job: + logger.error(f"Import job {job_id} not found") + return + + # Get vendor information + vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first() + if not vendor: + logger.error(f"Vendor {vendor_id} not found for import job {job_id}") + job.status = "failed" + job.error_message = f"Vendor {vendor_id} not found" + job.completed_at = datetime.now(timezone.utc) + db.commit() + return + + # Update job status + job.status = "processing" + job.started_at = datetime.now(timezone.utc) + db.commit() + + logger.info( + f"Processing import: Job {job_id}, Marketplace: {marketplace}, " + f"Vendor: {vendor.name} ({vendor.vendor_code})" + ) + + # Process CSV with vendor_id + result = await csv_processor.process_marketplace_csv_from_url( + url, + marketplace, + vendor_id, # FIXED: Pass vendor_id instead of vendor_name + batch_size, + db + ) + + # Update job with results + job.status = "completed" + job.completed_at = datetime.now(timezone.utc) + job.imported_count = result["imported"] + job.updated_count = result["updated"] + job.error_count = result.get("errors", 0) + job.total_processed = result["total_processed"] + + if result.get("errors", 0) > 0: + job.status = "completed_with_errors" + job.error_message = f"{result['errors']} rows had errors" + + db.commit() + logger.info( + f"Import job {job_id} completed: " + f"imported={result['imported']}, updated={result['updated']}, " + f"errors={result.get('errors', 0)}" + ) + + except Exception as e: + logger.error(f"Import job {job_id} failed: {e}", exc_info=True) + if job is not None: + try: + job.status = "failed" + job.error_message = str(e) + job.completed_at = datetime.now(timezone.utc) + db.commit() + except Exception as commit_error: + logger.error(f"Failed to update job status: {commit_error}") + db.rollback() + finally: + if hasattr(db, "close") and callable(getattr(db, "close")): + try: + db.close() + except Exception as close_error: + logger.error(f"Error closing database session: {close_error}")