# utils/csv_processor.py
import asyncio
import csv
import logging
from datetime import datetime
from io import StringIO
from typing import Dict, Any, Optional

import pandas as pd
import requests
from sqlalchemy import inspect as sa_inspect
from sqlalchemy.orm import Session

from models.database_models import Product

logger = logging.getLogger(__name__)


class CSVProcessor:
    """Download, decode, parse and import product CSV feeds in batches."""

    # Decoding attempts, tried in order. 'utf-8-sig' decodes plain UTF-8
    # exactly like 'utf-8' but also drops a leading byte-order mark, which
    # plain 'utf-8' would leave glued to the first column name. 'cp1252' must
    # precede 'latin-1': latin-1 maps every possible byte and therefore never
    # fails, so any codec listed after it would be unreachable.
    ENCODINGS = ['utf-8-sig', 'cp1252', 'latin-1']

    # Source column name -> normalized field name (applied after stripping).
    COLUMN_MAPPING = {
        # Standard variations
        'id': 'product_id',
        'ID': 'product_id',
        'Product ID': 'product_id',
        'name': 'title',
        'Name': 'title',
        'product_name': 'title',
        'Product Name': 'title',

        # Google Shopping feed standard
        'g:id': 'product_id',
        'g:title': 'title',
        'g:description': 'description',
        'g:link': 'link',
        'g:image_link': 'image_link',
        'g:availability': 'availability',
        'g:price': 'price',
        'g:sale_price': 'sale_price',
        'g:brand': 'brand',
        'g:gtin': 'gtin',
        'g:mpn': 'mpn',
        'g:condition': 'condition',
        'g:adult': 'adult',
        'g:multipack': 'multipack',
        'g:is_bundle': 'is_bundle',
        'g:age_group': 'age_group',
        'g:color': 'color',
        'g:gender': 'gender',
        'g:material': 'material',
        'g:pattern': 'pattern',
        'g:size': 'size',
        'g:size_type': 'size_type',
        'g:size_system': 'size_system',
        'g:item_group_id': 'item_group_id',
        'g:google_product_category': 'google_product_category',
        'g:product_type': 'product_type',
        'g:custom_label_0': 'custom_label_0',
        'g:custom_label_1': 'custom_label_1',
        'g:custom_label_2': 'custom_label_2',
        'g:custom_label_3': 'custom_label_3',
        'g:custom_label_4': 'custom_label_4',

        # Handle complex shipping column
        'shipping(country:price:max_handling_time:min_transit_time:max_transit_time)': 'shipping',
    }

    # Product fields never overwritten from CSV data on existing rows.
    _UPDATE_EXCLUDED = frozenset({'id', 'created_at'})
    # Product fields never taken from CSV data when creating new rows.
    _CREATE_EXCLUDED = frozenset({'id', 'created_at', 'updated_at'})

    def __init__(self):
        # Local import kept as in the original (the reason is not visible
        # here -- possibly an import cycle; verify before hoisting).
        from utils.data_processing import GTINProcessor, PriceProcessor
        self.gtin_processor = GTINProcessor()
        self.price_processor = PriceProcessor()

    def download_csv(self, url: str) -> str:
        """Download ``url`` and decode the response body to text.

        Each codec in ``ENCODINGS`` is tried in order and the first one that
        decodes without error is used.

        Args:
            url: HTTP(S) location of the CSV file.

        Returns:
            The decoded CSV text.

        Raises:
            requests.RequestException: On network failure, timeout (30 s) or a
                non-2xx HTTP status.
        """
        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            content = response.content
        except requests.RequestException as e:
            logger.error(f"Error downloading CSV: {e}")
            raise

        for encoding in self.ENCODINGS:
            try:
                decoded_content = content.decode(encoding)
            except UnicodeDecodeError:
                continue
            logger.info(f"Successfully decoded CSV with encoding: {encoding}")
            return decoded_content

        # Only reachable if ENCODINGS is overridden without a catch-all codec.
        decoded_content = content.decode('utf-8', errors='ignore')
        logger.warning("Used UTF-8 with error ignoring for CSV decoding")
        return decoded_content

    def parse_csv(self, csv_content: str) -> pd.DataFrame:
        """Parse CSV text, trying several separator configurations in turn.

        Malformed lines are skipped (``on_bad_lines='skip'``) rather than
        failing the whole parse.

        Args:
            csv_content: Decoded CSV text.

        Returns:
            The DataFrame produced by the first configuration that parses.

        Raises:
            pandas.errors.ParserError: If no configuration can parse the text;
                the last underlying error is chained as ``__cause__``.
        """
        parsing_configs = [
            # Try auto-detection first (csv.Sniffer on the first line)
            {'sep': None, 'engine': 'python'},
            # Try semicolon (common in European CSVs)
            {'sep': ';', 'engine': 'python'},
            # Try comma
            {'sep': ',', 'engine': 'python'},
            # Try tab
            {'sep': '\t', 'engine': 'python'},
        ]

        last_error: Optional[Exception] = None
        for config in parsing_configs:
            try:
                df = pd.read_csv(
                    StringIO(csv_content),
                    on_bad_lines='skip',
                    quotechar='"',
                    skip_blank_lines=True,
                    skipinitialspace=True,
                    **config
                )
            except (pd.errors.ParserError, csv.Error) as e:
                # csv.Error comes from delimiter sniffing ("Could not determine
                # delimiter"); it must not abort the remaining fallbacks.
                last_error = e
                continue
            logger.info(f"Successfully parsed CSV with config: {config}")
            return df

        raise pd.errors.ParserError("Could not parse CSV with any configuration") from last_error

    def normalize_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with cleaned and mapped column names.

        Column names are stripped of whitespace and of any byte-order mark
        (U+FEFF is not whitespace, so ``str.strip`` alone keeps it), then
        renamed via ``COLUMN_MAPPING``. The input frame is not modified.
        """
        cleaned = (
            df.columns.astype(str)
            .str.replace('\ufeff', '', regex=False)
            .str.strip()
        )
        df = df.set_axis(cleaned, axis=1).rename(columns=self.COLUMN_MAPPING)
        logger.info(f"Normalized columns: {list(df.columns)}")
        return df

    @staticmethod
    def _normalize_identifier(value: Any) -> str:
        """Render an identifier cell (product_id, MPN) as a clean string.

        pandas parses all-numeric columns as numbers, and such a column with
        any blank cell becomes float, so ``12345`` arrives as ``12345.0``.
        Only integral floats lose their ``.0``; genuine string identifiers
        that happen to end in ``.0`` (e.g. ``"V1.0"``) are preserved.
        """
        if isinstance(value, float) and value.is_integer():
            return str(int(value))
        return str(value).strip()

    def _clean_row_data(self, row_data: Dict[str, Any]) -> Dict[str, Any]:
        """Normalize one CSV row dict into values ready for the Product model.

        - NaN/missing cells become ``None``.
        - ``product_id`` and ``mpn`` become clean strings (see
          ``_normalize_identifier``).
        - ``gtin`` is passed through ``GTINProcessor.normalize``.
        - ``price`` is split into ``price`` and ``currency`` by
          ``PriceProcessor.parse_price_currency``; ``sale_price`` keeps only
          the parsed amount.
        - ``multipack`` is coerced to ``int`` (``None`` when not numeric).
        """
        # Handle NaN values
        processed_data = {k: (v if pd.notna(v) else None) for k, v in row_data.items()}

        if processed_data.get('product_id') is not None:
            processed_data['product_id'] = self._normalize_identifier(processed_data['product_id'])

        # Process GTIN
        if processed_data.get('gtin'):
            processed_data['gtin'] = self.gtin_processor.normalize(processed_data['gtin'])

        # Process price and currency
        if processed_data.get('price'):
            parsed_price, currency = self.price_processor.parse_price_currency(processed_data['price'])
            processed_data['price'] = parsed_price
            processed_data['currency'] = currency

        # Process sale_price
        if processed_data.get('sale_price'):
            parsed_sale_price, _ = self.price_processor.parse_price_currency(processed_data['sale_price'])
            processed_data['sale_price'] = parsed_sale_price

        # Clean MPN (numeric columns arrive as floats like 12345.0)
        if processed_data.get('mpn') is not None:
            processed_data['mpn'] = self._normalize_identifier(processed_data['mpn'])

        # Handle multipack type conversion
        if processed_data.get('multipack') is not None:
            try:
                processed_data['multipack'] = int(float(processed_data['multipack']))
            except (ValueError, TypeError, OverflowError):
                # OverflowError: pandas parses "inf" as float infinity.
                processed_data['multipack'] = None

        return processed_data

    def _load_dataframe(self, url: str) -> pd.DataFrame:
        """Download, parse and column-normalize the CSV at ``url`` (blocking)."""
        csv_content = self.download_csv(url)
        df = self.parse_csv(csv_content)
        return self.normalize_columns(df)

    async def process_marketplace_csv_from_url(
            self,
            url: str,
            marketplace: str,
            shop_name: str,
            batch_size: int,
            db: Session
    ) -> Dict[str, Any]:
        """
        Process CSV from URL with marketplace and shop information

        Args:
            url: URL to the CSV file
            marketplace: Name of the marketplace (e.g., 'Letzshop', 'Amazon')
            shop_name: Name of the shop
            batch_size: Number of rows to process in each batch (must be >= 1)
            db: Database session

        Returns:
            Dictionary with processing results: ``total_processed``,
            ``imported``, ``updated``, ``errors``, ``marketplace`` and
            ``shop_name``.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
            requests.RequestException: If the download fails.
            pandas.errors.ParserError: If the CSV cannot be parsed.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

        logger.info(f"Starting marketplace CSV import from {url} for {marketplace} -> {shop_name}")

        # The download (up to 30 s) and pandas' python-engine parsing are
        # blocking; run them off the event loop so other tasks keep running.
        loop = asyncio.get_running_loop()
        df = await loop.run_in_executor(None, self._load_dataframe, url)

        logger.info(f"Processing CSV with {len(df)} rows and {len(df.columns)} columns")

        imported = 0
        updated = 0
        errors = 0

        # Process in batches
        for batch_num, start in enumerate(range(0, len(df), batch_size), start=1):
            batch_df = df.iloc[start:start + batch_size]
            batch_result = await self._process_marketplace_batch(
                batch_df, marketplace, shop_name, db, batch_num
            )

            imported += batch_result['imported']
            updated += batch_result['updated']
            errors += batch_result['errors']

            logger.info(f"Processed batch {batch_num}: {batch_result}")

            # The per-batch DB work is synchronous; yield to the event loop
            # between batches so a large import does not starve other tasks.
            await asyncio.sleep(0)

        return {
            'total_processed': imported + updated + errors,
            'imported': imported,
            'updated': updated,
            'errors': errors,
            'marketplace': marketplace,
            'shop_name': shop_name
        }

    async def _process_marketplace_batch(
            self,
            batch_df: pd.DataFrame,
            marketplace: str,
            shop_name: str,
            db: Session,
            batch_num: int
    ) -> Dict[str, int]:
        """Upsert one batch of CSV rows and commit it as a single transaction.

        Rows without ``product_id`` or ``title`` are counted as errors and
        skipped. Only mapped column attributes of ``Product`` are written, so
        arbitrary CSV headers cannot set other attributes on ORM instances.

        Returns:
            Counts ``{'imported', 'updated', 'errors'}``. If the commit fails
            the batch is rolled back and every row is counted as an error.
        """
        imported = 0
        updated = 0
        errors = 0

        logger.info(f"Processing batch {batch_num} with {len(batch_df)} rows for {marketplace} -> {shop_name}")

        # Computed here (not at import time) so mapper configuration happens
        # after all models have been imported.
        column_keys = {attr.key for attr in sa_inspect(Product).column_attrs}
        updatable_keys = column_keys - self._UPDATE_EXCLUDED
        creatable_keys = column_keys - self._CREATE_EXCLUDED

        for index, row in batch_df.iterrows():
            try:
                # Convert row to dictionary and clean up
                product_data = self._clean_row_data(row.to_dict())

                # Add marketplace and shop information
                product_data['marketplace'] = marketplace
                product_data['shop_name'] = shop_name

                # Validate required fields
                if not product_data.get('product_id'):
                    logger.warning(f"Row {index}: Missing product_id, skipping")
                    errors += 1
                    continue

                if not product_data.get('title'):
                    logger.warning(f"Row {index}: Missing title, skipping")
                    errors += 1
                    continue

                # NOTE(review): lookup is by product_id only, so the same id
                # coming from another marketplace/shop updates (and re-assigns)
                # that existing row. Confirm against the table's constraints.
                existing_product = db.query(Product).filter(
                    Product.product_id == product_data['product_id']
                ).first()

                if existing_product:
                    # Update existing product
                    for key, value in product_data.items():
                        if key in updatable_keys:
                            setattr(existing_product, key, value)
                    existing_product.updated_at = datetime.utcnow()
                    updated += 1
                    logger.debug(f"Updated product {product_data['product_id']} for {marketplace} and shop {shop_name}")
                else:
                    # Create new product
                    filtered_data = {k: v for k, v in product_data.items() if k in creatable_keys}
                    db.add(Product(**filtered_data))
                    imported += 1
                    logger.debug(f"Imported new product {product_data['product_id']} for {marketplace} and shop "
                                 f"{shop_name}")

            except Exception as e:
                # NOTE(review): if this was a DB error (e.g. raised during
                # autoflush), the session likely needs a rollback and the
                # remaining rows / the commit below will fail as well.
                logger.error(f"Error processing row {index}: {e}")
                errors += 1

        # Commit the batch
        try:
            db.commit()
        except Exception as e:
            logger.error(f"Failed to commit batch {batch_num}: {e}")
            db.rollback()
            # Nothing from this batch was persisted: count all rows as errors.
            return {
                'imported': 0,
                'updated': 0,
                'errors': len(batch_df)
            }

        logger.info(f"Batch {batch_num} committed successfully")
        return {
            'imported': imported,
            'updated': updated,
            'errors': errors
        }