Files
orion/app/services/letzshop/order_service.py
Samir Boulahtit 3f1f6ad852 feat: add item-level order confirmation/decline support
Per Letzshop API, each inventory unit must be confirmed/declined individually.
This enables partial confirmation (some items confirmed, others declined).

Admin API endpoints:
- POST /vendors/{id}/orders/{id}/confirm - confirm all items
- POST /vendors/{id}/orders/{id}/reject - decline all items
- POST /vendors/{id}/orders/{id}/items/{id}/confirm - confirm single item
- POST /vendors/{id}/orders/{id}/items/{id}/decline - decline single item

Order detail modal now shows:
- Product name, EAN, SKU, MPN, price per item
- Per-item state badge (unconfirmed/confirmed/declined)
- Per-item confirm/decline buttons for pending items
- Bulk confirm/decline all buttons

Order status logic:
- If all items declined -> order is "declined"
- If any item confirmed -> order is "confirmed"
- Partial confirmation supported

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-18 21:27:56 +01:00

856 lines
29 KiB
Python

# app/services/letzshop/order_service.py
"""
Letzshop order service for handling order-related database operations.
This service moves database queries out of the API layer to comply with
architecture rules (API-002: endpoints should not contain business logic).
"""
import logging
from datetime import UTC, datetime
from typing import Any
from sqlalchemy import func
from sqlalchemy.orm import Session
from models.database.letzshop import (
LetzshopFulfillmentQueue,
LetzshopOrder,
LetzshopSyncLog,
VendorLetzshopCredentials,
)
from models.database.marketplace_import_job import MarketplaceImportJob
from models.database.product import Product
from models.database.vendor import Vendor
logger = logging.getLogger(__name__)
class VendorNotFoundError(Exception):
"""Raised when a vendor is not found."""
class OrderNotFoundError(Exception):
"""Raised when a Letzshop order is not found."""
class LetzshopOrderService:
"""Service for Letzshop order database operations."""
def __init__(self, db: Session):
self.db = db
# =========================================================================
# Vendor Operations
# =========================================================================
def get_vendor(self, vendor_id: int) -> Vendor | None:
"""Get vendor by ID."""
return self.db.query(Vendor).filter(Vendor.id == vendor_id).first()
def get_vendor_or_raise(self, vendor_id: int) -> Vendor:
"""Get vendor by ID or raise VendorNotFoundError."""
vendor = self.get_vendor(vendor_id)
if vendor is None:
raise VendorNotFoundError(f"Vendor with ID {vendor_id} not found")
return vendor
def list_vendors_with_letzshop_status(
self,
skip: int = 0,
limit: int = 100,
configured_only: bool = False,
) -> tuple[list[dict[str, Any]], int]:
"""
List vendors with their Letzshop integration status.
Returns a tuple of (vendor_overviews, total_count).
"""
# Build query
query = self.db.query(Vendor).filter(Vendor.is_active == True) # noqa: E712
if configured_only:
query = query.join(
VendorLetzshopCredentials,
Vendor.id == VendorLetzshopCredentials.vendor_id,
)
# Get total count
total = query.count()
# Get vendors
vendors = query.order_by(Vendor.name).offset(skip).limit(limit).all()
# Build response with Letzshop status
vendor_overviews = []
for vendor in vendors:
# Get credentials
credentials = (
self.db.query(VendorLetzshopCredentials)
.filter(VendorLetzshopCredentials.vendor_id == vendor.id)
.first()
)
# Get order counts
pending_orders = 0
total_orders = 0
if credentials:
pending_orders = (
self.db.query(func.count(LetzshopOrder.id))
.filter(
LetzshopOrder.vendor_id == vendor.id,
LetzshopOrder.sync_status == "pending",
)
.scalar()
or 0
)
total_orders = (
self.db.query(func.count(LetzshopOrder.id))
.filter(LetzshopOrder.vendor_id == vendor.id)
.scalar()
or 0
)
vendor_overviews.append(
{
"vendor_id": vendor.id,
"vendor_name": vendor.name,
"vendor_code": vendor.vendor_code,
"is_configured": credentials is not None,
"auto_sync_enabled": credentials.auto_sync_enabled
if credentials
else False,
"last_sync_at": credentials.last_sync_at if credentials else None,
"last_sync_status": credentials.last_sync_status
if credentials
else None,
"pending_orders": pending_orders,
"total_orders": total_orders,
}
)
return vendor_overviews, total
# =========================================================================
# Order Operations
# =========================================================================
def get_order(self, vendor_id: int, order_id: int) -> LetzshopOrder | None:
"""Get a Letzshop order by ID for a specific vendor."""
return (
self.db.query(LetzshopOrder)
.filter(
LetzshopOrder.id == order_id,
LetzshopOrder.vendor_id == vendor_id,
)
.first()
)
def get_order_or_raise(self, vendor_id: int, order_id: int) -> LetzshopOrder:
"""Get a Letzshop order or raise OrderNotFoundError."""
order = self.get_order(vendor_id, order_id)
if order is None:
raise OrderNotFoundError(f"Letzshop order {order_id} not found")
return order
def get_order_by_shipment_id(
self, vendor_id: int, shipment_id: str
) -> LetzshopOrder | None:
"""Get a Letzshop order by shipment ID."""
return (
self.db.query(LetzshopOrder)
.filter(
LetzshopOrder.vendor_id == vendor_id,
LetzshopOrder.letzshop_shipment_id == shipment_id,
)
.first()
)
def list_orders(
self,
vendor_id: int,
skip: int = 0,
limit: int = 50,
sync_status: str | None = None,
letzshop_state: str | None = None,
) -> tuple[list[LetzshopOrder], int]:
"""
List Letzshop orders for a vendor.
Returns a tuple of (orders, total_count).
"""
query = self.db.query(LetzshopOrder).filter(
LetzshopOrder.vendor_id == vendor_id
)
if sync_status:
query = query.filter(LetzshopOrder.sync_status == sync_status)
if letzshop_state:
query = query.filter(LetzshopOrder.letzshop_state == letzshop_state)
total = query.count()
orders = (
query.order_by(LetzshopOrder.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
return orders, total
def get_order_stats(self, vendor_id: int) -> dict[str, int]:
"""
Get order counts by sync_status for a vendor.
Returns:
Dict with counts for each status: pending, confirmed, rejected, shipped
"""
status_counts = (
self.db.query(
LetzshopOrder.sync_status,
func.count(LetzshopOrder.id).label("count"),
)
.filter(LetzshopOrder.vendor_id == vendor_id)
.group_by(LetzshopOrder.sync_status)
.all()
)
# Convert to dict with default 0 for missing statuses
stats = {"pending": 0, "confirmed": 0, "rejected": 0, "shipped": 0}
for status, count in status_counts:
if status in stats:
stats[status] = count
return stats
def create_order(
self,
vendor_id: int,
shipment_data: dict[str, Any],
) -> LetzshopOrder:
"""Create a new Letzshop order from shipment data."""
order_data = shipment_data.get("order", {})
# Handle total - can be a string like "99.99 EUR" or just a number
total = order_data.get("total", "")
total_amount = str(total) if total else ""
# Default currency to EUR (Letzshop is Luxembourg-based)
currency = "EUR"
# Extract customer name from shipping address
ship_address = order_data.get("shipAddress", {}) or {}
first_name = ship_address.get("firstName", "") or ""
last_name = ship_address.get("lastName", "") or ""
customer_name = f"{first_name} {last_name}".strip() or None
# Extract customer locale (language preference for invoicing)
customer_locale = order_data.get("locale")
# Extract country codes
ship_country = ship_address.get("country", {}) or {}
shipping_country_iso = ship_country.get("iso")
bill_address = order_data.get("billAddress", {}) or {}
bill_country = bill_address.get("country", {}) or {}
billing_country_iso = bill_country.get("iso")
# inventoryUnits is a direct array, not wrapped in nodes
inventory_units_data = shipment_data.get("inventoryUnits", [])
if isinstance(inventory_units_data, dict):
# Handle legacy format with nodes wrapper
inventory_units_data = inventory_units_data.get("nodes", [])
# Extract enriched inventory unit data with product details
enriched_units = []
for unit in inventory_units_data:
variant = unit.get("variant", {}) or {}
product = variant.get("product", {}) or {}
trade_id = variant.get("tradeId") or {}
product_name = product.get("name", {}) or {}
enriched_unit = {
"id": unit.get("id"),
"state": unit.get("state"),
# Product identifiers
"ean": trade_id.get("number"),
"ean_type": trade_id.get("parser"),
"sku": variant.get("sku"),
"mpn": variant.get("mpn"),
# Product info
"product_name": (
product_name.get("en")
or product_name.get("fr")
or product_name.get("de")
),
"product_name_translations": product_name,
# Pricing
"price": variant.get("price"),
"variant_id": variant.get("id"),
}
enriched_units.append(enriched_unit)
# Map Letzshop state to sync_status
# Letzshop shipment states (from docs):
# - unconfirmed: needs to be confirmed/rejected
# - confirmed: at least one product confirmed
# - declined: all products rejected
# Note: "shipped" is not a state - tracking is set separately via tracking field
letzshop_state = shipment_data.get("state", "unconfirmed")
state_mapping = {
"unconfirmed": "pending",
"confirmed": "confirmed",
"declined": "rejected",
}
sync_status = state_mapping.get(letzshop_state, "confirmed")
order = LetzshopOrder(
vendor_id=vendor_id,
letzshop_order_id=order_data.get("id", ""),
letzshop_shipment_id=shipment_data["id"],
letzshop_order_number=order_data.get("number"),
letzshop_state=letzshop_state,
customer_email=order_data.get("email"),
customer_name=customer_name,
customer_locale=customer_locale,
shipping_country_iso=shipping_country_iso,
billing_country_iso=billing_country_iso,
total_amount=total_amount,
currency=currency,
raw_order_data=shipment_data,
inventory_units=enriched_units,
sync_status=sync_status,
)
self.db.add(order)
return order
def update_order_from_shipment(
self,
order: LetzshopOrder,
shipment_data: dict[str, Any],
) -> LetzshopOrder:
"""Update an existing order from shipment data."""
order_data = shipment_data.get("order", {})
# Update letzshop_state and sync_status
# Letzshop states: unconfirmed, confirmed, declined
letzshop_state = shipment_data.get("state", "unconfirmed")
state_mapping = {
"unconfirmed": "pending",
"confirmed": "confirmed",
"declined": "rejected",
}
order.letzshop_state = letzshop_state
order.sync_status = state_mapping.get(letzshop_state, "confirmed")
order.raw_order_data = shipment_data
# Update locale if not already set
if not order.customer_locale and order_data.get("locale"):
order.customer_locale = order_data.get("locale")
# Update country codes if not already set
if not order.shipping_country_iso:
ship_address = order_data.get("shipAddress", {}) or {}
ship_country = ship_address.get("country", {}) or {}
order.shipping_country_iso = ship_country.get("iso")
if not order.billing_country_iso:
bill_address = order_data.get("billAddress", {}) or {}
bill_country = bill_address.get("country", {}) or {}
order.billing_country_iso = bill_country.get("iso")
# Update enriched inventory units
inventory_units_data = shipment_data.get("inventoryUnits", [])
if isinstance(inventory_units_data, dict):
inventory_units_data = inventory_units_data.get("nodes", [])
enriched_units = []
for unit in inventory_units_data:
variant = unit.get("variant", {}) or {}
product = variant.get("product", {}) or {}
trade_id = variant.get("tradeId") or {}
product_name = product.get("name", {}) or {}
enriched_unit = {
"id": unit.get("id"),
"state": unit.get("state"),
"ean": trade_id.get("number"),
"ean_type": trade_id.get("parser"),
"sku": variant.get("sku"),
"mpn": variant.get("mpn"),
"product_name": (
product_name.get("en")
or product_name.get("fr")
or product_name.get("de")
),
"product_name_translations": product_name,
"price": variant.get("price"),
"variant_id": variant.get("id"),
}
enriched_units.append(enriched_unit)
order.inventory_units = enriched_units
return order
def mark_order_confirmed(self, order: LetzshopOrder) -> LetzshopOrder:
"""Mark an order as confirmed."""
order.confirmed_at = datetime.now(UTC)
order.sync_status = "confirmed"
return order
def mark_order_rejected(self, order: LetzshopOrder) -> LetzshopOrder:
"""Mark an order as rejected."""
order.rejected_at = datetime.now(UTC)
order.sync_status = "rejected"
return order
def update_inventory_unit_state(
self, order: LetzshopOrder, item_id: str, state: str
) -> LetzshopOrder:
"""
Update the state of a single inventory unit in an order.
Args:
order: The order containing the inventory unit.
item_id: The inventory unit ID to update.
state: The new state (confirmed_available, confirmed_unavailable).
Returns:
The updated order.
"""
if not order.inventory_units:
return order
# Update the specific item's state
updated_units = []
for unit in order.inventory_units:
if unit.get("id") == item_id:
unit["state"] = state
updated_units.append(unit)
order.inventory_units = updated_units
# Check if all items are now processed and update order status accordingly
all_confirmed = all(
u.get("state") in ("confirmed_available", "confirmed_unavailable", "returned")
for u in updated_units
)
if all_confirmed:
# Determine order status based on item states
has_available = any(
u.get("state") == "confirmed_available" for u in updated_units
)
all_unavailable = all(
u.get("state") == "confirmed_unavailable" for u in updated_units
)
if all_unavailable:
order.sync_status = "rejected"
order.rejected_at = datetime.now(UTC)
elif has_available:
order.sync_status = "confirmed"
order.confirmed_at = datetime.now(UTC)
return order
def set_order_tracking(
self,
order: LetzshopOrder,
tracking_number: str,
tracking_carrier: str,
) -> LetzshopOrder:
"""Set tracking information for an order."""
order.tracking_number = tracking_number
order.tracking_carrier = tracking_carrier
order.tracking_set_at = datetime.now(UTC)
order.sync_status = "shipped"
return order
# =========================================================================
# Sync Log Operations
# =========================================================================
def list_sync_logs(
self,
vendor_id: int,
skip: int = 0,
limit: int = 50,
) -> tuple[list[LetzshopSyncLog], int]:
"""
List sync logs for a vendor.
Returns a tuple of (logs, total_count).
"""
query = self.db.query(LetzshopSyncLog).filter(
LetzshopSyncLog.vendor_id == vendor_id
)
total = query.count()
logs = (
query.order_by(LetzshopSyncLog.started_at.desc())
.offset(skip)
.limit(limit)
.all()
)
return logs, total
# =========================================================================
# Fulfillment Queue Operations
# =========================================================================
def list_fulfillment_queue(
self,
vendor_id: int,
skip: int = 0,
limit: int = 50,
status: str | None = None,
) -> tuple[list[LetzshopFulfillmentQueue], int]:
"""
List fulfillment queue items for a vendor.
Returns a tuple of (items, total_count).
"""
query = self.db.query(LetzshopFulfillmentQueue).filter(
LetzshopFulfillmentQueue.vendor_id == vendor_id
)
if status:
query = query.filter(LetzshopFulfillmentQueue.status == status)
total = query.count()
items = (
query.order_by(LetzshopFulfillmentQueue.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
return items, total
# =========================================================================
# Unified Jobs Operations
# =========================================================================
def list_letzshop_jobs(
self,
vendor_id: int,
job_type: str | None = None,
status: str | None = None,
skip: int = 0,
limit: int = 20,
) -> tuple[list[dict[str, Any]], int]:
"""
List unified Letzshop-related jobs for a vendor.
Combines product imports from marketplace_import_jobs and
order syncs from letzshop_sync_logs.
Args:
vendor_id: Vendor ID
job_type: Filter by type ('import', 'order_sync', or None for all)
status: Filter by status
skip: Pagination offset
limit: Pagination limit
Returns:
Tuple of (jobs_list, total_count) where jobs_list contains dicts
with id, type, status, created_at, started_at, completed_at,
records_processed, records_succeeded, records_failed.
"""
jobs = []
# Product imports from marketplace_import_jobs
if job_type in (None, "import"):
import_query = self.db.query(MarketplaceImportJob).filter(
MarketplaceImportJob.vendor_id == vendor_id,
MarketplaceImportJob.marketplace == "Letzshop",
)
if status:
import_query = import_query.filter(
MarketplaceImportJob.status == status
)
import_jobs = import_query.order_by(
MarketplaceImportJob.created_at.desc()
).all()
for job in import_jobs:
jobs.append(
{
"id": job.id,
"type": "import",
"status": job.status,
"created_at": job.created_at,
"started_at": job.started_at,
"completed_at": job.completed_at,
"records_processed": job.total_processed or 0,
"records_succeeded": (job.imported_count or 0)
+ (job.updated_count or 0),
"records_failed": job.error_count or 0,
}
)
# Order syncs from letzshop_sync_logs
if job_type in (None, "order_sync"):
sync_query = self.db.query(LetzshopSyncLog).filter(
LetzshopSyncLog.vendor_id == vendor_id,
LetzshopSyncLog.operation_type == "order_import",
)
if status:
sync_query = sync_query.filter(LetzshopSyncLog.status == status)
sync_logs = sync_query.order_by(LetzshopSyncLog.created_at.desc()).all()
for log in sync_logs:
jobs.append(
{
"id": log.id,
"type": "order_sync",
"status": log.status,
"created_at": log.created_at,
"started_at": log.started_at,
"completed_at": log.completed_at,
"records_processed": log.records_processed or 0,
"records_succeeded": log.records_succeeded or 0,
"records_failed": log.records_failed or 0,
}
)
# Sort all jobs by created_at descending
jobs.sort(key=lambda x: x["created_at"], reverse=True)
# Get total count and apply pagination
total = len(jobs)
jobs = jobs[skip : skip + limit]
return jobs, total
# =========================================================================
# Historical Import Operations
# =========================================================================
def import_historical_shipments(
self,
vendor_id: int,
shipments: list[dict[str, Any]],
match_products: bool = True,
) -> dict[str, Any]:
"""
Import historical shipments into the database.
Args:
vendor_id: Vendor ID to import for.
shipments: List of shipment data from Letzshop API.
match_products: Whether to match EAN to local products.
Returns:
Dict with import statistics:
- total: Total shipments processed
- imported: New orders created
- updated: Existing orders updated
- skipped: Already up-to-date orders
- products_matched: Products matched by EAN
- products_not_found: Products not found in local catalog
"""
stats = {
"total": len(shipments),
"imported": 0,
"updated": 0,
"skipped": 0,
"products_matched": 0,
"products_not_found": 0,
"eans_processed": set(),
"eans_matched": set(),
"eans_not_found": set(),
}
for shipment in shipments:
shipment_id = shipment.get("id")
if not shipment_id:
continue
# Check if order already exists
existing_order = self.get_order_by_shipment_id(vendor_id, shipment_id)
if existing_order:
# Check if we need to update (e.g., state changed)
shipment_state = shipment.get("state")
if existing_order.letzshop_state != shipment_state:
self.update_order_from_shipment(existing_order, shipment)
stats["updated"] += 1
else:
# Also fix sync_status if it's out of sync with letzshop_state
state_mapping = {
"unconfirmed": "pending",
"confirmed": "confirmed",
"declined": "rejected",
}
expected_sync_status = state_mapping.get(shipment_state, "confirmed")
if existing_order.sync_status != expected_sync_status:
existing_order.sync_status = expected_sync_status
stats["updated"] += 1
else:
stats["skipped"] += 1
else:
# Create new order
self.create_order(vendor_id, shipment)
stats["imported"] += 1
# Process EANs for matching
if match_products:
inventory_units = shipment.get("inventoryUnits", [])
for unit in inventory_units:
variant = unit.get("variant", {}) or {}
trade_id = variant.get("tradeId") or {}
ean = trade_id.get("number")
if ean:
stats["eans_processed"].add(ean)
# Match EANs to local products
if match_products and stats["eans_processed"]:
matched, not_found = self._match_eans_to_products(
vendor_id, list(stats["eans_processed"])
)
stats["eans_matched"] = matched
stats["eans_not_found"] = not_found
stats["products_matched"] = len(matched)
stats["products_not_found"] = len(not_found)
# Convert sets to lists for JSON serialization
stats["eans_processed"] = list(stats["eans_processed"])
stats["eans_matched"] = list(stats["eans_matched"])
stats["eans_not_found"] = list(stats["eans_not_found"])
return stats
def _match_eans_to_products(
self,
vendor_id: int,
eans: list[str],
) -> tuple[set[str], set[str]]:
"""
Match EAN codes to local products.
Args:
vendor_id: Vendor ID to search products for.
eans: List of EAN codes to match.
Returns:
Tuple of (matched_eans, not_found_eans).
"""
if not eans:
return set(), set()
# Query products by GTIN for this vendor
products = (
self.db.query(Product)
.filter(
Product.vendor_id == vendor_id,
Product.gtin.in_(eans),
)
.all()
)
matched_eans = {p.gtin for p in products if p.gtin}
not_found_eans = set(eans) - matched_eans
logger.info(
f"EAN matching: {len(matched_eans)} matched, {len(not_found_eans)} not found"
)
return matched_eans, not_found_eans
def get_products_by_eans(
self,
vendor_id: int,
eans: list[str],
) -> dict[str, Product]:
"""
Get products by their EAN codes.
Args:
vendor_id: Vendor ID to search products for.
eans: List of EAN codes to search.
Returns:
Dict mapping EAN to Product.
"""
if not eans:
return {}
products = (
self.db.query(Product)
.filter(
Product.vendor_id == vendor_id,
Product.gtin.in_(eans),
)
.all()
)
return {p.gtin: p for p in products if p.gtin}
def get_historical_import_summary(
self,
vendor_id: int,
) -> dict[str, Any]:
"""
Get summary of historical order data for a vendor.
Returns:
Dict with summary statistics.
"""
# Count orders by state
order_counts = (
self.db.query(
LetzshopOrder.letzshop_state,
func.count(LetzshopOrder.id).label("count"),
)
.filter(LetzshopOrder.vendor_id == vendor_id)
.group_by(LetzshopOrder.letzshop_state)
.all()
)
# Count orders by locale
locale_counts = (
self.db.query(
LetzshopOrder.customer_locale,
func.count(LetzshopOrder.id).label("count"),
)
.filter(LetzshopOrder.vendor_id == vendor_id)
.group_by(LetzshopOrder.customer_locale)
.all()
)
# Count orders by country
country_counts = (
self.db.query(
LetzshopOrder.shipping_country_iso,
func.count(LetzshopOrder.id).label("count"),
)
.filter(LetzshopOrder.vendor_id == vendor_id)
.group_by(LetzshopOrder.shipping_country_iso)
.all()
)
# Total revenue
total_orders = (
self.db.query(func.count(LetzshopOrder.id))
.filter(LetzshopOrder.vendor_id == vendor_id)
.scalar()
or 0
)
# Unique customers
unique_customers = (
self.db.query(func.count(func.distinct(LetzshopOrder.customer_email)))
.filter(LetzshopOrder.vendor_id == vendor_id)
.scalar()
or 0
)
return {
"total_orders": total_orders,
"unique_customers": unique_customers,
"orders_by_state": {state: count for state, count in order_counts},
"orders_by_locale": {locale or "unknown": count for locale, count in locale_counts},
"orders_by_country": {country or "unknown": count for country, count in country_counts},
}