Files
orion/app/services/letzshop/order_service.py
Samir Boulahtit fadc9036a2 fix: add db.rollback() to prevent session state corruption during historical import
When create_letzshop_order raises an exception (e.g., product not found by GTIN)
after flushing the order to the database, the session is left in an inconsistent
state. Without rollback, subsequent database operations fail or hang, causing
the import to get stuck at "0 processed...".

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-19 21:54:11 +01:00

939 lines
31 KiB
Python

# app/services/letzshop/order_service.py
"""
Letzshop order service for handling order-related database operations.
This service handles Letzshop-specific order operations while using the
unified Order model. All Letzshop orders are stored in the `orders` table
with `channel='letzshop'`.
"""
import logging
from datetime import UTC, datetime
from typing import Any, Callable
from sqlalchemy import String, and_, func, or_
from sqlalchemy.orm import Session
from app.services.order_service import order_service as unified_order_service
from models.database.letzshop import (
LetzshopFulfillmentQueue,
LetzshopHistoricalImportJob,
LetzshopSyncLog,
VendorLetzshopCredentials,
)
from models.database.marketplace_import_job import MarketplaceImportJob
from models.database.order import Order, OrderItem
from models.database.product import Product
from models.database.vendor import Vendor
logger = logging.getLogger(__name__)
class VendorNotFoundError(Exception):
"""Raised when a vendor is not found."""
class OrderNotFoundError(Exception):
"""Raised when an order is not found."""
class LetzshopOrderService:
"""Service for Letzshop order database operations using unified Order model."""
def __init__(self, db: Session):
self.db = db
# =========================================================================
# Vendor Operations
# =========================================================================
def get_vendor(self, vendor_id: int) -> Vendor | None:
"""Get vendor by ID."""
return self.db.query(Vendor).filter(Vendor.id == vendor_id).first()
def get_vendor_or_raise(self, vendor_id: int) -> Vendor:
"""Get vendor by ID or raise VendorNotFoundError."""
vendor = self.get_vendor(vendor_id)
if vendor is None:
raise VendorNotFoundError(f"Vendor with ID {vendor_id} not found")
return vendor
def list_vendors_with_letzshop_status(
self,
skip: int = 0,
limit: int = 100,
configured_only: bool = False,
) -> tuple[list[dict[str, Any]], int]:
"""
List vendors with their Letzshop integration status.
Returns a tuple of (vendor_overviews, total_count).
"""
query = self.db.query(Vendor).filter(Vendor.is_active == True) # noqa: E712
if configured_only:
query = query.join(
VendorLetzshopCredentials,
Vendor.id == VendorLetzshopCredentials.vendor_id,
)
total = query.count()
vendors = query.order_by(Vendor.name).offset(skip).limit(limit).all()
vendor_overviews = []
for vendor in vendors:
credentials = (
self.db.query(VendorLetzshopCredentials)
.filter(VendorLetzshopCredentials.vendor_id == vendor.id)
.first()
)
# Count Letzshop orders from unified orders table
pending_orders = 0
total_orders = 0
if credentials:
pending_orders = (
self.db.query(func.count(Order.id))
.filter(
Order.vendor_id == vendor.id,
Order.channel == "letzshop",
Order.status == "pending",
)
.scalar()
or 0
)
total_orders = (
self.db.query(func.count(Order.id))
.filter(
Order.vendor_id == vendor.id,
Order.channel == "letzshop",
)
.scalar()
or 0
)
vendor_overviews.append(
{
"vendor_id": vendor.id,
"vendor_name": vendor.name,
"vendor_code": vendor.vendor_code,
"is_configured": credentials is not None,
"auto_sync_enabled": credentials.auto_sync_enabled
if credentials
else False,
"last_sync_at": credentials.last_sync_at if credentials else None,
"last_sync_status": credentials.last_sync_status
if credentials
else None,
"pending_orders": pending_orders,
"total_orders": total_orders,
}
)
return vendor_overviews, total
# =========================================================================
# Order Operations (using unified Order model)
# =========================================================================
def get_order(self, vendor_id: int, order_id: int) -> Order | None:
"""Get a Letzshop order by ID for a specific vendor."""
return (
self.db.query(Order)
.filter(
Order.id == order_id,
Order.vendor_id == vendor_id,
Order.channel == "letzshop",
)
.first()
)
def get_order_or_raise(self, vendor_id: int, order_id: int) -> Order:
"""Get a Letzshop order or raise OrderNotFoundError."""
order = self.get_order(vendor_id, order_id)
if order is None:
raise OrderNotFoundError(f"Order {order_id} not found")
return order
def get_order_by_shipment_id(
self, vendor_id: int, shipment_id: str
) -> Order | None:
"""Get a Letzshop order by external shipment ID."""
return (
self.db.query(Order)
.filter(
Order.vendor_id == vendor_id,
Order.channel == "letzshop",
Order.external_shipment_id == shipment_id,
)
.first()
)
def get_order_by_id(self, order_id: int) -> Order | None:
"""Get a Letzshop order by its database ID."""
return (
self.db.query(Order)
.filter(
Order.id == order_id,
Order.channel == "letzshop",
)
.first()
)
def list_orders(
self,
vendor_id: int,
skip: int = 0,
limit: int = 50,
status: str | None = None,
has_declined_items: bool | None = None,
search: str | None = None,
) -> tuple[list[Order], int]:
"""
List Letzshop orders for a vendor.
Args:
vendor_id: Vendor ID to filter by.
skip: Number of records to skip.
limit: Maximum number of records to return.
status: Filter by order status (pending, processing, shipped, etc.)
has_declined_items: If True, only return orders with declined items.
search: Search by order number, customer name, or email.
Returns a tuple of (orders, total_count).
"""
query = self.db.query(Order).filter(
Order.vendor_id == vendor_id,
Order.channel == "letzshop",
)
if status:
query = query.filter(Order.status == status)
if search:
search_term = f"%{search}%"
query = query.filter(
or_(
Order.order_number.ilike(search_term),
Order.external_order_number.ilike(search_term),
Order.customer_email.ilike(search_term),
Order.customer_first_name.ilike(search_term),
Order.customer_last_name.ilike(search_term),
)
)
# Filter for orders with declined items
if has_declined_items is True:
# Subquery to find orders with declined items
declined_order_ids = (
self.db.query(OrderItem.order_id)
.filter(OrderItem.item_state == "confirmed_unavailable")
.subquery()
)
query = query.filter(Order.id.in_(declined_order_ids))
total = query.count()
orders = (
query.order_by(Order.order_date.desc())
.offset(skip)
.limit(limit)
.all()
)
return orders, total
def get_order_stats(self, vendor_id: int) -> dict[str, int]:
"""
Get order counts by status for a vendor's Letzshop orders.
Returns:
Dict with counts for each status.
"""
status_counts = (
self.db.query(
Order.status,
func.count(Order.id).label("count"),
)
.filter(
Order.vendor_id == vendor_id,
Order.channel == "letzshop",
)
.group_by(Order.status)
.all()
)
stats = {
"pending": 0,
"processing": 0,
"shipped": 0,
"delivered": 0,
"cancelled": 0,
"refunded": 0,
"total": 0,
}
for status, count in status_counts:
if status in stats:
stats[status] = count
stats["total"] += count
# Count orders with declined items
declined_items_count = (
self.db.query(func.count(func.distinct(OrderItem.order_id)))
.join(Order, OrderItem.order_id == Order.id)
.filter(
Order.vendor_id == vendor_id,
Order.channel == "letzshop",
OrderItem.item_state == "confirmed_unavailable",
)
.scalar()
or 0
)
stats["has_declined_items"] = declined_items_count
return stats
def create_order(
self,
vendor_id: int,
shipment_data: dict[str, Any],
) -> Order:
"""
Create a new Letzshop order from shipment data.
Uses the unified order service to create the order.
"""
return unified_order_service.create_letzshop_order(
db=self.db,
vendor_id=vendor_id,
shipment_data=shipment_data,
)
def update_order_from_shipment(
self,
order: Order,
shipment_data: dict[str, Any],
) -> Order:
"""Update an existing order from shipment data."""
order_data = shipment_data.get("order", {})
# Map Letzshop state to status
letzshop_state = shipment_data.get("state", "unconfirmed")
state_mapping = {
"unconfirmed": "pending",
"confirmed": "processing",
"declined": "cancelled",
}
new_status = state_mapping.get(letzshop_state, "processing")
# Update status if changed
if order.status != new_status:
order.status = new_status
now = datetime.now(UTC)
if new_status == "processing":
order.confirmed_at = now
elif new_status == "cancelled":
order.cancelled_at = now
# Update external data
order.external_data = shipment_data
# Update locale if not set
if not order.customer_locale and order_data.get("locale"):
order.customer_locale = order_data.get("locale")
# Update order_date if not set
if not order.order_date:
completed_at_str = order_data.get("completedAt")
if completed_at_str:
try:
if completed_at_str.endswith("Z"):
completed_at_str = completed_at_str[:-1] + "+00:00"
order.order_date = datetime.fromisoformat(completed_at_str)
except (ValueError, TypeError):
pass
# Update inventory unit states in order items
inventory_units_data = shipment_data.get("inventoryUnits", [])
if isinstance(inventory_units_data, dict):
inventory_units_data = inventory_units_data.get("nodes", [])
for unit in inventory_units_data:
unit_id = unit.get("id")
unit_state = unit.get("state")
if unit_id and unit_state:
# Find and update the corresponding order item
item = (
self.db.query(OrderItem)
.filter(
OrderItem.order_id == order.id,
OrderItem.external_item_id == unit_id,
)
.first()
)
if item:
item.item_state = unit_state
order.updated_at = datetime.now(UTC)
return order
def mark_order_confirmed(self, order: Order) -> Order:
"""Mark an order as confirmed (processing)."""
order.confirmed_at = datetime.now(UTC)
order.status = "processing"
order.updated_at = datetime.now(UTC)
return order
def mark_order_rejected(self, order: Order) -> Order:
"""Mark an order as rejected (cancelled)."""
order.cancelled_at = datetime.now(UTC)
order.status = "cancelled"
order.updated_at = datetime.now(UTC)
return order
def update_inventory_unit_state(
self, order: Order, item_id: str, state: str
) -> Order:
"""
Update the state of a single order item.
Args:
order: The order containing the item.
item_id: The external item ID (Letzshop inventory unit ID).
state: The new state (confirmed_available, confirmed_unavailable).
Returns:
The updated order.
"""
# Find and update the item
item = (
self.db.query(OrderItem)
.filter(
OrderItem.order_id == order.id,
OrderItem.external_item_id == item_id,
)
.first()
)
if item:
item.item_state = state
item.updated_at = datetime.now(UTC)
# Check if all items are now processed
all_items = (
self.db.query(OrderItem)
.filter(OrderItem.order_id == order.id)
.all()
)
all_confirmed = all(
i.item_state in ("confirmed_available", "confirmed_unavailable", "returned")
for i in all_items
)
if all_confirmed:
has_available = any(
i.item_state == "confirmed_available" for i in all_items
)
all_unavailable = all(
i.item_state == "confirmed_unavailable" for i in all_items
)
now = datetime.now(UTC)
if all_unavailable:
order.status = "cancelled"
order.cancelled_at = now
elif has_available:
order.status = "processing"
order.confirmed_at = now
order.updated_at = now
return order
def set_order_tracking(
self,
order: Order,
tracking_number: str,
tracking_provider: str,
) -> Order:
"""Set tracking information for an order."""
order.tracking_number = tracking_number
order.tracking_provider = tracking_provider
order.shipped_at = datetime.now(UTC)
order.status = "shipped"
order.updated_at = datetime.now(UTC)
return order
def get_order_items(self, order: Order) -> list[OrderItem]:
"""Get all items for an order."""
return (
self.db.query(OrderItem)
.filter(OrderItem.order_id == order.id)
.all()
)
# =========================================================================
# Sync Log Operations
# =========================================================================
def list_sync_logs(
self,
vendor_id: int,
skip: int = 0,
limit: int = 50,
) -> tuple[list[LetzshopSyncLog], int]:
"""List sync logs for a vendor."""
query = self.db.query(LetzshopSyncLog).filter(
LetzshopSyncLog.vendor_id == vendor_id
)
total = query.count()
logs = (
query.order_by(LetzshopSyncLog.started_at.desc())
.offset(skip)
.limit(limit)
.all()
)
return logs, total
# =========================================================================
# Fulfillment Queue Operations
# =========================================================================
def list_fulfillment_queue(
self,
vendor_id: int,
skip: int = 0,
limit: int = 50,
status: str | None = None,
) -> tuple[list[LetzshopFulfillmentQueue], int]:
"""List fulfillment queue items for a vendor."""
query = self.db.query(LetzshopFulfillmentQueue).filter(
LetzshopFulfillmentQueue.vendor_id == vendor_id
)
if status:
query = query.filter(LetzshopFulfillmentQueue.status == status)
total = query.count()
items = (
query.order_by(LetzshopFulfillmentQueue.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
return items, total
def add_to_fulfillment_queue(
self,
vendor_id: int,
order_id: int,
operation: str,
payload: dict[str, Any],
) -> LetzshopFulfillmentQueue:
"""Add an operation to the fulfillment queue."""
queue_item = LetzshopFulfillmentQueue(
vendor_id=vendor_id,
order_id=order_id,
operation=operation,
payload=payload,
status="pending",
)
self.db.add(queue_item)
return queue_item
# =========================================================================
# Unified Jobs Operations
# =========================================================================
def list_letzshop_jobs(
self,
vendor_id: int,
job_type: str | None = None,
status: str | None = None,
skip: int = 0,
limit: int = 20,
) -> tuple[list[dict[str, Any]], int]:
"""
List unified Letzshop-related jobs for a vendor.
Combines product imports from marketplace_import_jobs and
order syncs from letzshop_sync_logs.
"""
jobs = []
# Product imports from marketplace_import_jobs
if job_type in (None, "import"):
import_query = self.db.query(MarketplaceImportJob).filter(
MarketplaceImportJob.vendor_id == vendor_id,
MarketplaceImportJob.marketplace == "Letzshop",
)
if status:
import_query = import_query.filter(
MarketplaceImportJob.status == status
)
import_jobs = import_query.order_by(
MarketplaceImportJob.created_at.desc()
).all()
for job in import_jobs:
jobs.append(
{
"id": job.id,
"type": "import",
"status": job.status,
"created_at": job.created_at,
"started_at": job.started_at,
"completed_at": job.completed_at,
"records_processed": job.total_processed or 0,
"records_succeeded": (job.imported_count or 0)
+ (job.updated_count or 0),
"records_failed": job.error_count or 0,
}
)
# Order syncs from letzshop_sync_logs
if job_type in (None, "order_sync"):
sync_query = self.db.query(LetzshopSyncLog).filter(
LetzshopSyncLog.vendor_id == vendor_id,
LetzshopSyncLog.operation_type == "order_import",
)
if status:
sync_query = sync_query.filter(LetzshopSyncLog.status == status)
sync_logs = sync_query.order_by(LetzshopSyncLog.created_at.desc()).all()
for log in sync_logs:
jobs.append(
{
"id": log.id,
"type": "order_sync",
"status": log.status,
"created_at": log.created_at,
"started_at": log.started_at,
"completed_at": log.completed_at,
"records_processed": log.records_processed or 0,
"records_succeeded": log.records_succeeded or 0,
"records_failed": log.records_failed or 0,
}
)
# Sort all jobs by created_at descending
jobs.sort(key=lambda x: x["created_at"], reverse=True)
total = len(jobs)
jobs = jobs[skip : skip + limit]
return jobs, total
# =========================================================================
# Historical Import Operations
# =========================================================================
def import_historical_shipments(
self,
vendor_id: int,
shipments: list[dict[str, Any]],
match_products: bool = True,
progress_callback: Callable[[int, int, int, int], None] | None = None,
) -> dict[str, Any]:
"""
Import historical shipments into the unified orders table.
Args:
vendor_id: Vendor ID to import for.
shipments: List of shipment data from Letzshop API.
match_products: Whether to match GTIN to local products.
progress_callback: Optional callback(processed, imported, updated, skipped)
Returns:
Dict with import statistics.
"""
stats = {
"total": len(shipments),
"imported": 0,
"updated": 0,
"skipped": 0,
"errors": 0,
"products_matched": 0,
"products_not_found": 0,
"eans_processed": set(),
"eans_matched": set(),
"eans_not_found": set(),
"error_messages": [],
}
for i, shipment in enumerate(shipments):
shipment_id = shipment.get("id")
if not shipment_id:
continue
# Check if order already exists
existing_order = self.get_order_by_shipment_id(vendor_id, shipment_id)
if existing_order:
# Check if we need to update
letzshop_state = shipment.get("state")
state_mapping = {
"unconfirmed": "pending",
"confirmed": "processing",
"declined": "cancelled",
}
expected_status = state_mapping.get(letzshop_state, "processing")
needs_update = False
if existing_order.status != expected_status:
self.update_order_from_shipment(existing_order, shipment)
needs_update = True
# Update order_date if missing
if not existing_order.order_date:
order_data = shipment.get("order", {})
completed_at_str = order_data.get("completedAt")
if completed_at_str:
try:
if completed_at_str.endswith("Z"):
completed_at_str = completed_at_str[:-1] + "+00:00"
existing_order.order_date = datetime.fromisoformat(
completed_at_str
)
needs_update = True
except (ValueError, TypeError):
pass
if needs_update:
stats["updated"] += 1
else:
stats["skipped"] += 1
else:
# Create new order using unified service
try:
self.create_order(vendor_id, shipment)
stats["imported"] += 1
except Exception as e:
# Rollback session to clear any partial changes from failed order
# This is crucial because create_letzshop_order may have flushed
# the order before the exception was raised (e.g., product not found)
self.db.rollback()
stats["errors"] += 1
stats["error_messages"].append(
f"Shipment {shipment_id}: {str(e)}"
)
logger.error(f"Error importing shipment {shipment_id}: {e}")
# Process GTINs for matching
if match_products:
inventory_units = shipment.get("inventoryUnits", [])
for unit in inventory_units:
variant = unit.get("variant", {}) or {}
trade_id = variant.get("tradeId") or {}
gtin = trade_id.get("number")
if gtin:
stats["eans_processed"].add(gtin)
# Report progress
if progress_callback and ((i + 1) % 10 == 0 or i == len(shipments) - 1):
progress_callback(
i + 1,
stats["imported"],
stats["updated"],
stats["skipped"],
)
# Match GTINs to local products
if match_products and stats["eans_processed"]:
matched, not_found = self._match_gtins_to_products(
vendor_id, list(stats["eans_processed"])
)
stats["eans_matched"] = matched
stats["eans_not_found"] = not_found
stats["products_matched"] = len(matched)
stats["products_not_found"] = len(not_found)
# Convert sets to lists for JSON serialization
stats["eans_processed"] = list(stats["eans_processed"])
stats["eans_matched"] = list(stats["eans_matched"])
stats["eans_not_found"] = list(stats["eans_not_found"])
return stats
def _match_gtins_to_products(
self,
vendor_id: int,
gtins: list[str],
) -> tuple[set[str], set[str]]:
"""Match GTIN codes to local products."""
if not gtins:
return set(), set()
products = (
self.db.query(Product)
.filter(
Product.vendor_id == vendor_id,
Product.gtin.in_(gtins),
)
.all()
)
matched_gtins = {p.gtin for p in products if p.gtin}
not_found_gtins = set(gtins) - matched_gtins
logger.info(
f"GTIN matching: {len(matched_gtins)} matched, "
f"{len(not_found_gtins)} not found"
)
return matched_gtins, not_found_gtins
def get_products_by_gtins(
self,
vendor_id: int,
gtins: list[str],
) -> dict[str, Product]:
"""Get products by their GTIN codes."""
if not gtins:
return {}
products = (
self.db.query(Product)
.filter(
Product.vendor_id == vendor_id,
Product.gtin.in_(gtins),
)
.all()
)
return {p.gtin: p for p in products if p.gtin}
def get_historical_import_summary(
self,
vendor_id: int,
) -> dict[str, Any]:
"""Get summary of Letzshop orders for a vendor."""
# Count orders by status
status_counts = (
self.db.query(
Order.status,
func.count(Order.id).label("count"),
)
.filter(
Order.vendor_id == vendor_id,
Order.channel == "letzshop",
)
.group_by(Order.status)
.all()
)
# Count orders by locale
locale_counts = (
self.db.query(
Order.customer_locale,
func.count(Order.id).label("count"),
)
.filter(
Order.vendor_id == vendor_id,
Order.channel == "letzshop",
)
.group_by(Order.customer_locale)
.all()
)
# Count orders by country
country_counts = (
self.db.query(
Order.ship_country_iso,
func.count(Order.id).label("count"),
)
.filter(
Order.vendor_id == vendor_id,
Order.channel == "letzshop",
)
.group_by(Order.ship_country_iso)
.all()
)
# Total orders
total_orders = (
self.db.query(func.count(Order.id))
.filter(
Order.vendor_id == vendor_id,
Order.channel == "letzshop",
)
.scalar()
or 0
)
# Unique customers
unique_customers = (
self.db.query(func.count(func.distinct(Order.customer_email)))
.filter(
Order.vendor_id == vendor_id,
Order.channel == "letzshop",
)
.scalar()
or 0
)
return {
"total_orders": total_orders,
"unique_customers": unique_customers,
"orders_by_status": {status: count for status, count in status_counts},
"orders_by_locale": {
locale or "unknown": count for locale, count in locale_counts
},
"orders_by_country": {
country or "unknown": count for country, count in country_counts
},
}
# =========================================================================
# Historical Import Job Operations
# =========================================================================
def get_running_historical_import_job(
self,
vendor_id: int,
) -> LetzshopHistoricalImportJob | None:
"""Get any running historical import job for a vendor."""
return (
self.db.query(LetzshopHistoricalImportJob)
.filter(
LetzshopHistoricalImportJob.vendor_id == vendor_id,
LetzshopHistoricalImportJob.status.in_(
["pending", "fetching", "processing"]
),
)
.first()
)
def create_historical_import_job(
self,
vendor_id: int,
user_id: int,
) -> LetzshopHistoricalImportJob:
"""Create a new historical import job."""
job = LetzshopHistoricalImportJob(
vendor_id=vendor_id,
user_id=user_id,
status="pending",
)
self.db.add(job)
self.db.commit()
self.db.refresh(job)
return job
def get_historical_import_job_by_id(
self,
vendor_id: int,
job_id: int,
) -> LetzshopHistoricalImportJob | None:
"""Get a historical import job by ID."""
return (
self.db.query(LetzshopHistoricalImportJob)
.filter(
LetzshopHistoricalImportJob.id == job_id,
LetzshopHistoricalImportJob.vendor_id == vendor_id,
)
.first()
)