130 lines
4.3 KiB
Python
130 lines
4.3 KiB
Python
# utils/data_processing.py
|
|
import re
|
|
import pandas as pd
|
|
from typing import Tuple, Optional
|
|
import logging
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class GTINProcessor:
    """Handles GTIN normalization and validation."""

    # Digit counts of the recognised formats: EAN-8, UPC-A, EAN-13, GTIN-14.
    VALID_LENGTHS = [8, 12, 13, 14]

    def normalize(self, gtin_value: object) -> Optional[str]:
        """
        Normalize a raw GTIN value to a digits-only string.

        The value is stringified, anything after the first '.' is dropped
        (so a float-coerced "889698116923.0" becomes "889698116923"), and
        every character that is not an ASCII digit is removed.

        Returns:
            - the digits unchanged when their count is in VALID_LENGTHS;
            - the digits left-padded with zeros to 12 (UPC-A) when the count
              is below 12 and not a valid length, with a warning logged;
            - the last 13 digits when there are more than 14, with a
              warning logged;
            - None when the value is missing (None, NaN, pd.NA), falsy,
              blank, or contains no ASCII digits.
        """
        # pd.NA has no truth value (bool(pd.NA) raises TypeError), so it must
        # be ruled out before the falsy check below.
        if gtin_value is None or gtin_value is pd.NA:
            return None
        if not gtin_value or pd.isna(gtin_value):
            return None

        gtin_str = str(gtin_value).strip()
        if not gtin_str:
            return None

        # Remove decimal part (e.g., "889698116923.0" -> "889698116923")
        gtin_str = gtin_str.split('.')[0]

        # Keep only ASCII digits. str.isdigit() would also accept characters
        # such as '²' or Arabic-Indic numerals, which are not valid in a GTIN.
        gtin_clean = re.sub(r'[^0-9]', '', gtin_str)
        if not gtin_clean:
            return None

        length = len(gtin_clean)

        if length in self.VALID_LENGTHS:
            # Already a standard length - nothing to pad.
            return gtin_clean

        if length > 14:
            # Too long - truncate to EAN-13
            logger.warning(f"GTIN too long, truncating: {gtin_clean}")
            return gtin_clean[-13:]

        # Too short - pad to UPC-A. This covers 9-11 digits as well as 1-7:
        # a UPC-A whose leading zeros were lost in a numeric conversion ends
        # up in that range and must not be discarded.
        logger.warning(f"GTIN too short, padding: {gtin_clean}")
        return gtin_clean.zfill(12)

    def validate(self, gtin: str) -> bool:
        """
        Validate GTIN format.

        Returns True only when `gtin` is a string made up solely of ASCII
        digits whose length is in VALID_LENGTHS. Check digits are not
        verified. Empty, None and non-string values return False.
        """
        if not gtin or not isinstance(gtin, str):
            return False
        return (
            len(gtin) in self.VALID_LENGTHS
            and re.fullmatch(r'[0-9]+', gtin) is not None
        )
