# app/tasks/celery_tasks/export.py
"""
Celery tasks for product export operations.

Handles exporting vendor products to various formats (e.g., Letzshop CSV).
"""

import logging
import os
from datetime import UTC, datetime
from pathlib import Path

from app.core.celery_config import celery_app
from app.tasks.celery_tasks.base import DatabaseTask
from models.database.vendor import Vendor

logger = logging.getLogger(__name__)

# Languages exported by export_vendor_products_to_folder; one CSV file each.
_VENDOR_EXPORT_LANGUAGES = ("en", "fr", "de")


def _write_csv_file(file_path: Path, csv_content: str) -> None:
    """Write already-rendered CSV text to ``file_path`` as UTF-8."""
    # newline="" disables newline translation so the row terminators chosen
    # by the CSV generator reach the disk unchanged. Without it, text mode
    # rewrites "\n" to os.linesep and turns "\r\n" into "\r\r\n" on platforms
    # whose line separator is not "\n".
    with file_path.open("w", encoding="utf-8", newline="") as handle:
        handle.write(csv_content)


# NOTE(review): nothing in this task calls self.retry(); unless DatabaseTask
# configures automatic retries, max_retries/default_retry_delay have no
# effect here -- confirm.
@celery_app.task(
    bind=True,
    base=DatabaseTask,
    name="app.tasks.celery_tasks.export.export_vendor_products_to_folder",
    max_retries=3,
    default_retry_delay=60,
)
def export_vendor_products_to_folder(
    self,
    vendor_id: int,
    triggered_by: str,
    include_inactive: bool = False,
) -> dict:
    """
    Export all 3 languages (en, fr, de) to disk asynchronously.

    One CSV file per language is written to
    ``exports/letzshop/<vendor_code>/<vendor_code>_products_<lang>.csv``
    (relative to the worker's current working directory). A failure in one
    language is recorded in the result and does not stop the other languages.
    After the loop the export is logged through
    ``letzshop_export_service.log_export``; a failure of that step is logged
    and otherwise ignored.

    Args:
        vendor_id: ID of the vendor to export
        triggered_by: User identifier who triggered the export
        include_inactive: Whether to include inactive products

    Returns:
        dict: ``{"error": ...}`` when the vendor does not exist. Otherwise a
        dict with ``vendor_id``, ``vendor_code``, ``export_dir``,
        ``duration_seconds``, ``triggered_by`` and ``results``, where
        ``results`` maps each language to either
        ``{"success": True, "path": ..., "file_name": ...}`` or
        ``{"success": False, "error": ...}``.

    Raises:
        Exception: Errors from the vendor lookup or from creating the export
            directory are not caught here and propagate to Celery.
    """
    # Imported at call time rather than module import time -- presumably to
    # avoid an import cycle; confirm before moving it to the top of the file.
    from app.services.letzshop_export_service import letzshop_export_service

    languages = _VENDOR_EXPORT_LANGUAGES
    results = {}

    with self.get_db() as db:
        vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first()
        if not vendor:
            logger.error(f"Vendor {vendor_id} not found for export")
            return {"error": f"Vendor {vendor_id} not found"}

        vendor_code = vendor.vendor_code

        # Relative path: resolved against the worker's working directory.
        export_dir = Path(f"exports/letzshop/{vendor_code}")
        export_dir.mkdir(parents=True, exist_ok=True)

        started_at = datetime.now(UTC)
        logger.info(f"Starting product export for vendor {vendor_code} (ID: {vendor_id})")

        for lang in languages:
            try:
                csv_content = letzshop_export_service.export_vendor_products(
                    db=db,
                    vendor_id=vendor_id,
                    language=lang,
                    include_inactive=include_inactive,
                )

                file_name = f"{vendor_code}_products_{lang}.csv"
                file_path = export_dir / file_name
                _write_csv_file(file_path, csv_content)
            except Exception as e:
                # Deliberate best-effort: record the failure and continue with
                # the remaining languages. logger.exception keeps the
                # traceback that logger.error would drop.
                logger.exception(f"Error exporting {lang} products for vendor {vendor_id}: {e}")
                results[lang] = {
                    "success": False,
                    "error": str(e),
                }
            else:
                results[lang] = {
                    "success": True,
                    "path": str(file_path),
                    "file_name": file_name,
                }
                logger.info(f"Exported {lang} products to {file_path}")

        completed_at = datetime.now(UTC)
        duration = (completed_at - started_at).total_seconds()
        success_count = sum(1 for r in results.values() if r.get("success"))

        try:
            # The "records" counted here are languages, not product rows.
            letzshop_export_service.log_export(
                db=db,
                vendor_id=vendor_id,
                triggered_by=triggered_by,
                records_processed=len(languages),
                records_succeeded=success_count,
                records_failed=len(languages) - success_count,
                duration_seconds=duration,
            )
            db.commit()
        except Exception as e:
            # The files are already on disk; a failed audit entry must not
            # turn the export itself into a failure.
            logger.exception(f"Failed to log export: {e}")

        logger.info(
            f"Product export complete for vendor {vendor_code}: "
            f"{success_count}/{len(languages)} languages exported in {duration:.2f}s"
        )

        return {
            "vendor_id": vendor_id,
            "vendor_code": vendor_code,
            "export_dir": str(export_dir),
            "results": results,
            "duration_seconds": duration,
            "triggered_by": triggered_by,
        }


