Moved utils folder to app/utils folder

This commit is contained in:
2025-09-21 21:11:12 +02:00
parent 6c4310a594
commit e348476b65
10 changed files with 8 additions and 8 deletions

0
app/utils/__init__.py Normal file
View File

332
app/utils/csv_processor.py Normal file
View File

@@ -0,0 +1,332 @@
# app/utils/csv_processor.py
"""CSV processor utilities ....
This module provides classes and functions for:
- ....
- ....
- ....
"""
import logging
from datetime import datetime
from io import StringIO
from typing import Any, Dict
import pandas as pd
import requests
from sqlalchemy import literal
from sqlalchemy.orm import Session
from models.database.product import Product
logger = logging.getLogger(__name__)
class CSVProcessor:
"""Handles CSV import with robust parsing and batching."""
ENCODINGS = ["utf-8", "latin-1", "iso-8859-1", "cp1252", "utf-8-sig"]
PARSING_CONFIGS = [
# Try auto-detection first
{"sep": None, "engine": "python"},
# Try semicolon (common in European CSVs)
{"sep": ";", "engine": "python"},
# Try comma
{"sep": ",", "engine": "python"},
# Try tab
{"sep": "\t", "engine": "python"},
]
COLUMN_MAPPING = {
# Standard variations
"id": "product_id",
"ID": "product_id",
"Product ID": "product_id",
"name": "title",
"Name": "title",
"product_name": "title",
"Product Name": "title",
# Google Shopping feed standard
"g:id": "product_id",
"g:title": "title",
"g:description": "description",
"g:link": "link",
"g:image_link": "image_link",
"g:availability": "availability",
"g:price": "price",
"g:brand": "brand",
"g:gtin": "gtin",
"g:mpn": "mpn",
"g:condition": "condition",
"g:adult": "adult",
"g:multipack": "multipack",
"g:is_bundle": "is_bundle",
"g:age_group": "age_group",
"g:color": "color",
"g:gender": "gender",
"g:material": "material",
"g:pattern": "pattern",
"g:size": "size",
"g:size_type": "size_type",
"g:size_system": "size_system",
"g:item_group_id": "item_group_id",
"g:google_product_category": "google_product_category",
"g:product_type": "product_type",
"g:custom_label_0": "custom_label_0",
"g:custom_label_1": "custom_label_1",
"g:custom_label_2": "custom_label_2",
"g:custom_label_3": "custom_label_3",
"g:custom_label_4": "custom_label_4",
# Handle complex shipping column
"shipping(country:price:max_handling_time:min_transit_time:max_transit_time)": "shipping",
}
def __init__(self):
"""Class constructor."""
from app.utils.data_processing import GTINProcessor, PriceProcessor
self.gtin_processor = GTINProcessor()
self.price_processor = PriceProcessor()
def download_csv(self, url: str) -> str:
"""Download and decode CSV with multiple encoding attempts."""
try:
response = requests.get(url, timeout=30)
response.raise_for_status()
content = response.content
# Try different encodings
for encoding in self.ENCODINGS:
try:
decoded_content = content.decode(encoding)
logger.info(f"Successfully decoded CSV with encoding: {encoding}")
return decoded_content
except UnicodeDecodeError:
continue
# Fallback with error ignoring
decoded_content = content.decode("utf-8", errors="ignore")
logger.warning("Used UTF-8 with error ignoring for CSV decoding")
return decoded_content
except requests.RequestException as e:
logger.error(f"Error downloading CSV: {e}")
raise
def parse_csv(self, csv_content: str) -> pd.DataFrame:
"""Parse CSV with multiple separator attempts."""
for config in self.PARSING_CONFIGS:
try:
df = pd.read_csv(
StringIO(csv_content),
on_bad_lines="skip",
quotechar='"',
skip_blank_lines=True,
skipinitialspace=True,
**config,
)
logger.info(f"Successfully parsed CSV with config: {config}")
return df
except pd.errors.ParserError:
continue
raise pd.errors.ParserError("Could not parse CSV with any configuration")
def normalize_columns(self, df: pd.DataFrame) -> pd.DataFrame:
"""Normalize column names using mapping."""
# Clean column names
df.columns = df.columns.str.strip()
# Apply mapping
df = df.rename(columns=self.COLUMN_MAPPING)
logger.info(f"Normalized columns: {list(df.columns)}")
return df
def _clean_row_data(self, row_data: Dict[str, Any]) -> Dict[str, Any]:
"""Process a single row with data normalization."""
# Handle NaN values
processed_data = {k: (v if pd.notna(v) else None) for k, v in row_data.items()}
# Process GTIN
if processed_data.get("gtin"):
processed_data["gtin"] = self.gtin_processor.normalize(
processed_data["gtin"]
)
# Process price and currency
if processed_data.get("price"):
parsed_price, currency = self.price_processor.parse_price_currency(
processed_data["price"]
)
processed_data["price"] = parsed_price
processed_data["currency"] = currency
# Process sale_price
if processed_data.get("sale_price"):
parsed_sale_price, _ = self.price_processor.parse_price_currency(
processed_data["sale_price"]
)
processed_data["sale_price"] = parsed_sale_price
# Clean MPN (remove .0 endings)
if processed_data.get("mpn"):
mpn_str = str(processed_data["mpn"]).strip()
if mpn_str.endswith(".0"):
processed_data["mpn"] = mpn_str[:-2]
# Handle multipack type conversion
if processed_data.get("multipack") is not None:
try:
processed_data["multipack"] = int(float(processed_data["multipack"]))
except (ValueError, TypeError):
processed_data["multipack"] = None
return processed_data
async def process_marketplace_csv_from_url(
self, url: str, marketplace: str, shop_name: str, batch_size: int, db: Session
) -> Dict[str, Any]:
"""
Process CSV from URL with marketplace and shop information.
Args:
url: URL to the CSV file
marketplace: Name of the marketplace (e.g., 'Letzshop', 'Amazon')
shop_name: Name of the shop
batch_size: Number of rows to process in each batch
db: Database session
Returns:
Dictionary with processing results
"""
logger.info(
f"Starting marketplace CSV import from {url} for {marketplace} -> {shop_name}"
)
# Download and parse CSV
csv_content = self.download_csv(url)
df = self.parse_csv(csv_content)
df = self.normalize_columns(df)
logger.info(f"Processing CSV with {len(df)} rows and {len(df.columns)} columns")
imported = 0
updated = 0
errors = 0
# Process in batches
for i in range(0, len(df), batch_size):
batch_df = df.iloc[i : i + batch_size]
batch_result = await self._process_marketplace_batch(
batch_df, marketplace, shop_name, db, i // batch_size + 1
)
imported += batch_result["imported"]
updated += batch_result["updated"]
errors += batch_result["errors"]
logger.info(f"Processed batch {i // batch_size + 1}: {batch_result}")
return {
"total_processed": imported + updated + errors,
"imported": imported,
"updated": updated,
"errors": errors,
"marketplace": marketplace,
"shop_name": shop_name,
}
async def _process_marketplace_batch(
self,
batch_df: pd.DataFrame,
marketplace: str,
shop_name: str,
db: Session,
batch_num: int,
) -> Dict[str, int]:
"""Process a batch of CSV rows with marketplace information."""
imported = 0
updated = 0
errors = 0
logger.info(
f"Processing batch {batch_num} with {len(batch_df)} rows for "
f"{marketplace} -> {shop_name}"
)
for index, row in batch_df.iterrows():
try:
# Convert row to dictionary and clean up
product_data = self._clean_row_data(row.to_dict())
# Add marketplace and shop information
product_data["marketplace"] = marketplace
product_data["shop_name"] = shop_name
# Validate required fields
if not product_data.get("product_id"):
logger.warning(f"Row {index}: Missing product_id, skipping")
errors += 1
continue
if not product_data.get("title"):
logger.warning(f"Row {index}: Missing title, skipping")
errors += 1
continue
# Check if product exists
existing_product = (
db.query(Product)
.filter(Product.product_id == literal(product_data["product_id"]))
.first()
)
if existing_product:
# Update existing product
for key, value in product_data.items():
if key not in ["id", "created_at"] and hasattr(
existing_product, key
):
setattr(existing_product, key, value)
existing_product.updated_at = datetime.utcnow()
updated += 1
logger.debug(
f"Updated product {product_data['product_id']} for "
f"{marketplace} and shop {shop_name}"
)
else:
# Create new product
filtered_data = {
k: v
for k, v in product_data.items()
if k not in ["id", "created_at", "updated_at"]
and hasattr(Product, k)
}
new_product = Product(**filtered_data)
db.add(new_product)
imported += 1
logger.debug(
f"Imported new product {product_data['product_id']} "
f"for {marketplace} and shop {shop_name}"
)
except Exception as e:
logger.error(f"Error processing row: {e}")
errors += 1
continue
# Commit the batch
try:
db.commit()
logger.info(f"Batch {batch_num} committed successfully")
except Exception as e:
logger.error(f"Failed to commit batch {batch_num}: {e}")
db.rollback()
# Count all rows in this batch as errors
errors = len(batch_df)
imported = 0
updated = 0
return {"imported": imported, "updated": updated, "errors": errors}

