feat: enhance Letzshop order import with EAN matching and stats

- Add historical order import with pagination support
- Add customer_locale, shipping_country_iso, billing_country_iso columns
- Add gtin/gtin_type columns to Product table for EAN matching
- Fix order stats to count all orders server-side (not just visible page)
- Add GraphQL introspection script with tracking workaround tests
- Enrich inventory units with EAN, MPN, SKU, product name
- Add LetzshopOrderStats schema for proper status counts

Migrations:
- a9a86cef6cca: Add locale and country fields to letzshop_orders
- cb88bc9b5f86: Add gtin columns to products table

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-18 21:04:33 +01:00
parent 6d6c8b44d3
commit 0ab10128ae
17 changed files with 3451 additions and 94 deletions

View File

@@ -20,6 +20,7 @@ from models.database.letzshop import (
VendorLetzshopCredentials,
)
from models.database.marketplace_import_job import MarketplaceImportJob
from models.database.product import Product
from models.database.vendor import Vendor
logger = logging.getLogger(__name__)
@@ -197,6 +198,31 @@ class LetzshopOrderService:
return orders, total
def get_order_stats(self, vendor_id: int) -> dict[str, int]:
"""
Get order counts by sync_status for a vendor.
Returns:
Dict with counts for each status: pending, confirmed, rejected, shipped
"""
status_counts = (
self.db.query(
LetzshopOrder.sync_status,
func.count(LetzshopOrder.id).label("count"),
)
.filter(LetzshopOrder.vendor_id == vendor_id)
.group_by(LetzshopOrder.sync_status)
.all()
)
# Convert to dict with default 0 for missing statuses
stats = {"pending": 0, "confirmed": 0, "rejected": 0, "shipped": 0}
for status, count in status_counts:
if status in stats:
stats[status] = count
return stats
def create_order(
self,
vendor_id: int,
@@ -205,21 +231,94 @@ class LetzshopOrderService:
"""Create a new Letzshop order from shipment data."""
order_data = shipment_data.get("order", {})
# Handle total - can be a string like "99.99 EUR" or just a number
total = order_data.get("total", "")
total_amount = str(total) if total else ""
# Default currency to EUR (Letzshop is Luxembourg-based)
currency = "EUR"
# Extract customer name from shipping address
ship_address = order_data.get("shipAddress", {}) or {}
first_name = ship_address.get("firstName", "") or ""
last_name = ship_address.get("lastName", "") or ""
customer_name = f"{first_name} {last_name}".strip() or None
# Extract customer locale (language preference for invoicing)
customer_locale = order_data.get("locale")
# Extract country codes
ship_country = ship_address.get("country", {}) or {}
shipping_country_iso = ship_country.get("iso")
bill_address = order_data.get("billAddress", {}) or {}
bill_country = bill_address.get("country", {}) or {}
billing_country_iso = bill_country.get("iso")
# inventoryUnits is a direct array, not wrapped in nodes
inventory_units_data = shipment_data.get("inventoryUnits", [])
if isinstance(inventory_units_data, dict):
# Handle legacy format with nodes wrapper
inventory_units_data = inventory_units_data.get("nodes", [])
# Extract enriched inventory unit data with product details
enriched_units = []
for unit in inventory_units_data:
variant = unit.get("variant", {}) or {}
product = variant.get("product", {}) or {}
trade_id = variant.get("tradeId") or {}
product_name = product.get("name", {}) or {}
enriched_unit = {
"id": unit.get("id"),
"state": unit.get("state"),
# Product identifiers
"ean": trade_id.get("number"),
"ean_type": trade_id.get("parser"),
"sku": variant.get("sku"),
"mpn": variant.get("mpn"),
# Product info
"product_name": (
product_name.get("en")
or product_name.get("fr")
or product_name.get("de")
),
"product_name_translations": product_name,
# Pricing
"price": variant.get("price"),
"variant_id": variant.get("id"),
}
enriched_units.append(enriched_unit)
# Map Letzshop state to sync_status
# Letzshop shipment states (from docs):
# - unconfirmed: needs to be confirmed/rejected
# - confirmed: at least one product confirmed
# - declined: all products rejected
# Note: "shipped" is not a state - tracking is set separately via tracking field
letzshop_state = shipment_data.get("state", "unconfirmed")
state_mapping = {
"unconfirmed": "pending",
"confirmed": "confirmed",
"declined": "rejected",
}
sync_status = state_mapping.get(letzshop_state, "confirmed")
order = LetzshopOrder(
vendor_id=vendor_id,
letzshop_order_id=order_data.get("id", ""),
letzshop_shipment_id=shipment_data["id"],
letzshop_order_number=order_data.get("number"),
letzshop_state=shipment_data.get("state"),
letzshop_state=letzshop_state,
customer_email=order_data.get("email"),
total_amount=str(order_data.get("totalPrice", {}).get("amount", "")),
currency=order_data.get("totalPrice", {}).get("currency", "EUR"),
customer_name=customer_name,
customer_locale=customer_locale,
shipping_country_iso=shipping_country_iso,
billing_country_iso=billing_country_iso,
total_amount=total_amount,
currency=currency,
raw_order_data=shipment_data,
inventory_units=[
{"id": u["id"], "state": u["state"]}
for u in shipment_data.get("inventoryUnits", {}).get("nodes", [])
],
sync_status="pending",
inventory_units=enriched_units,
sync_status=sync_status,
)
self.db.add(order)
return order
@@ -230,8 +329,66 @@ class LetzshopOrderService:
shipment_data: dict[str, Any],
) -> LetzshopOrder:
"""Update an existing order from shipment data."""
order.letzshop_state = shipment_data.get("state")
order_data = shipment_data.get("order", {})
# Update letzshop_state and sync_status
# Letzshop states: unconfirmed, confirmed, declined
letzshop_state = shipment_data.get("state", "unconfirmed")
state_mapping = {
"unconfirmed": "pending",
"confirmed": "confirmed",
"declined": "rejected",
}
order.letzshop_state = letzshop_state
order.sync_status = state_mapping.get(letzshop_state, "confirmed")
order.raw_order_data = shipment_data
# Update locale if not already set
if not order.customer_locale and order_data.get("locale"):
order.customer_locale = order_data.get("locale")
# Update country codes if not already set
if not order.shipping_country_iso:
ship_address = order_data.get("shipAddress", {}) or {}
ship_country = ship_address.get("country", {}) or {}
order.shipping_country_iso = ship_country.get("iso")
if not order.billing_country_iso:
bill_address = order_data.get("billAddress", {}) or {}
bill_country = bill_address.get("country", {}) or {}
order.billing_country_iso = bill_country.get("iso")
# Update enriched inventory units
inventory_units_data = shipment_data.get("inventoryUnits", [])
if isinstance(inventory_units_data, dict):
inventory_units_data = inventory_units_data.get("nodes", [])
enriched_units = []
for unit in inventory_units_data:
variant = unit.get("variant", {}) or {}
product = variant.get("product", {}) or {}
trade_id = variant.get("tradeId") or {}
product_name = product.get("name", {}) or {}
enriched_unit = {
"id": unit.get("id"),
"state": unit.get("state"),
"ean": trade_id.get("number"),
"ean_type": trade_id.get("parser"),
"sku": variant.get("sku"),
"mpn": variant.get("mpn"),
"product_name": (
product_name.get("en")
or product_name.get("fr")
or product_name.get("de")
),
"product_name_translations": product_name,
"price": variant.get("price"),
"variant_id": variant.get("id"),
}
enriched_units.append(enriched_unit)
order.inventory_units = enriched_units
return order
def mark_order_confirmed(self, order: LetzshopOrder) -> LetzshopOrder:
@@ -415,3 +572,222 @@ class LetzshopOrderService:
jobs = jobs[skip : skip + limit]
return jobs, total
# =========================================================================
# Historical Import Operations
# =========================================================================
def import_historical_shipments(
self,
vendor_id: int,
shipments: list[dict[str, Any]],
match_products: bool = True,
) -> dict[str, Any]:
"""
Import historical shipments into the database.
Args:
vendor_id: Vendor ID to import for.
shipments: List of shipment data from Letzshop API.
match_products: Whether to match EAN to local products.
Returns:
Dict with import statistics:
- total: Total shipments processed
- imported: New orders created
- updated: Existing orders updated
- skipped: Already up-to-date orders
- products_matched: Products matched by EAN
- products_not_found: Products not found in local catalog
"""
stats = {
"total": len(shipments),
"imported": 0,
"updated": 0,
"skipped": 0,
"products_matched": 0,
"products_not_found": 0,
"eans_processed": set(),
"eans_matched": set(),
"eans_not_found": set(),
}
for shipment in shipments:
shipment_id = shipment.get("id")
if not shipment_id:
continue
# Check if order already exists
existing_order = self.get_order_by_shipment_id(vendor_id, shipment_id)
if existing_order:
# Check if we need to update (e.g., state changed)
if existing_order.letzshop_state != shipment.get("state"):
self.update_order_from_shipment(existing_order, shipment)
stats["updated"] += 1
else:
stats["skipped"] += 1
else:
# Create new order
self.create_order(vendor_id, shipment)
stats["imported"] += 1
# Process EANs for matching
if match_products:
inventory_units = shipment.get("inventoryUnits", [])
for unit in inventory_units:
variant = unit.get("variant", {}) or {}
trade_id = variant.get("tradeId") or {}
ean = trade_id.get("number")
if ean:
stats["eans_processed"].add(ean)
# Match EANs to local products
if match_products and stats["eans_processed"]:
matched, not_found = self._match_eans_to_products(
vendor_id, list(stats["eans_processed"])
)
stats["eans_matched"] = matched
stats["eans_not_found"] = not_found
stats["products_matched"] = len(matched)
stats["products_not_found"] = len(not_found)
# Convert sets to lists for JSON serialization
stats["eans_processed"] = list(stats["eans_processed"])
stats["eans_matched"] = list(stats["eans_matched"])
stats["eans_not_found"] = list(stats["eans_not_found"])
return stats
def _match_eans_to_products(
self,
vendor_id: int,
eans: list[str],
) -> tuple[set[str], set[str]]:
"""
Match EAN codes to local products.
Args:
vendor_id: Vendor ID to search products for.
eans: List of EAN codes to match.
Returns:
Tuple of (matched_eans, not_found_eans).
"""
if not eans:
return set(), set()
# Query products by GTIN for this vendor
products = (
self.db.query(Product)
.filter(
Product.vendor_id == vendor_id,
Product.gtin.in_(eans),
)
.all()
)
matched_eans = {p.gtin for p in products if p.gtin}
not_found_eans = set(eans) - matched_eans
logger.info(
f"EAN matching: {len(matched_eans)} matched, {len(not_found_eans)} not found"
)
return matched_eans, not_found_eans
def get_products_by_eans(
self,
vendor_id: int,
eans: list[str],
) -> dict[str, Product]:
"""
Get products by their EAN codes.
Args:
vendor_id: Vendor ID to search products for.
eans: List of EAN codes to search.
Returns:
Dict mapping EAN to Product.
"""
if not eans:
return {}
products = (
self.db.query(Product)
.filter(
Product.vendor_id == vendor_id,
Product.gtin.in_(eans),
)
.all()
)
return {p.gtin: p for p in products if p.gtin}
def get_historical_import_summary(
self,
vendor_id: int,
) -> dict[str, Any]:
"""
Get summary of historical order data for a vendor.
Returns:
Dict with summary statistics.
"""
# Count orders by state
order_counts = (
self.db.query(
LetzshopOrder.letzshop_state,
func.count(LetzshopOrder.id).label("count"),
)
.filter(LetzshopOrder.vendor_id == vendor_id)
.group_by(LetzshopOrder.letzshop_state)
.all()
)
# Count orders by locale
locale_counts = (
self.db.query(
LetzshopOrder.customer_locale,
func.count(LetzshopOrder.id).label("count"),
)
.filter(LetzshopOrder.vendor_id == vendor_id)
.group_by(LetzshopOrder.customer_locale)
.all()
)
# Count orders by country
country_counts = (
self.db.query(
LetzshopOrder.shipping_country_iso,
func.count(LetzshopOrder.id).label("count"),
)
.filter(LetzshopOrder.vendor_id == vendor_id)
.group_by(LetzshopOrder.shipping_country_iso)
.all()
)
# Total revenue
total_orders = (
self.db.query(func.count(LetzshopOrder.id))
.filter(LetzshopOrder.vendor_id == vendor_id)
.scalar()
or 0
)
# Unique customers
unique_customers = (
self.db.query(func.count(func.distinct(LetzshopOrder.customer_email)))
.filter(LetzshopOrder.vendor_id == vendor_id)
.scalar()
or 0
)
return {
"total_orders": total_orders,
"unique_customers": unique_customers,
"orders_by_state": {state: count for state, count in order_counts},
"orders_by_locale": {locale or "unknown": count for locale, count in locale_counts},
"orders_by_country": {country or "unknown": count for country, count in country_counts},
}