wip: update Letzshop service and API for historical imports

- Add historical order import functionality
- Add order detail page route
- Update API endpoints for order confirmation flow

Note: These files need further updates to use the new unified order model.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-19 21:18:39 +01:00
parent 473a4fabfc
commit fceaba703e
3 changed files with 338 additions and 55 deletions

View File

@@ -11,7 +11,7 @@ Provides admin-level management of:
import logging
from fastapi import APIRouter, Depends, Path, Query
from fastapi import APIRouter, BackgroundTasks, Depends, Path, Query
from sqlalchemy.orm import Session
from app.api.deps import get_current_admin_api
@@ -25,6 +25,7 @@ from app.services.letzshop import (
OrderNotFoundError,
VendorNotFoundError,
)
from app.tasks.letzshop_tasks import process_historical_import
from models.database.user import User
from models.schema.letzshop import (
FulfillmentOperationResponse,
@@ -33,8 +34,11 @@ from models.schema.letzshop import (
LetzshopCredentialsCreate,
LetzshopCredentialsResponse,
LetzshopCredentialsUpdate,
LetzshopHistoricalImportJobResponse,
LetzshopHistoricalImportStartResponse,
LetzshopJobItem,
LetzshopJobsListResponse,
LetzshopOrderDetailResponse,
LetzshopOrderListResponse,
LetzshopOrderResponse,
LetzshopSuccessResponse,
@@ -345,6 +349,12 @@ def list_vendor_letzshop_orders(
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
sync_status: str | None = Query(None, description="Filter by sync status"),
has_declined_items: bool | None = Query(
None, description="Filter orders with declined/unavailable items"
),
search: str | None = Query(
None, description="Search by order number, customer name, or email"
),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
@@ -361,6 +371,8 @@ def list_vendor_letzshop_orders(
skip=skip,
limit=limit,
sync_status=sync_status,
has_declined_items=has_declined_items,
search=search,
)
# Get order stats for all statuses
@@ -377,6 +389,9 @@ def list_vendor_letzshop_orders(
letzshop_state=order.letzshop_state,
customer_email=order.customer_email,
customer_name=order.customer_name,
customer_locale=order.customer_locale,
shipping_country_iso=order.shipping_country_iso,
billing_country_iso=order.billing_country_iso,
total_amount=order.total_amount,
currency=order.currency,
local_order_id=order.local_order_id,
@@ -389,6 +404,7 @@ def list_vendor_letzshop_orders(
tracking_number=order.tracking_number,
tracking_carrier=order.tracking_carrier,
inventory_units=order.inventory_units,
order_date=order.order_date,
created_at=order.created_at,
updated_at=order.updated_at,
)
@@ -401,6 +417,53 @@ def list_vendor_letzshop_orders(
)
@router.get(
"/orders/{order_id}",
response_model=LetzshopOrderDetailResponse,
)
def get_letzshop_order_detail(
order_id: int = Path(..., description="Letzshop order ID"),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""Get detailed information for a single Letzshop order."""
order_service = get_order_service(db)
order = order_service.get_order_by_id(order_id)
if not order:
raise ResourceNotFoundException("Order", str(order_id))
return LetzshopOrderDetailResponse(
id=order.id,
vendor_id=order.vendor_id,
letzshop_order_id=order.letzshop_order_id,
letzshop_shipment_id=order.letzshop_shipment_id,
letzshop_order_number=order.letzshop_order_number,
letzshop_state=order.letzshop_state,
customer_email=order.customer_email,
customer_name=order.customer_name,
customer_locale=order.customer_locale,
shipping_country_iso=order.shipping_country_iso,
billing_country_iso=order.billing_country_iso,
total_amount=order.total_amount,
currency=order.currency,
local_order_id=order.local_order_id,
sync_status=order.sync_status,
last_synced_at=order.last_synced_at,
sync_error=order.sync_error,
confirmed_at=order.confirmed_at,
rejected_at=order.rejected_at,
tracking_set_at=order.tracking_set_at,
tracking_number=order.tracking_number,
tracking_carrier=order.tracking_carrier,
inventory_units=order.inventory_units,
order_date=order.order_date,
created_at=order.created_at,
updated_at=order.updated_at,
raw_order_data=order.raw_order_data,
)
@router.post(
"/vendors/{vendor_id}/sync",
response_model=LetzshopSyncTriggerResponse,
@@ -543,22 +606,21 @@ def list_vendor_letzshop_jobs(
@router.post(
"/vendors/{vendor_id}/import-history",
response_model=LetzshopHistoricalImportStartResponse,
)
def import_historical_orders(
def start_historical_import(
vendor_id: int = Path(..., description="Vendor ID"),
state: str = Query("confirmed", description="Shipment state to import"),
max_pages: int | None = Query(None, ge=1, le=100, description="Max pages to fetch"),
match_products: bool = Query(True, description="Match EANs to local products"),
background_tasks: BackgroundTasks = None,
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""
Import historical orders from Letzshop.
Start historical order import from Letzshop as a background job.
Fetches all shipments with the specified state (default: confirmed)
and imports them into the database. Supports pagination and EAN matching.
Creates a job that imports both confirmed and declined orders with
real-time progress tracking. Poll the status endpoint to track progress.
Returns statistics on imported/updated/skipped orders and product matching.
Returns a job_id for polling the status endpoint.
"""
order_service = get_order_service(db)
creds_service = get_credentials_service(db)
@@ -576,51 +638,55 @@ def import_historical_orders(
f"Letzshop credentials not configured for vendor {vendor.name}"
)
# Fetch all shipments with pagination
try:
with creds_service.create_client(vendor_id) as client:
logger.info(
f"Starting historical import for vendor {vendor_id}, state={state}, max_pages={max_pages}"
)
# Check if there's already a running import for this vendor
existing_job = order_service.get_running_historical_import_job(vendor_id)
if existing_job:
raise ValidationException(
f"Historical import already in progress (job_id={existing_job.id})"
)
shipments = client.get_all_shipments_paginated(
state=state,
page_size=50,
max_pages=max_pages,
)
# Create job record
job = order_service.create_historical_import_job(vendor_id, current_admin.id)
logger.info(f"Fetched {len(shipments)} {state} shipments from Letzshop")
logger.info(f"Created historical import job {job.id} for vendor {vendor_id}")
# Import shipments
stats = order_service.import_historical_shipments(
vendor_id=vendor_id,
shipments=shipments,
match_products=match_products,
)
# Queue background task
background_tasks.add_task(
process_historical_import,
job.id,
vendor_id,
)
db.commit()
return LetzshopHistoricalImportStartResponse(
job_id=job.id,
status="pending",
message="Historical import job started",
)
# Update sync status
creds_service.update_sync_status(
vendor_id,
"success",
None,
)
logger.info(
f"Historical import completed: {stats['imported']} imported, "
f"{stats['updated']} updated, {stats['skipped']} skipped"
)
@router.get(
"/vendors/{vendor_id}/import-history/{job_id}/status",
response_model=LetzshopHistoricalImportJobResponse,
)
def get_historical_import_status(
vendor_id: int = Path(..., description="Vendor ID"),
job_id: int = Path(..., description="Import job ID"),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""
Get status of a historical import job.
return {
"success": True,
"message": f"Historical import completed: {stats['imported']} imported, {stats['updated']} updated",
"statistics": stats,
}
Poll this endpoint to track import progress. Returns current phase,
page being fetched, and counts of processed/imported/updated orders.
"""
order_service = get_order_service(db)
job = order_service.get_historical_import_job_by_id(vendor_id, job_id)
except LetzshopClientError as e:
creds_service.update_sync_status(vendor_id, "failed", str(e))
raise ValidationException(f"Letzshop API error: {e}")
if not job:
raise ResourceNotFoundException("HistoricalImportJob", str(job_id))
return LetzshopHistoricalImportJobResponse.model_validate(job)
@router.get(

View File

@@ -613,6 +613,30 @@ async def admin_marketplace_letzshop_page(
)
@router.get(
"/letzshop/orders/{order_id}", response_class=HTMLResponse, include_in_schema=False
)
async def admin_letzshop_order_detail_page(
request: Request,
order_id: int = Path(..., description="Letzshop order ID"),
current_user: User = Depends(get_current_admin_from_cookie_or_header),
db: Session = Depends(get_db),
):
"""
Render detailed Letzshop order page.
Shows full order information with shipping address, billing address,
product details, and order history.
"""
return templates.TemplateResponse(
"admin/letzshop-order-detail.html",
{
"request": request,
"user": current_user,
"order_id": order_id,
},
)
# ============================================================================
# PRODUCT CATALOG ROUTES
# ============================================================================

View File

@@ -8,13 +8,14 @@ architecture rules (API-002: endpoints should not contain business logic).
import logging
from datetime import UTC, datetime
from typing import Any
from typing import Any, Callable
from sqlalchemy import func
from sqlalchemy import String, func
from sqlalchemy.orm import Session
from models.database.letzshop import (
LetzshopFulfillmentQueue,
LetzshopHistoricalImportJob,
LetzshopOrder,
LetzshopSyncLog,
VendorLetzshopCredentials,
@@ -166,6 +167,14 @@ class LetzshopOrderService:
.first()
)
def get_order_by_id(self, order_id: int) -> LetzshopOrder | None:
"""Get a Letzshop order by its database ID."""
return (
self.db.query(LetzshopOrder)
.filter(LetzshopOrder.id == order_id)
.first()
)
def list_orders(
self,
vendor_id: int,
@@ -173,12 +182,26 @@ class LetzshopOrderService:
limit: int = 50,
sync_status: str | None = None,
letzshop_state: str | None = None,
has_declined_items: bool | None = None,
search: str | None = None,
) -> tuple[list[LetzshopOrder], int]:
"""
List Letzshop orders for a vendor.
Args:
vendor_id: Vendor ID to filter by.
skip: Number of records to skip.
limit: Maximum number of records to return.
sync_status: Filter by order sync status (pending, confirmed, etc.)
letzshop_state: Filter by Letzshop shipment state.
has_declined_items: If True, only return orders with at least one
declined/unavailable item (confirmed_unavailable state).
search: Search by order number, customer name, or customer email.
Returns a tuple of (orders, total_count).
"""
from sqlalchemy import or_
query = self.db.query(LetzshopOrder).filter(
LetzshopOrder.vendor_id == vendor_id
)
@@ -188,6 +211,25 @@ class LetzshopOrderService:
if letzshop_state:
query = query.filter(LetzshopOrder.letzshop_state == letzshop_state)
# Search by order number, customer name, or email
if search:
search_term = f"%{search}%"
query = query.filter(
or_(
LetzshopOrder.letzshop_order_number.ilike(search_term),
LetzshopOrder.customer_name.ilike(search_term),
LetzshopOrder.customer_email.ilike(search_term),
)
)
# Filter for orders with declined items (confirmed_unavailable state)
if has_declined_items is True:
# Use JSON contains check for SQLite/PostgreSQL
query = query.filter(
LetzshopOrder.inventory_units.isnot(None),
LetzshopOrder.inventory_units.cast(String).contains("confirmed_unavailable"),
)
total = query.count()
orders = (
query.order_by(LetzshopOrder.created_at.desc())
@@ -203,7 +245,8 @@ class LetzshopOrderService:
Get order counts by sync_status for a vendor.
Returns:
Dict with counts for each status: pending, confirmed, rejected, shipped
Dict with counts for each status: pending, confirmed, rejected, shipped,
and has_declined_items (orders with at least one declined item).
"""
status_counts = (
self.db.query(
@@ -221,6 +264,19 @@ class LetzshopOrderService:
if status in stats:
stats[status] = count
# Count orders with declined items (confirmed_unavailable state)
declined_items_count = (
self.db.query(func.count(LetzshopOrder.id))
.filter(
LetzshopOrder.vendor_id == vendor_id,
LetzshopOrder.inventory_units.isnot(None),
LetzshopOrder.inventory_units.cast(String).contains("confirmed_unavailable"),
)
.scalar()
or 0
)
stats["has_declined_items"] = declined_items_count
return stats
def create_order(
@@ -246,6 +302,20 @@ class LetzshopOrderService:
# Extract customer locale (language preference for invoicing)
customer_locale = order_data.get("locale")
# Extract order date (completedAt from Letzshop)
order_date = None
completed_at_str = order_data.get("completedAt")
if completed_at_str:
try:
from datetime import datetime
# Handle ISO format with timezone
if completed_at_str.endswith("Z"):
completed_at_str = completed_at_str[:-1] + "+00:00"
order_date = datetime.fromisoformat(completed_at_str)
except (ValueError, TypeError):
pass # Keep None if parsing fails
# Extract country codes
ship_country = ship_address.get("country", {}) or {}
shipping_country_iso = ship_country.get("iso")
@@ -316,6 +386,7 @@ class LetzshopOrderService:
billing_country_iso=billing_country_iso,
total_amount=total_amount,
currency=currency,
order_date=order_date,
raw_order_data=shipment_data,
inventory_units=enriched_units,
sync_status=sync_status,
@@ -347,6 +418,19 @@ class LetzshopOrderService:
if not order.customer_locale and order_data.get("locale"):
order.customer_locale = order_data.get("locale")
# Update order_date if not already set
if not order.order_date:
completed_at_str = order_data.get("completedAt")
if completed_at_str:
try:
from datetime import datetime
if completed_at_str.endswith("Z"):
completed_at_str = completed_at_str[:-1] + "+00:00"
order.order_date = datetime.fromisoformat(completed_at_str)
except (ValueError, TypeError):
pass
# Update country codes if not already set
if not order.shipping_country_iso:
ship_address = order_data.get("shipAddress", {}) or {}
@@ -632,6 +716,7 @@ class LetzshopOrderService:
vendor_id: int,
shipments: list[dict[str, Any]],
match_products: bool = True,
progress_callback: Callable[[int, int, int, int], None] | None = None,
) -> dict[str, Any]:
"""
Import historical shipments into the database.
@@ -640,6 +725,8 @@ class LetzshopOrderService:
vendor_id: Vendor ID to import for.
shipments: List of shipment data from Letzshop API.
match_products: Whether to match EAN to local products.
progress_callback: Optional callback(processed, imported, updated, skipped)
for progress updates during import.
Returns:
Dict with import statistics:
@@ -662,7 +749,7 @@ class LetzshopOrderService:
"eans_not_found": set(),
}
for shipment in shipments:
for i, shipment in enumerate(shipments):
shipment_id = shipment.get("id")
if not shipment_id:
continue
@@ -671,11 +758,13 @@ class LetzshopOrderService:
existing_order = self.get_order_by_shipment_id(vendor_id, shipment_id)
if existing_order:
# Check if we need to update (e.g., state changed)
# Check if we need to update (e.g., state changed or missing data)
shipment_state = shipment.get("state")
needs_update = False
if existing_order.letzshop_state != shipment_state:
self.update_order_from_shipment(existing_order, shipment)
stats["updated"] += 1
needs_update = True
else:
# Also fix sync_status if it's out of sync with letzshop_state
state_mapping = {
@@ -686,9 +775,27 @@ class LetzshopOrderService:
expected_sync_status = state_mapping.get(shipment_state, "confirmed")
if existing_order.sync_status != expected_sync_status:
existing_order.sync_status = expected_sync_status
stats["updated"] += 1
else:
stats["skipped"] += 1
needs_update = True
# Populate order_date if missing (for orders imported before this field existed)
if not existing_order.order_date:
order_data = shipment.get("order", {})
completed_at_str = order_data.get("completedAt")
if completed_at_str:
try:
from datetime import datetime
if completed_at_str.endswith("Z"):
completed_at_str = completed_at_str[:-1] + "+00:00"
existing_order.order_date = datetime.fromisoformat(completed_at_str)
needs_update = True
except (ValueError, TypeError):
pass
if needs_update:
stats["updated"] += 1
else:
stats["skipped"] += 1
else:
# Create new order
self.create_order(vendor_id, shipment)
@@ -705,6 +812,15 @@ class LetzshopOrderService:
if ean:
stats["eans_processed"].add(ean)
# Report progress every 10 shipments or at the end
if progress_callback and ((i + 1) % 10 == 0 or i == len(shipments) - 1):
progress_callback(
i + 1,
stats["imported"],
stats["updated"],
stats["skipped"],
)
# Match EANs to local products
if match_products and stats["eans_processed"]:
matched, not_found = self._match_eans_to_products(
@@ -853,3 +969,80 @@ class LetzshopOrderService:
"orders_by_locale": {locale or "unknown": count for locale, count in locale_counts},
"orders_by_country": {country or "unknown": count for country, count in country_counts},
}
# =========================================================================
# Historical Import Job Operations
# =========================================================================
def get_running_historical_import_job(
self,
vendor_id: int,
) -> LetzshopHistoricalImportJob | None:
"""
Get any running historical import job for a vendor.
Args:
vendor_id: Vendor ID to check.
Returns:
Running job or None if no active job.
"""
return (
self.db.query(LetzshopHistoricalImportJob)
.filter(
LetzshopHistoricalImportJob.vendor_id == vendor_id,
LetzshopHistoricalImportJob.status.in_(
["pending", "fetching", "processing"]
),
)
.first()
)
def create_historical_import_job(
self,
vendor_id: int,
user_id: int,
) -> LetzshopHistoricalImportJob:
"""
Create a new historical import job.
Args:
vendor_id: Vendor ID to import for.
user_id: User ID who initiated the import.
Returns:
Created job record.
"""
job = LetzshopHistoricalImportJob(
vendor_id=vendor_id,
user_id=user_id,
status="pending",
)
self.db.add(job)
self.db.commit()
self.db.refresh(job)
return job
def get_historical_import_job_by_id(
self,
vendor_id: int,
job_id: int,
) -> LetzshopHistoricalImportJob | None:
"""
Get a historical import job by ID.
Args:
vendor_id: Vendor ID to verify ownership.
job_id: Job ID to fetch.
Returns:
Job record or None if not found.
"""
return (
self.db.query(LetzshopHistoricalImportJob)
.filter(
LetzshopHistoricalImportJob.id == job_id,
LetzshopHistoricalImportJob.vendor_id == vendor_id,
)
.first()
)