View File

@@ -0,0 +1,140 @@
# utils/data_processing.py
"""Data processing utilities for GTIN validation and price parsing.
This module provides classes and functions for:
- GTIN (Global Trade Item Number) validation and normalization
- Price parsing with currency detection
- Data cleaning and validation utilities
"""
import logging
import re
from typing import Optional, Tuple
import pandas as pd
logger = logging.getLogger(__name__)
class GTINProcessor:
"""Handles GTIN normalization and validation."""
VALID_LENGTHS = [8, 12, 13, 14]
def normalize(self, gtin_value: any) -> Optional[str]:
"""
Normalize GTIN to proper format.
Returns None for invalid GTINs.
"""
if not gtin_value or pd.isna(gtin_value):
return None
gtin_str = str(gtin_value).strip()
if not gtin_str:
return None
# Remove decimal point (e.g., "889698116923.0" -> "889698116923")
if "." in gtin_str:
gtin_str = gtin_str.split(".")[0]
# Keep only digits
gtin_clean = "".join(filter(str.isdigit, gtin_str))
if not gtin_clean:
return None
# Validate and normalize length
length = len(gtin_clean)
if length in self.VALID_LENGTHS:
# Standard lengths - pad appropriately
if length == 8:
return gtin_clean.zfill(8) # EAN-8
elif length == 12:
return gtin_clean.zfill(12) # UPC-A
elif length == 13:
return gtin_clean.zfill(13) # EAN-13
elif length == 14:
return gtin_clean.zfill(14) # GTIN-14
elif length > 14:
# Too long - truncate to EAN-13
logger.warning(f"GTIN too long, truncating: {gtin_clean}")
return gtin_clean[-13:]
elif 0 < length < 8:
# Too short - pad to EAN-13
logger.warning(f"GTIN too short, padding: {gtin_clean}")
return gtin_clean.zfill(13)
logger.warning(f"Invalid GTIN format: '{gtin_value}'")
return None
def validate(self, gtin: str) -> bool:
"""Validate GTIN format."""
if not gtin:
return False
return len(gtin) in self.VALID_LENGTHS and gtin.isdigit()
class PriceProcessor:
"""Handles price parsing and currency extraction."""
CURRENCY_PATTERNS = {
# Amount followed by currency
r"([0-9.,]+)\s*(EUR|€)": lambda m: (m.group(1), "EUR"),
r"([0-9.,]+)\s*(USD|\$)": lambda m: (m.group(1), "USD"),
r"([0-9.,]+)\s*(GBP|£)": lambda m: (m.group(1), "GBP"),
r"([0-9.,]+)\s*(CHF)": lambda m: (m.group(1), "CHF"),
r"([0-9.,]+)\s*(CAD|AUD|JPY|¥)": lambda m: (m.group(1), m.group(2).upper()),
# Currency followed by amount
r"(EUR|€)\s*([0-9.,]+)": lambda m: (m.group(2), "EUR"),
r"(USD|\$)\s*([0-9.,]+)": lambda m: (m.group(2), "USD"),
r"(GBP|£)\s*([0-9.,]+)": lambda m: (m.group(2), "GBP"),
# Generic 3-letter currency codes
r"([0-9.,]+)\s*([A-Z]{3})": lambda m: (m.group(1), m.group(2)),
r"([A-Z]{3})\s*([0-9.,]+)": lambda m: (m.group(2), m.group(1)),
}
def parse_price_currency(
self, price_str: any
) -> Tuple[Optional[str], Optional[str]]:
"""
Parse price string into (price, currency) tuple.
Returns (None, None) if parsing fails
"""
if not price_str or pd.isna(price_str):
return None, None
price_str = str(price_str).strip()
if not price_str:
return None, None
# Try each pattern
for pattern, extract_func in self.CURRENCY_PATTERNS.items():
match = re.search(pattern, price_str, re.IGNORECASE)
if match:
try:
price_val, currency_val = extract_func(match)
# Normalize price (remove spaces, handle comma as decimal)
price_val = price_val.replace(" ", "").replace(",", ".")
# Validate numeric
float(price_val)
return price_val, currency_val.upper()
except (ValueError, AttributeError):
continue
# Fallback: extract just numbers
number_match = re.search(r"([0-9.,]+)", price_str)
if number_match:
try:
price_val = number_match.group(1).replace(",", ".")
float(price_val) # Validate
return price_val, None
except ValueError:
pass
logger.warning(f"Could not parse price: '{price_str}'")
return price_str, None

43
app/utils/database.py Normal file
View File

@@ -0,0 +1,43 @@
# utils/database.py
"""Database utilities ....
This module provides classes and functions for:
- ....
- ....
- ....
"""
import logging
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool
logger = logging.getLogger(__name__)
def get_db_engine(database_url: str):
"""Create database engine with connection pooling."""
if database_url.startswith("sqlite"):
# SQLite configuration
engine = create_engine(
database_url, connect_args={"check_same_thread": False}, echo=False
)
else:
# PostgreSQL configuration with connection pooling
engine = create_engine(
database_url,
poolclass=QueuePool,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
echo=False,
)
logger.info(f"Database engine created for: " f"{database_url.split('@')[0]}@...")
return engine
def get_session_local(engine):
"""Create session factory."""
return sessionmaker(autocommit=False, autoflush=False, bind=engine)