feat: add inventory CSV import with warehouse/bin locations
- Add warehouse and bin_location columns to Inventory model - Create inventory_import_service for bulk TSV/CSV import - Add POST /api/v1/admin/inventory/import endpoint - Add Import button and modal to inventory admin page - Support both single-unit rows and explicit QUANTITY column File format: BIN, EAN, PRODUCT (optional), QUANTITY (optional) Products matched by GTIN/EAN, unmatched items reported. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
250
app/services/inventory_import_service.py
Normal file
250
app/services/inventory_import_service.py
Normal file
@@ -0,0 +1,250 @@
|
||||
# app/services/inventory_import_service.py
|
||||
"""
|
||||
Inventory import service for bulk importing stock from TSV/CSV files.
|
||||
|
||||
Supports two formats:
|
||||
1. One row per unit (quantity = count of rows):
|
||||
BIN EAN PRODUCT
|
||||
SA-10-02 0810050910101 Product Name
|
||||
SA-10-02 0810050910101 Product Name (2nd unit)
|
||||
|
||||
2. With explicit quantity column:
|
||||
BIN EAN PRODUCT QUANTITY
|
||||
SA-10-02 0810050910101 Product Name 12
|
||||
|
||||
Products are matched by GTIN/EAN to existing vendor products.
|
||||
"""
|
||||
|
||||
import csv
|
||||
import io
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from models.database.inventory import Inventory
|
||||
from models.database.product import Product
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ImportResult:
|
||||
"""Result of an inventory import operation."""
|
||||
|
||||
success: bool = True
|
||||
total_rows: int = 0
|
||||
entries_created: int = 0
|
||||
entries_updated: int = 0
|
||||
quantity_imported: int = 0
|
||||
unmatched_gtins: list = field(default_factory=list)
|
||||
errors: list = field(default_factory=list)
|
||||
|
||||
|
||||
class InventoryImportService:
|
||||
"""Service for importing inventory from TSV/CSV files."""
|
||||
|
||||
def import_from_text(
|
||||
self,
|
||||
db: Session,
|
||||
content: str,
|
||||
vendor_id: int,
|
||||
warehouse: str = "strassen",
|
||||
delimiter: str = "\t",
|
||||
clear_existing: bool = False,
|
||||
) -> ImportResult:
|
||||
"""
|
||||
Import inventory from TSV/CSV text content.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
content: TSV/CSV content as string
|
||||
vendor_id: Vendor ID for inventory
|
||||
warehouse: Warehouse name (default: "strassen")
|
||||
delimiter: Column delimiter (default: tab)
|
||||
clear_existing: If True, clear existing inventory before import
|
||||
|
||||
Returns:
|
||||
ImportResult with summary and errors
|
||||
"""
|
||||
result = ImportResult()
|
||||
|
||||
try:
|
||||
# Parse CSV/TSV
|
||||
reader = csv.DictReader(io.StringIO(content), delimiter=delimiter)
|
||||
|
||||
# Normalize headers (case-insensitive, strip whitespace)
|
||||
if reader.fieldnames:
|
||||
reader.fieldnames = [h.strip().upper() for h in reader.fieldnames]
|
||||
|
||||
# Validate required columns
|
||||
required = {"BIN", "EAN"}
|
||||
if not reader.fieldnames or not required.issubset(set(reader.fieldnames)):
|
||||
result.success = False
|
||||
result.errors.append(
|
||||
f"Missing required columns. Found: {reader.fieldnames}, Required: {required}"
|
||||
)
|
||||
return result
|
||||
|
||||
has_quantity = "QUANTITY" in reader.fieldnames
|
||||
|
||||
# Group entries by (EAN, BIN)
|
||||
# Key: (ean, bin) -> quantity
|
||||
inventory_data: dict[tuple[str, str], int] = defaultdict(int)
|
||||
product_names: dict[str, str] = {} # EAN -> product name (for logging)
|
||||
|
||||
for row in reader:
|
||||
result.total_rows += 1
|
||||
|
||||
ean = row.get("EAN", "").strip()
|
||||
bin_loc = row.get("BIN", "").strip()
|
||||
product_name = row.get("PRODUCT", "").strip()
|
||||
|
||||
if not ean or not bin_loc:
|
||||
result.errors.append(f"Row {result.total_rows}: Missing EAN or BIN")
|
||||
continue
|
||||
|
||||
# Get quantity
|
||||
if has_quantity:
|
||||
try:
|
||||
qty = int(row.get("QUANTITY", "1").strip())
|
||||
except ValueError:
|
||||
result.errors.append(
|
||||
f"Row {result.total_rows}: Invalid quantity '{row.get('QUANTITY')}'"
|
||||
)
|
||||
continue
|
||||
else:
|
||||
qty = 1 # Each row = 1 unit
|
||||
|
||||
inventory_data[(ean, bin_loc)] += qty
|
||||
if product_name:
|
||||
product_names[ean] = product_name
|
||||
|
||||
# Clear existing inventory if requested
|
||||
if clear_existing:
|
||||
db.query(Inventory).filter(
|
||||
Inventory.vendor_id == vendor_id,
|
||||
Inventory.warehouse == warehouse,
|
||||
).delete()
|
||||
db.flush()
|
||||
|
||||
# Build EAN to Product mapping for this vendor
|
||||
products = (
|
||||
db.query(Product)
|
||||
.filter(
|
||||
Product.vendor_id == vendor_id,
|
||||
Product.gtin.isnot(None),
|
||||
)
|
||||
.all()
|
||||
)
|
||||
ean_to_product: dict[str, Product] = {p.gtin: p for p in products if p.gtin}
|
||||
|
||||
# Track unmatched GTINs
|
||||
unmatched: dict[str, int] = {} # EAN -> total quantity
|
||||
|
||||
# Process inventory entries
|
||||
for (ean, bin_loc), quantity in inventory_data.items():
|
||||
product = ean_to_product.get(ean)
|
||||
|
||||
if not product:
|
||||
# Track unmatched
|
||||
if ean not in unmatched:
|
||||
unmatched[ean] = 0
|
||||
unmatched[ean] += quantity
|
||||
continue
|
||||
|
||||
# Upsert inventory entry
|
||||
existing = (
|
||||
db.query(Inventory)
|
||||
.filter(
|
||||
Inventory.product_id == product.id,
|
||||
Inventory.warehouse == warehouse,
|
||||
Inventory.bin_location == bin_loc,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if existing:
|
||||
existing.quantity = quantity
|
||||
existing.gtin = ean
|
||||
result.entries_updated += 1
|
||||
else:
|
||||
inv = Inventory(
|
||||
product_id=product.id,
|
||||
vendor_id=vendor_id,
|
||||
warehouse=warehouse,
|
||||
bin_location=bin_loc,
|
||||
location=bin_loc, # Legacy field
|
||||
quantity=quantity,
|
||||
gtin=ean,
|
||||
)
|
||||
db.add(inv)
|
||||
result.entries_created += 1
|
||||
|
||||
result.quantity_imported += quantity
|
||||
|
||||
db.flush()
|
||||
|
||||
# Format unmatched GTINs for result
|
||||
for ean, qty in unmatched.items():
|
||||
product_name = product_names.get(ean, "Unknown")
|
||||
result.unmatched_gtins.append(
|
||||
{"gtin": ean, "quantity": qty, "product_name": product_name}
|
||||
)
|
||||
|
||||
if result.unmatched_gtins:
|
||||
logger.warning(
|
||||
f"Import had {len(result.unmatched_gtins)} unmatched GTINs"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("Inventory import failed")
|
||||
result.success = False
|
||||
result.errors.append(str(e))
|
||||
|
||||
return result
|
||||
|
||||
def import_from_file(
|
||||
self,
|
||||
db: Session,
|
||||
file_path: str,
|
||||
vendor_id: int,
|
||||
warehouse: str = "strassen",
|
||||
clear_existing: bool = False,
|
||||
) -> ImportResult:
|
||||
"""
|
||||
Import inventory from a TSV/CSV file.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
file_path: Path to TSV/CSV file
|
||||
vendor_id: Vendor ID for inventory
|
||||
warehouse: Warehouse name
|
||||
clear_existing: If True, clear existing inventory before import
|
||||
|
||||
Returns:
|
||||
ImportResult with summary and errors
|
||||
"""
|
||||
try:
|
||||
with open(file_path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
except Exception as e:
|
||||
return ImportResult(success=False, errors=[f"Failed to read file: {e}"])
|
||||
|
||||
# Detect delimiter
|
||||
first_line = content.split("\n")[0] if content else ""
|
||||
delimiter = "\t" if "\t" in first_line else ","
|
||||
|
||||
return self.import_from_text(
|
||||
db=db,
|
||||
content=content,
|
||||
vendor_id=vendor_id,
|
||||
warehouse=warehouse,
|
||||
delimiter=delimiter,
|
||||
clear_existing=clear_existing,
|
||||
)
|
||||
|
||||
|
||||
# Singleton instance
|
||||
inventory_import_service = InventoryImportService()
|
||||
Reference in New Issue
Block a user