Files
orion/app/services/letzshop/order_service.py
Samir Boulahtit fceaba703e wip: update Letzshop service and API for historical imports
- Add historical order import functionality
- Add order detail page route
- Update API endpoints for order confirmation flow

Note: These files need further updates to use the new unified order model.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-19 21:18:39 +01:00

1049 lines
36 KiB
Python

# app/services/letzshop/order_service.py
"""
Letzshop order service for handling order-related database operations.
This service moves database queries out of the API layer to comply with
architecture rules (API-002: endpoints should not contain business logic).
"""
import logging
from datetime import UTC, datetime
from typing import Any, Callable
from sqlalchemy import String, func
from sqlalchemy.orm import Session
from models.database.letzshop import (
LetzshopFulfillmentQueue,
LetzshopHistoricalImportJob,
LetzshopOrder,
LetzshopSyncLog,
VendorLetzshopCredentials,
)
from models.database.marketplace_import_job import MarketplaceImportJob
from models.database.product import Product
from models.database.vendor import Vendor
logger = logging.getLogger(__name__)
class VendorNotFoundError(Exception):
"""Raised when a vendor is not found."""
class OrderNotFoundError(Exception):
"""Raised when a Letzshop order is not found."""
class LetzshopOrderService:
"""Service for Letzshop order database operations."""
def __init__(self, db: Session):
self.db = db
# =========================================================================
# Vendor Operations
# =========================================================================
def get_vendor(self, vendor_id: int) -> Vendor | None:
"""Get vendor by ID."""
return self.db.query(Vendor).filter(Vendor.id == vendor_id).first()
def get_vendor_or_raise(self, vendor_id: int) -> Vendor:
"""Get vendor by ID or raise VendorNotFoundError."""
vendor = self.get_vendor(vendor_id)
if vendor is None:
raise VendorNotFoundError(f"Vendor with ID {vendor_id} not found")
return vendor
def list_vendors_with_letzshop_status(
self,
skip: int = 0,
limit: int = 100,
configured_only: bool = False,
) -> tuple[list[dict[str, Any]], int]:
"""
List vendors with their Letzshop integration status.
Returns a tuple of (vendor_overviews, total_count).
"""
# Build query
query = self.db.query(Vendor).filter(Vendor.is_active == True) # noqa: E712
if configured_only:
query = query.join(
VendorLetzshopCredentials,
Vendor.id == VendorLetzshopCredentials.vendor_id,
)
# Get total count
total = query.count()
# Get vendors
vendors = query.order_by(Vendor.name).offset(skip).limit(limit).all()
# Build response with Letzshop status
vendor_overviews = []
for vendor in vendors:
# Get credentials
credentials = (
self.db.query(VendorLetzshopCredentials)
.filter(VendorLetzshopCredentials.vendor_id == vendor.id)
.first()
)
# Get order counts
pending_orders = 0
total_orders = 0
if credentials:
pending_orders = (
self.db.query(func.count(LetzshopOrder.id))
.filter(
LetzshopOrder.vendor_id == vendor.id,
LetzshopOrder.sync_status == "pending",
)
.scalar()
or 0
)
total_orders = (
self.db.query(func.count(LetzshopOrder.id))
.filter(LetzshopOrder.vendor_id == vendor.id)
.scalar()
or 0
)
vendor_overviews.append(
{
"vendor_id": vendor.id,
"vendor_name": vendor.name,
"vendor_code": vendor.vendor_code,
"is_configured": credentials is not None,
"auto_sync_enabled": credentials.auto_sync_enabled
if credentials
else False,
"last_sync_at": credentials.last_sync_at if credentials else None,
"last_sync_status": credentials.last_sync_status
if credentials
else None,
"pending_orders": pending_orders,
"total_orders": total_orders,
}
)
return vendor_overviews, total
# =========================================================================
# Order Operations
# =========================================================================
def get_order(self, vendor_id: int, order_id: int) -> LetzshopOrder | None:
"""Get a Letzshop order by ID for a specific vendor."""
return (
self.db.query(LetzshopOrder)
.filter(
LetzshopOrder.id == order_id,
LetzshopOrder.vendor_id == vendor_id,
)
.first()
)
def get_order_or_raise(self, vendor_id: int, order_id: int) -> LetzshopOrder:
"""Get a Letzshop order or raise OrderNotFoundError."""
order = self.get_order(vendor_id, order_id)
if order is None:
raise OrderNotFoundError(f"Letzshop order {order_id} not found")
return order
def get_order_by_shipment_id(
self, vendor_id: int, shipment_id: str
) -> LetzshopOrder | None:
"""Get a Letzshop order by shipment ID."""
return (
self.db.query(LetzshopOrder)
.filter(
LetzshopOrder.vendor_id == vendor_id,
LetzshopOrder.letzshop_shipment_id == shipment_id,
)
.first()
)
def get_order_by_id(self, order_id: int) -> LetzshopOrder | None:
"""Get a Letzshop order by its database ID."""
return (
self.db.query(LetzshopOrder)
.filter(LetzshopOrder.id == order_id)
.first()
)
def list_orders(
self,
vendor_id: int,
skip: int = 0,
limit: int = 50,
sync_status: str | None = None,
letzshop_state: str | None = None,
has_declined_items: bool | None = None,
search: str | None = None,
) -> tuple[list[LetzshopOrder], int]:
"""
List Letzshop orders for a vendor.
Args:
vendor_id: Vendor ID to filter by.
skip: Number of records to skip.
limit: Maximum number of records to return.
sync_status: Filter by order sync status (pending, confirmed, etc.)
letzshop_state: Filter by Letzshop shipment state.
has_declined_items: If True, only return orders with at least one
declined/unavailable item (confirmed_unavailable state).
search: Search by order number, customer name, or customer email.
Returns a tuple of (orders, total_count).
"""
from sqlalchemy import or_
query = self.db.query(LetzshopOrder).filter(
LetzshopOrder.vendor_id == vendor_id
)
if sync_status:
query = query.filter(LetzshopOrder.sync_status == sync_status)
if letzshop_state:
query = query.filter(LetzshopOrder.letzshop_state == letzshop_state)
# Search by order number, customer name, or email
if search:
search_term = f"%{search}%"
query = query.filter(
or_(
LetzshopOrder.letzshop_order_number.ilike(search_term),
LetzshopOrder.customer_name.ilike(search_term),
LetzshopOrder.customer_email.ilike(search_term),
)
)
# Filter for orders with declined items (confirmed_unavailable state)
if has_declined_items is True:
# Use JSON contains check for SQLite/PostgreSQL
query = query.filter(
LetzshopOrder.inventory_units.isnot(None),
LetzshopOrder.inventory_units.cast(String).contains("confirmed_unavailable"),
)
total = query.count()
orders = (
query.order_by(LetzshopOrder.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
return orders, total
def get_order_stats(self, vendor_id: int) -> dict[str, int]:
"""
Get order counts by sync_status for a vendor.
Returns:
Dict with counts for each status: pending, confirmed, rejected, shipped,
and has_declined_items (orders with at least one declined item).
"""
status_counts = (
self.db.query(
LetzshopOrder.sync_status,
func.count(LetzshopOrder.id).label("count"),
)
.filter(LetzshopOrder.vendor_id == vendor_id)
.group_by(LetzshopOrder.sync_status)
.all()
)
# Convert to dict with default 0 for missing statuses
stats = {"pending": 0, "confirmed": 0, "rejected": 0, "shipped": 0}
for status, count in status_counts:
if status in stats:
stats[status] = count
# Count orders with declined items (confirmed_unavailable state)
declined_items_count = (
self.db.query(func.count(LetzshopOrder.id))
.filter(
LetzshopOrder.vendor_id == vendor_id,
LetzshopOrder.inventory_units.isnot(None),
LetzshopOrder.inventory_units.cast(String).contains("confirmed_unavailable"),
)
.scalar()
or 0
)
stats["has_declined_items"] = declined_items_count
return stats
def create_order(
self,
vendor_id: int,
shipment_data: dict[str, Any],
) -> LetzshopOrder:
"""Create a new Letzshop order from shipment data."""
order_data = shipment_data.get("order", {})
# Handle total - can be a string like "99.99 EUR" or just a number
total = order_data.get("total", "")
total_amount = str(total) if total else ""
# Default currency to EUR (Letzshop is Luxembourg-based)
currency = "EUR"
# Extract customer name from shipping address
ship_address = order_data.get("shipAddress", {}) or {}
first_name = ship_address.get("firstName", "") or ""
last_name = ship_address.get("lastName", "") or ""
customer_name = f"{first_name} {last_name}".strip() or None
# Extract customer locale (language preference for invoicing)
customer_locale = order_data.get("locale")
# Extract order date (completedAt from Letzshop)
order_date = None
completed_at_str = order_data.get("completedAt")
if completed_at_str:
try:
from datetime import datetime
# Handle ISO format with timezone
if completed_at_str.endswith("Z"):
completed_at_str = completed_at_str[:-1] + "+00:00"
order_date = datetime.fromisoformat(completed_at_str)
except (ValueError, TypeError):
pass # Keep None if parsing fails
# Extract country codes
ship_country = ship_address.get("country", {}) or {}
shipping_country_iso = ship_country.get("iso")
bill_address = order_data.get("billAddress", {}) or {}
bill_country = bill_address.get("country", {}) or {}
billing_country_iso = bill_country.get("iso")
# inventoryUnits is a direct array, not wrapped in nodes
inventory_units_data = shipment_data.get("inventoryUnits", [])
if isinstance(inventory_units_data, dict):
# Handle legacy format with nodes wrapper
inventory_units_data = inventory_units_data.get("nodes", [])
# Extract enriched inventory unit data with product details
enriched_units = []
for unit in inventory_units_data:
variant = unit.get("variant", {}) or {}
product = variant.get("product", {}) or {}
trade_id = variant.get("tradeId") or {}
product_name = product.get("name", {}) or {}
enriched_unit = {
"id": unit.get("id"),
"state": unit.get("state"),
# Product identifiers
"ean": trade_id.get("number"),
"ean_type": trade_id.get("parser"),
"sku": variant.get("sku"),
"mpn": variant.get("mpn"),
# Product info
"product_name": (
product_name.get("en")
or product_name.get("fr")
or product_name.get("de")
),
"product_name_translations": product_name,
# Pricing
"price": variant.get("price"),
"variant_id": variant.get("id"),
}
enriched_units.append(enriched_unit)
# Map Letzshop state to sync_status
# Letzshop shipment states (from docs):
# - unconfirmed: needs to be confirmed/rejected
# - confirmed: at least one product confirmed
# - declined: all products rejected
# Note: "shipped" is not a state - tracking is set separately via tracking field
letzshop_state = shipment_data.get("state", "unconfirmed")
state_mapping = {
"unconfirmed": "pending",
"confirmed": "confirmed",
"declined": "rejected",
}
sync_status = state_mapping.get(letzshop_state, "confirmed")
order = LetzshopOrder(
vendor_id=vendor_id,
letzshop_order_id=order_data.get("id", ""),
letzshop_shipment_id=shipment_data["id"],
letzshop_order_number=order_data.get("number"),
letzshop_state=letzshop_state,
customer_email=order_data.get("email"),
customer_name=customer_name,
customer_locale=customer_locale,
shipping_country_iso=shipping_country_iso,
billing_country_iso=billing_country_iso,
total_amount=total_amount,
currency=currency,
order_date=order_date,
raw_order_data=shipment_data,
inventory_units=enriched_units,
sync_status=sync_status,
)
self.db.add(order)
return order
def update_order_from_shipment(
self,
order: LetzshopOrder,
shipment_data: dict[str, Any],
) -> LetzshopOrder:
"""Update an existing order from shipment data."""
order_data = shipment_data.get("order", {})
# Update letzshop_state and sync_status
# Letzshop states: unconfirmed, confirmed, declined
letzshop_state = shipment_data.get("state", "unconfirmed")
state_mapping = {
"unconfirmed": "pending",
"confirmed": "confirmed",
"declined": "rejected",
}
order.letzshop_state = letzshop_state
order.sync_status = state_mapping.get(letzshop_state, "confirmed")
order.raw_order_data = shipment_data
# Update locale if not already set
if not order.customer_locale and order_data.get("locale"):
order.customer_locale = order_data.get("locale")
# Update order_date if not already set
if not order.order_date:
completed_at_str = order_data.get("completedAt")
if completed_at_str:
try:
from datetime import datetime
if completed_at_str.endswith("Z"):
completed_at_str = completed_at_str[:-1] + "+00:00"
order.order_date = datetime.fromisoformat(completed_at_str)
except (ValueError, TypeError):
pass
# Update country codes if not already set
if not order.shipping_country_iso:
ship_address = order_data.get("shipAddress", {}) or {}
ship_country = ship_address.get("country", {}) or {}
order.shipping_country_iso = ship_country.get("iso")
if not order.billing_country_iso:
bill_address = order_data.get("billAddress", {}) or {}
bill_country = bill_address.get("country", {}) or {}
order.billing_country_iso = bill_country.get("iso")
# Update enriched inventory units
inventory_units_data = shipment_data.get("inventoryUnits", [])
if isinstance(inventory_units_data, dict):
inventory_units_data = inventory_units_data.get("nodes", [])
enriched_units = []
for unit in inventory_units_data:
variant = unit.get("variant", {}) or {}
product = variant.get("product", {}) or {}
trade_id = variant.get("tradeId") or {}
product_name = product.get("name", {}) or {}
enriched_unit = {
"id": unit.get("id"),
"state": unit.get("state"),
"ean": trade_id.get("number"),
"ean_type": trade_id.get("parser"),
"sku": variant.get("sku"),
"mpn": variant.get("mpn"),
"product_name": (
product_name.get("en")
or product_name.get("fr")
or product_name.get("de")
),
"product_name_translations": product_name,
"price": variant.get("price"),
"variant_id": variant.get("id"),
}
enriched_units.append(enriched_unit)
order.inventory_units = enriched_units
return order
def mark_order_confirmed(self, order: LetzshopOrder) -> LetzshopOrder:
"""Mark an order as confirmed."""
order.confirmed_at = datetime.now(UTC)
order.sync_status = "confirmed"
return order
def mark_order_rejected(self, order: LetzshopOrder) -> LetzshopOrder:
"""Mark an order as rejected."""
order.rejected_at = datetime.now(UTC)
order.sync_status = "rejected"
return order
def update_inventory_unit_state(
self, order: LetzshopOrder, item_id: str, state: str
) -> LetzshopOrder:
"""
Update the state of a single inventory unit in an order.
Args:
order: The order containing the inventory unit.
item_id: The inventory unit ID to update.
state: The new state (confirmed_available, confirmed_unavailable).
Returns:
The updated order.
"""
if not order.inventory_units:
return order
# Update the specific item's state
updated_units = []
for unit in order.inventory_units:
if unit.get("id") == item_id:
unit["state"] = state
updated_units.append(unit)
order.inventory_units = updated_units
# Check if all items are now processed and update order status accordingly
all_confirmed = all(
u.get("state") in ("confirmed_available", "confirmed_unavailable", "returned")
for u in updated_units
)
if all_confirmed:
# Determine order status based on item states
has_available = any(
u.get("state") == "confirmed_available" for u in updated_units
)
all_unavailable = all(
u.get("state") == "confirmed_unavailable" for u in updated_units
)
if all_unavailable:
order.sync_status = "rejected"
order.rejected_at = datetime.now(UTC)
elif has_available:
order.sync_status = "confirmed"
order.confirmed_at = datetime.now(UTC)
return order
def set_order_tracking(
self,
order: LetzshopOrder,
tracking_number: str,
tracking_carrier: str,
) -> LetzshopOrder:
"""Set tracking information for an order."""
order.tracking_number = tracking_number
order.tracking_carrier = tracking_carrier
order.tracking_set_at = datetime.now(UTC)
order.sync_status = "shipped"
return order
# =========================================================================
# Sync Log Operations
# =========================================================================
def list_sync_logs(
self,
vendor_id: int,
skip: int = 0,
limit: int = 50,
) -> tuple[list[LetzshopSyncLog], int]:
"""
List sync logs for a vendor.
Returns a tuple of (logs, total_count).
"""
query = self.db.query(LetzshopSyncLog).filter(
LetzshopSyncLog.vendor_id == vendor_id
)
total = query.count()
logs = (
query.order_by(LetzshopSyncLog.started_at.desc())
.offset(skip)
.limit(limit)
.all()
)
return logs, total
# =========================================================================
# Fulfillment Queue Operations
# =========================================================================
def list_fulfillment_queue(
self,
vendor_id: int,
skip: int = 0,
limit: int = 50,
status: str | None = None,
) -> tuple[list[LetzshopFulfillmentQueue], int]:
"""
List fulfillment queue items for a vendor.
Returns a tuple of (items, total_count).
"""
query = self.db.query(LetzshopFulfillmentQueue).filter(
LetzshopFulfillmentQueue.vendor_id == vendor_id
)
if status:
query = query.filter(LetzshopFulfillmentQueue.status == status)
total = query.count()
items = (
query.order_by(LetzshopFulfillmentQueue.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
return items, total
# =========================================================================
# Unified Jobs Operations
# =========================================================================
def list_letzshop_jobs(
self,
vendor_id: int,
job_type: str | None = None,
status: str | None = None,
skip: int = 0,
limit: int = 20,
) -> tuple[list[dict[str, Any]], int]:
"""
List unified Letzshop-related jobs for a vendor.
Combines product imports from marketplace_import_jobs and
order syncs from letzshop_sync_logs.
Args:
vendor_id: Vendor ID
job_type: Filter by type ('import', 'order_sync', or None for all)
status: Filter by status
skip: Pagination offset
limit: Pagination limit
Returns:
Tuple of (jobs_list, total_count) where jobs_list contains dicts
with id, type, status, created_at, started_at, completed_at,
records_processed, records_succeeded, records_failed.
"""
jobs = []
# Product imports from marketplace_import_jobs
if job_type in (None, "import"):
import_query = self.db.query(MarketplaceImportJob).filter(
MarketplaceImportJob.vendor_id == vendor_id,
MarketplaceImportJob.marketplace == "Letzshop",
)
if status:
import_query = import_query.filter(
MarketplaceImportJob.status == status
)
import_jobs = import_query.order_by(
MarketplaceImportJob.created_at.desc()
).all()
for job in import_jobs:
jobs.append(
{
"id": job.id,
"type": "import",
"status": job.status,
"created_at": job.created_at,
"started_at": job.started_at,
"completed_at": job.completed_at,
"records_processed": job.total_processed or 0,
"records_succeeded": (job.imported_count or 0)
+ (job.updated_count or 0),
"records_failed": job.error_count or 0,
}
)
# Order syncs from letzshop_sync_logs
if job_type in (None, "order_sync"):
sync_query = self.db.query(LetzshopSyncLog).filter(
LetzshopSyncLog.vendor_id == vendor_id,
LetzshopSyncLog.operation_type == "order_import",
)
if status:
sync_query = sync_query.filter(LetzshopSyncLog.status == status)
sync_logs = sync_query.order_by(LetzshopSyncLog.created_at.desc()).all()
for log in sync_logs:
jobs.append(
{
"id": log.id,
"type": "order_sync",
"status": log.status,
"created_at": log.created_at,
"started_at": log.started_at,
"completed_at": log.completed_at,
"records_processed": log.records_processed or 0,
"records_succeeded": log.records_succeeded or 0,
"records_failed": log.records_failed or 0,
}
)
# Sort all jobs by created_at descending
jobs.sort(key=lambda x: x["created_at"], reverse=True)
# Get total count and apply pagination
total = len(jobs)
jobs = jobs[skip : skip + limit]
return jobs, total
# =========================================================================
# Historical Import Operations
# =========================================================================
def import_historical_shipments(
self,
vendor_id: int,
shipments: list[dict[str, Any]],
match_products: bool = True,
progress_callback: Callable[[int, int, int, int], None] | None = None,
) -> dict[str, Any]:
"""
Import historical shipments into the database.
Args:
vendor_id: Vendor ID to import for.
shipments: List of shipment data from Letzshop API.
match_products: Whether to match EAN to local products.
progress_callback: Optional callback(processed, imported, updated, skipped)
for progress updates during import.
Returns:
Dict with import statistics:
- total: Total shipments processed
- imported: New orders created
- updated: Existing orders updated
- skipped: Already up-to-date orders
- products_matched: Products matched by EAN
- products_not_found: Products not found in local catalog
"""
stats = {
"total": len(shipments),
"imported": 0,
"updated": 0,
"skipped": 0,
"products_matched": 0,
"products_not_found": 0,
"eans_processed": set(),
"eans_matched": set(),
"eans_not_found": set(),
}
for i, shipment in enumerate(shipments):
shipment_id = shipment.get("id")
if not shipment_id:
continue
# Check if order already exists
existing_order = self.get_order_by_shipment_id(vendor_id, shipment_id)
if existing_order:
# Check if we need to update (e.g., state changed or missing data)
shipment_state = shipment.get("state")
needs_update = False
if existing_order.letzshop_state != shipment_state:
self.update_order_from_shipment(existing_order, shipment)
needs_update = True
else:
# Also fix sync_status if it's out of sync with letzshop_state
state_mapping = {
"unconfirmed": "pending",
"confirmed": "confirmed",
"declined": "rejected",
}
expected_sync_status = state_mapping.get(shipment_state, "confirmed")
if existing_order.sync_status != expected_sync_status:
existing_order.sync_status = expected_sync_status
needs_update = True
# Populate order_date if missing (for orders imported before this field existed)
if not existing_order.order_date:
order_data = shipment.get("order", {})
completed_at_str = order_data.get("completedAt")
if completed_at_str:
try:
from datetime import datetime
if completed_at_str.endswith("Z"):
completed_at_str = completed_at_str[:-1] + "+00:00"
existing_order.order_date = datetime.fromisoformat(completed_at_str)
needs_update = True
except (ValueError, TypeError):
pass
if needs_update:
stats["updated"] += 1
else:
stats["skipped"] += 1
else:
# Create new order
self.create_order(vendor_id, shipment)
stats["imported"] += 1
# Process EANs for matching
if match_products:
inventory_units = shipment.get("inventoryUnits", [])
for unit in inventory_units:
variant = unit.get("variant", {}) or {}
trade_id = variant.get("tradeId") or {}
ean = trade_id.get("number")
if ean:
stats["eans_processed"].add(ean)
# Report progress every 10 shipments or at the end
if progress_callback and ((i + 1) % 10 == 0 or i == len(shipments) - 1):
progress_callback(
i + 1,
stats["imported"],
stats["updated"],
stats["skipped"],
)
# Match EANs to local products
if match_products and stats["eans_processed"]:
matched, not_found = self._match_eans_to_products(
vendor_id, list(stats["eans_processed"])
)
stats["eans_matched"] = matched
stats["eans_not_found"] = not_found
stats["products_matched"] = len(matched)
stats["products_not_found"] = len(not_found)
# Convert sets to lists for JSON serialization
stats["eans_processed"] = list(stats["eans_processed"])
stats["eans_matched"] = list(stats["eans_matched"])
stats["eans_not_found"] = list(stats["eans_not_found"])
return stats
def _match_eans_to_products(
self,
vendor_id: int,
eans: list[str],
) -> tuple[set[str], set[str]]:
"""
Match EAN codes to local products.
Args:
vendor_id: Vendor ID to search products for.
eans: List of EAN codes to match.
Returns:
Tuple of (matched_eans, not_found_eans).
"""
if not eans:
return set(), set()
# Query products by GTIN for this vendor
products = (
self.db.query(Product)
.filter(
Product.vendor_id == vendor_id,
Product.gtin.in_(eans),
)
.all()
)
matched_eans = {p.gtin for p in products if p.gtin}
not_found_eans = set(eans) - matched_eans
logger.info(
f"EAN matching: {len(matched_eans)} matched, {len(not_found_eans)} not found"
)
return matched_eans, not_found_eans
def get_products_by_eans(
self,
vendor_id: int,
eans: list[str],
) -> dict[str, Product]:
"""
Get products by their EAN codes.
Args:
vendor_id: Vendor ID to search products for.
eans: List of EAN codes to search.
Returns:
Dict mapping EAN to Product.
"""
if not eans:
return {}
products = (
self.db.query(Product)
.filter(
Product.vendor_id == vendor_id,
Product.gtin.in_(eans),
)
.all()
)
return {p.gtin: p for p in products if p.gtin}
def get_historical_import_summary(
self,
vendor_id: int,
) -> dict[str, Any]:
"""
Get summary of historical order data for a vendor.
Returns:
Dict with summary statistics.
"""
# Count orders by state
order_counts = (
self.db.query(
LetzshopOrder.letzshop_state,
func.count(LetzshopOrder.id).label("count"),
)
.filter(LetzshopOrder.vendor_id == vendor_id)
.group_by(LetzshopOrder.letzshop_state)
.all()
)
# Count orders by locale
locale_counts = (
self.db.query(
LetzshopOrder.customer_locale,
func.count(LetzshopOrder.id).label("count"),
)
.filter(LetzshopOrder.vendor_id == vendor_id)
.group_by(LetzshopOrder.customer_locale)
.all()
)
# Count orders by country
country_counts = (
self.db.query(
LetzshopOrder.shipping_country_iso,
func.count(LetzshopOrder.id).label("count"),
)
.filter(LetzshopOrder.vendor_id == vendor_id)
.group_by(LetzshopOrder.shipping_country_iso)
.all()
)
# Total revenue
total_orders = (
self.db.query(func.count(LetzshopOrder.id))
.filter(LetzshopOrder.vendor_id == vendor_id)
.scalar()
or 0
)
# Unique customers
unique_customers = (
self.db.query(func.count(func.distinct(LetzshopOrder.customer_email)))
.filter(LetzshopOrder.vendor_id == vendor_id)
.scalar()
or 0
)
return {
"total_orders": total_orders,
"unique_customers": unique_customers,
"orders_by_state": {state: count for state, count in order_counts},
"orders_by_locale": {locale or "unknown": count for locale, count in locale_counts},
"orders_by_country": {country or "unknown": count for country, count in country_counts},
}
# =========================================================================
# Historical Import Job Operations
# =========================================================================
def get_running_historical_import_job(
self,
vendor_id: int,
) -> LetzshopHistoricalImportJob | None:
"""
Get any running historical import job for a vendor.
Args:
vendor_id: Vendor ID to check.
Returns:
Running job or None if no active job.
"""
return (
self.db.query(LetzshopHistoricalImportJob)
.filter(
LetzshopHistoricalImportJob.vendor_id == vendor_id,
LetzshopHistoricalImportJob.status.in_(
["pending", "fetching", "processing"]
),
)
.first()
)
def create_historical_import_job(
self,
vendor_id: int,
user_id: int,
) -> LetzshopHistoricalImportJob:
"""
Create a new historical import job.
Args:
vendor_id: Vendor ID to import for.
user_id: User ID who initiated the import.
Returns:
Created job record.
"""
job = LetzshopHistoricalImportJob(
vendor_id=vendor_id,
user_id=user_id,
status="pending",
)
self.db.add(job)
self.db.commit()
self.db.refresh(job)
return job
def get_historical_import_job_by_id(
self,
vendor_id: int,
job_id: int,
) -> LetzshopHistoricalImportJob | None:
"""
Get a historical import job by ID.
Args:
vendor_id: Vendor ID to verify ownership.
job_id: Job ID to fetch.
Returns:
Job record or None if not found.
"""
return (
self.db.query(LetzshopHistoricalImportJob)
.filter(
LetzshopHistoricalImportJob.id == job_id,
LetzshopHistoricalImportJob.vendor_id == vendor_id,
)
.first()
)