|
|
|
|
|
|
class PriceProcessor:
    """Handles price parsing and currency extraction."""

    # Maps a regex to an extractor returning (raw_amount, currency).
    # Patterns are tried in insertion order and matched case-insensitively,
    # so the specific currencies must stay ahead of the generic 3-letter
    # fallbacks at the end.
    CURRENCY_PATTERNS = {
        # Amount followed by currency
        r'([0-9.,]+)\s*(EUR|€)': lambda m: (m.group(1), 'EUR'),
        r'([0-9.,]+)\s*(USD|\$)': lambda m: (m.group(1), 'USD'),
        r'([0-9.,]+)\s*(GBP|£)': lambda m: (m.group(1), 'GBP'),
        r'([0-9.,]+)\s*(CHF)': lambda m: (m.group(1), 'CHF'),
        # The yen sign is mapped to its ISO code, like the other symbols.
        r'([0-9.,]+)\s*(CAD|AUD|JPY|¥)': lambda m: (
            m.group(1),
            'JPY' if m.group(2) == '¥' else m.group(2).upper(),
        ),

        # Currency followed by amount
        r'(EUR|€)\s*([0-9.,]+)': lambda m: (m.group(2), 'EUR'),
        r'(USD|\$)\s*([0-9.,]+)': lambda m: (m.group(2), 'USD'),
        r'(GBP|£)\s*([0-9.,]+)': lambda m: (m.group(2), 'GBP'),
        r'(JPY|¥)\s*([0-9.,]+)': lambda m: (m.group(2), 'JPY'),

        # Generic 3-letter currency codes
        r'([0-9.,]+)\s*([A-Z]{3})': lambda m: (m.group(1), m.group(2)),
        r'([A-Z]{3})\s*([0-9.,]+)': lambda m: (m.group(2), m.group(1)),
    }

    # "1,234.56" / "1,234,567": commas group thousands, dot is the decimal.
    _US_GROUPED = re.compile(r'\d{1,3}(?:,\d{3})+\.\d+|\d{1,3}(?:,\d{3}){2,}')
    # "1.234,56" / "1.234.567": dots group thousands, comma is the decimal.
    _EU_GROUPED = re.compile(r'\d{1,3}(?:\.\d{3})+,\d+|\d{1,3}(?:\.\d{3}){2,}')

    @classmethod
    def _normalize_amount(cls, raw: str) -> str:
        """Convert a matched amount to a plain string with '.' as decimal."""
        # Trailing separators are sentence punctuation ("19.99."), not part
        # of the number. Leading ones are kept so ".99" stays 0.99.
        amount = raw.replace(' ', '').rstrip('.,')
        if cls._US_GROUPED.fullmatch(amount):
            return amount.replace(',', '')
        if cls._EU_GROUPED.fullmatch(amount):
            return amount.replace('.', '').replace(',', '.')
        # A lone separator is ambiguous; treat a comma as the decimal mark.
        return amount.replace(',', '.')

    def parse_price_currency(self, price_str: object) -> Tuple[Optional[str], Optional[str]]:
        """
        Parse a price value into a (price, currency) tuple.

        The price is returned as a string that float() accepts, with '.' as
        the decimal separator and thousands separators removed. The currency
        is upper-cased; the symbols €, $, £ and ¥ are mapped to EUR, USD,
        GBP and JPY.

        Returns:
            - (price, currency) when a CURRENCY_PATTERNS entry matches and
              the amount is numeric;
            - (price, None) when only a number can be extracted;
            - (None, None) when the value is missing (None, NaN, pd.NA),
              falsy or blank;
            - (stripped_input, None) when nothing numeric can be extracted;
              a warning is logged in that case.
        """
        # pd.NA has no truth value (bool(pd.NA) raises TypeError), so it must
        # be ruled out before the falsy check below.
        if price_str is None or price_str is pd.NA:
            return None, None
        if not price_str or pd.isna(price_str):
            return None, None

        text = str(price_str).strip()
        if not text:
            return None, None

        # Try each pattern
        for pattern, extract_func in self.CURRENCY_PATTERNS.items():
            match = re.search(pattern, text, re.IGNORECASE)
            if not match:
                continue
            try:
                raw_amount, currency_val = extract_func(match)
                price_val = self._normalize_amount(raw_amount)
                float(price_val)  # Validate numeric
                return price_val, currency_val.upper()
            except (ValueError, AttributeError):
                # The matched "amount" was not a number (e.g. a lone '.');
                # let a later pattern try.
                continue

        # Fallback: extract just numbers
        number_match = re.search(r'([0-9.,]+)', text)
        if number_match:
            try:
                price_val = self._normalize_amount(number_match.group(1))
                float(price_val)  # Validate
                return price_val, None
            except ValueError:
                pass

        logger.warning(f"Could not parse price: '{text}'")
        return text, None
|