# NOTE(review): as with the vendor task, nothing here calls self.retry();
# confirm whether the retry settings below are actually used.
@celery_app.task(
    bind=True,
    base=DatabaseTask,
    name="app.tasks.celery_tasks.export.export_marketplace_products",
    max_retries=3,
    default_retry_delay=60,
)
def export_marketplace_products(
    self,
    language: str = "en",
    triggered_by: str = "system",
) -> dict:
    """
    Export all marketplace products (admin use).

    Writes a single timestamped CSV file to
    ``exports/marketplace/marketplace_products_<language>_<YYYYmmdd_HHMMSS>.csv``
    (relative to the worker's current working directory). Any exception
    raised while creating the directory, generating the CSV or writing the
    file is caught, logged and reported in the returned dict.

    Args:
        language: Language code for export (en, fr, de)
        triggered_by: User identifier who triggered the export

    Returns:
        dict: On success ``{"success": True, "path", "file_name", "language",
        "duration_seconds", "triggered_by"}``; on failure
        ``{"success": False, "error", "language"}``.
    """
    # Imported at call time rather than module import time -- presumably to
    # avoid an import cycle; confirm before moving it to the top of the file.
    from app.services.letzshop_export_service import letzshop_export_service

    with self.get_db() as db:
        started_at = datetime.now(UTC)
        logger.info(f"Starting marketplace product export ({language})")

        try:
            export_dir = Path("exports/marketplace")
            export_dir.mkdir(parents=True, exist_ok=True)

            csv_content = letzshop_export_service.export_marketplace_products(
                db=db,
                language=language,
            )

            # NOTE(review): `language` goes into the file name unvalidated; a
            # value containing a path separator would leave export_dir.
            # Confirm callers only pass known language codes.
            timestamp = started_at.strftime("%Y%m%d_%H%M%S")
            file_name = f"marketplace_products_{language}_{timestamp}.csv"
            file_path = export_dir / file_name
            _write_csv_file(file_path, csv_content)

            completed_at = datetime.now(UTC)
            duration = (completed_at - started_at).total_seconds()

            logger.info(f"Marketplace export complete: {file_path} ({duration:.2f}s)")

            return {
                "success": True,
                "path": str(file_path),
                "file_name": file_name,
                "language": language,
                "duration_seconds": duration,
                "triggered_by": triggered_by,
            }
        except Exception as e:
            # Report the failure in the task result instead of raising;
            # logger.exception keeps the traceback for diagnosis.
            logger.exception(f"Error exporting marketplace products: {e}")
            return {
                "success": False,
                "error": str(e),
                "language": language,
            }