Files
orion/app/utils/csv_processor.py
Samir Boulahtit f2af3aae29 feat: update CSV import to support multi-language translations
- Add language parameter to import endpoints and background tasks
- Extract translation fields (title, description, short_description)
- Create/update MarketplaceProductTranslation records during import
- Add MarketplaceProductTranslationSchema for API responses
- Map product_type column to product_type_raw to avoid enum conflict
- Parse prices to numeric format (price_numeric, sale_price_numeric)
- Update marketplace product service for translation-based lookups
- Update CSV export to retrieve titles from translations

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 17:29:13 +01:00

468 lines
17 KiB
Python

# app/utils/csv_processor.py
"""CSV processor utilities for marketplace product imports.
This module provides classes and functions for:
- Downloading and parsing CSV files with multiple encoding support
- Normalizing column names to match database schema
- Creating/updating MarketplaceProduct records with translations
"""
import csv
import logging
import re
from datetime import UTC, datetime
from io import StringIO
from typing import Any

import pandas as pd
import requests
from sqlalchemy import literal
from sqlalchemy.orm import Session

from models.database.marketplace_product import MarketplaceProduct
from models.database.marketplace_product_translation import MarketplaceProductTranslation
logger = logging.getLogger(__name__)
class CSVProcessor:
"""Handles CSV import with robust parsing and batching."""
ENCODINGS = ["utf-8", "latin-1", "iso-8859-1", "cp1252", "utf-8-sig"]
PARSING_CONFIGS = [
# Try auto-detection first
{"sep": None, "engine": "python"},
# Try semicolon (common in European CSVs)
{"sep": ";", "engine": "python"},
# Try comma
{"sep": ",", "engine": "python"},
# Try tab
{"sep": "\t", "engine": "python"},
]
# Fields that belong to the translation table, not MarketplaceProduct
TRANSLATION_FIELDS = {"title", "description", "short_description"}
COLUMN_MAPPING = {
# Standard variations
"id": "marketplace_product_id",
"ID": "marketplace_product_id",
"MarketplaceProduct ID": "marketplace_product_id",
"name": "title",
"Name": "title",
"product_name": "title",
"MarketplaceProduct Name": "title",
# Google Shopping feed standard
"g:id": "marketplace_product_id",
"g:title": "title",
"g:description": "description",
"g:link": "link",
"g:image_link": "image_link",
"g:availability": "availability",
"g:price": "price",
"g:brand": "brand",
"g:gtin": "gtin",
"g:mpn": "mpn",
"g:condition": "condition",
"g:adult": "adult",
"g:multipack": "multipack",
"g:is_bundle": "is_bundle",
"g:age_group": "age_group",
"g:color": "color",
"g:gender": "gender",
"g:material": "material",
"g:pattern": "pattern",
"g:size": "size",
"g:size_type": "size_type",
"g:size_system": "size_system",
"g:item_group_id": "item_group_id",
"g:google_product_category": "google_product_category",
"g:product_type": "product_type_raw", # Maps to product_type_raw (renamed)
"product_type": "product_type_raw", # Also map plain product_type
"g:custom_label_0": "custom_label_0",
"g:custom_label_1": "custom_label_1",
"g:custom_label_2": "custom_label_2",
"g:custom_label_3": "custom_label_3",
"g:custom_label_4": "custom_label_4",
# Handle complex shipping column
"shipping(country:price:max_handling_time:min_transit_time:max_transit_time)": "shipping",
}
def __init__(self):
"""Class constructor."""
from app.utils.data_processing import GTINProcessor, PriceProcessor
self.gtin_processor = GTINProcessor()
self.price_processor = PriceProcessor()
def download_csv(self, url: str) -> str:
"""Download and decode CSV with multiple encoding attempts."""
try:
response = requests.get(url, timeout=30)
response.raise_for_status()
content = response.content
# Try different encodings
for encoding in self.ENCODINGS:
try:
decoded_content = content.decode(encoding)
logger.info(f"Successfully decoded CSV with encoding: {encoding}")
return decoded_content
except UnicodeDecodeError:
continue
# Fallback with error ignoring
decoded_content = content.decode("utf-8", errors="ignore")
logger.warning("Used UTF-8 with error ignoring for CSV decoding")
return decoded_content
except requests.RequestException as e:
logger.error(f"Error downloading CSV: {e}")
raise
def parse_csv(self, csv_content: str) -> pd.DataFrame:
"""Parse CSV with multiple separator attempts."""
for config in self.PARSING_CONFIGS:
try:
df = pd.read_csv(
StringIO(csv_content),
on_bad_lines="skip",
quotechar='"',
skip_blank_lines=True,
skipinitialspace=True,
**config,
)
logger.info(f"Successfully parsed CSV with config: {config}")
return df
except pd.errors.ParserError:
continue
raise pd.errors.ParserError("Could not parse CSV with any configuration")
def normalize_columns(self, df: pd.DataFrame) -> pd.DataFrame:
"""Normalize column names using mapping."""
# Clean column names
df.columns = df.columns.str.strip()
# Apply mapping
df = df.rename(columns=self.COLUMN_MAPPING)
logger.info(f"Normalized columns: {list(df.columns)}")
return df
def _parse_price_to_numeric(self, price_str: str | None) -> float | None:
"""Parse price string like '19.99 EUR' to float."""
if not price_str:
return None
# Extract numeric value
numbers = re.findall(r"[\d.,]+", str(price_str))
if numbers:
num_str = numbers[0].replace(",", ".")
try:
return float(num_str)
except ValueError:
pass
return None
def _clean_row_data(self, row_data: dict[str, Any]) -> dict[str, Any]:
"""Process a single row with data normalization."""
# Handle NaN values
processed_data = {k: (v if pd.notna(v) else None) for k, v in row_data.items()}
# Process GTIN
if processed_data.get("gtin"):
processed_data["gtin"] = self.gtin_processor.normalize(
processed_data["gtin"]
)
# Process price and currency
if processed_data.get("price"):
parsed_price, currency = self.price_processor.parse_price_currency(
processed_data["price"]
)
# Store both raw price string and numeric value
raw_price = processed_data["price"]
processed_data["price"] = parsed_price
processed_data["price_numeric"] = self._parse_price_to_numeric(raw_price)
processed_data["currency"] = currency
# Process sale_price
if processed_data.get("sale_price"):
raw_sale_price = processed_data["sale_price"]
parsed_sale_price, _ = self.price_processor.parse_price_currency(
processed_data["sale_price"]
)
processed_data["sale_price"] = parsed_sale_price
processed_data["sale_price_numeric"] = self._parse_price_to_numeric(
raw_sale_price
)
# Clean MPN (remove .0 endings)
if processed_data.get("mpn"):
mpn_str = str(processed_data["mpn"]).strip()
if mpn_str.endswith(".0"):
processed_data["mpn"] = mpn_str[:-2]
# Handle multipack type conversion
if processed_data.get("multipack") is not None:
try:
processed_data["multipack"] = int(float(processed_data["multipack"]))
except (ValueError, TypeError):
processed_data["multipack"] = None
return processed_data
def _extract_translation_data(
self, product_data: dict[str, Any]
) -> dict[str, Any]:
"""Extract translation fields from product data.
Returns a dict with title, description, etc. that belong
in the translation table. Removes these fields from product_data in place.
"""
translation_data = {}
for field in self.TRANSLATION_FIELDS:
if field in product_data:
translation_data[field] = product_data.pop(field)
return translation_data
def _create_or_update_translation(
self,
db: Session,
marketplace_product: MarketplaceProduct,
translation_data: dict[str, Any],
language: str = "en",
source_file: str | None = None,
) -> None:
"""Create or update a translation record for the marketplace product."""
if not translation_data.get("title"):
# Title is required for translations
return
# Check if translation exists
existing_translation = (
db.query(MarketplaceProductTranslation)
.filter(
MarketplaceProductTranslation.marketplace_product_id
== marketplace_product.id,
MarketplaceProductTranslation.language == language,
)
.first()
)
if existing_translation:
# Update existing translation
for key, value in translation_data.items():
if hasattr(existing_translation, key):
setattr(existing_translation, key, value)
existing_translation.updated_at = datetime.now(UTC)
if source_file:
existing_translation.source_file = source_file
else:
# Create new translation
new_translation = MarketplaceProductTranslation(
marketplace_product_id=marketplace_product.id,
language=language,
title=translation_data.get("title"),
description=translation_data.get("description"),
short_description=translation_data.get("short_description"),
source_file=source_file,
)
db.add(new_translation)
async def process_marketplace_csv_from_url(
self,
url: str,
marketplace: str,
vendor_name: str,
batch_size: int,
db: Session,
language: str = "en",
) -> dict[str, Any]:
"""
Process CSV from URL with marketplace and vendor information.
Args:
url: URL to the CSV file
marketplace: Name of the marketplace (e.g., 'Letzshop', 'Amazon')
vendor_name: Name of the vendor
batch_size: Number of rows to process in each batch
db: Database session
language: Language code for translations (default: 'en')
Returns:
Dictionary with processing results
"""
logger.info(
f"Starting marketplace CSV import from {url} for {marketplace} -> {vendor_name} (lang={language})"
)
# Download and parse CSV
csv_content = self.download_csv(url)
df = self.parse_csv(csv_content)
df = self.normalize_columns(df)
logger.info(f"Processing CSV with {len(df)} rows and {len(df.columns)} columns")
imported = 0
updated = 0
errors = 0
# Extract source file name from URL
source_file = url.split("/")[-1] if "/" in url else url
# Process in batches
for i in range(0, len(df), batch_size):
batch_df = df.iloc[i : i + batch_size]
batch_result = await self._process_marketplace_batch(
batch_df,
marketplace,
vendor_name,
db,
i // batch_size + 1,
language=language,
source_file=source_file,
)
imported += batch_result["imported"]
updated += batch_result["updated"]
errors += batch_result["errors"]
logger.info(f"Processed batch {i // batch_size + 1}: {batch_result}")
return {
"total_processed": imported + updated + errors,
"imported": imported,
"updated": updated,
"errors": errors,
"marketplace": marketplace,
"vendor_name": vendor_name,
"language": language,
}
async def _process_marketplace_batch(
self,
batch_df: pd.DataFrame,
marketplace: str,
vendor_name: str,
db: Session,
batch_num: int,
language: str = "en",
source_file: str | None = None,
) -> dict[str, int]:
"""Process a batch of CSV rows with marketplace information."""
imported = 0
updated = 0
errors = 0
logger.info(
f"Processing batch {batch_num} with {len(batch_df)} rows for "
f"{marketplace} -> {vendor_name}"
)
for index, row in batch_df.iterrows():
try:
# Convert row to dictionary and clean up
product_data = self._clean_row_data(row.to_dict())
# Extract translation fields BEFORE processing product
translation_data = self._extract_translation_data(product_data)
# Add marketplace and vendor information
product_data["marketplace"] = marketplace
product_data["vendor_name"] = vendor_name
# Validate required fields
if not product_data.get("marketplace_product_id"):
logger.warning(
f"Row {index}: Missing marketplace_product_id, skipping"
)
errors += 1
continue
# Title is now required in translation_data
if not translation_data.get("title"):
logger.warning(f"Row {index}: Missing title, skipping")
errors += 1
continue
# Check if product exists
existing_product = (
db.query(MarketplaceProduct)
.filter(
MarketplaceProduct.marketplace_product_id
== literal(product_data["marketplace_product_id"])
)
.first()
)
if existing_product:
# Update existing product (only non-translation fields)
for key, value in product_data.items():
if key not in ["id", "created_at"] and hasattr(
existing_product, key
):
setattr(existing_product, key, value)
existing_product.updated_at = datetime.now(UTC)
# Update or create translation
self._create_or_update_translation(
db,
existing_product,
translation_data,
language=language,
source_file=source_file,
)
updated += 1
logger.debug(
f"Updated product {product_data['marketplace_product_id']} for "
f"{marketplace} and vendor {vendor_name}"
)
else:
# Create new product (filter to valid model fields)
filtered_data = {
k: v
for k, v in product_data.items()
if k not in ["id", "created_at", "updated_at"]
and hasattr(MarketplaceProduct, k)
}
new_product = MarketplaceProduct(**filtered_data)
db.add(new_product)
db.flush() # Get the ID for the translation
# Create translation for new product
self._create_or_update_translation(
db,
new_product,
translation_data,
language=language,
source_file=source_file,
)
imported += 1
logger.debug(
f"Imported new product {product_data['marketplace_product_id']} "
f"for {marketplace} and vendor {vendor_name}"
)
except Exception as e:
logger.error(f"Error processing row: {e}")
errors += 1
continue
# Commit the batch
try:
db.commit()
logger.info(f"Batch {batch_num} committed successfully")
except Exception as e:
logger.error(f"Failed to commit batch {batch_num}: {e}")
db.rollback()
# Count all rows in this batch as errors
errors = len(batch_df)
imported = 0
updated = 0
return {"imported": imported, "updated": updated, "errors": errors}