diff --git a/app/api/v1/admin/letzshop.py b/app/api/v1/admin/letzshop.py index b6a1488c..5afad2a5 100644 --- a/app/api/v1/admin/letzshop.py +++ b/app/api/v1/admin/letzshop.py @@ -11,7 +11,7 @@ Provides admin-level management of: import logging -from fastapi import APIRouter, Depends, Path, Query +from fastapi import APIRouter, BackgroundTasks, Depends, Path, Query from sqlalchemy.orm import Session from app.api.deps import get_current_admin_api @@ -25,6 +25,7 @@ from app.services.letzshop import ( OrderNotFoundError, VendorNotFoundError, ) +from app.tasks.letzshop_tasks import process_historical_import from models.database.user import User from models.schema.letzshop import ( FulfillmentOperationResponse, @@ -33,8 +34,11 @@ from models.schema.letzshop import ( LetzshopCredentialsCreate, LetzshopCredentialsResponse, LetzshopCredentialsUpdate, + LetzshopHistoricalImportJobResponse, + LetzshopHistoricalImportStartResponse, LetzshopJobItem, LetzshopJobsListResponse, + LetzshopOrderDetailResponse, LetzshopOrderListResponse, LetzshopOrderResponse, LetzshopSuccessResponse, @@ -345,6 +349,12 @@ def list_vendor_letzshop_orders( skip: int = Query(0, ge=0), limit: int = Query(50, ge=1, le=200), sync_status: str | None = Query(None, description="Filter by sync status"), + has_declined_items: bool | None = Query( + None, description="Filter orders with declined/unavailable items" + ), + search: str | None = Query( + None, description="Search by order number, customer name, or email" + ), db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin_api), ): @@ -361,6 +371,8 @@ def list_vendor_letzshop_orders( skip=skip, limit=limit, sync_status=sync_status, + has_declined_items=has_declined_items, + search=search, ) # Get order stats for all statuses @@ -377,6 +389,9 @@ def list_vendor_letzshop_orders( letzshop_state=order.letzshop_state, customer_email=order.customer_email, customer_name=order.customer_name, + customer_locale=order.customer_locale, + shipping_country_iso=order.shipping_country_iso, + billing_country_iso=order.billing_country_iso, total_amount=order.total_amount, currency=order.currency, local_order_id=order.local_order_id, @@ -389,6 +404,7 @@ def list_vendor_letzshop_orders( tracking_number=order.tracking_number, tracking_carrier=order.tracking_carrier, inventory_units=order.inventory_units, + order_date=order.order_date, created_at=order.created_at, updated_at=order.updated_at, ) @@ -401,6 +417,53 @@ def list_vendor_letzshop_orders( ) +@router.get( + "/orders/{order_id}", + response_model=LetzshopOrderDetailResponse, +) +def get_letzshop_order_detail( + order_id: int = Path(..., description="Letzshop order ID"), + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_api), +): + """Get detailed information for a single Letzshop order.""" + order_service = get_order_service(db) + + order = order_service.get_order_by_id(order_id) + if not order: + raise ResourceNotFoundException("Order", str(order_id)) + + return LetzshopOrderDetailResponse( + id=order.id, + vendor_id=order.vendor_id, + letzshop_order_id=order.letzshop_order_id, + letzshop_shipment_id=order.letzshop_shipment_id, + letzshop_order_number=order.letzshop_order_number, + letzshop_state=order.letzshop_state, + customer_email=order.customer_email, + customer_name=order.customer_name, + customer_locale=order.customer_locale, + shipping_country_iso=order.shipping_country_iso, + billing_country_iso=order.billing_country_iso, + total_amount=order.total_amount, + currency=order.currency, + local_order_id=order.local_order_id, + sync_status=order.sync_status, + last_synced_at=order.last_synced_at, + sync_error=order.sync_error, + confirmed_at=order.confirmed_at, + rejected_at=order.rejected_at, + tracking_set_at=order.tracking_set_at, + tracking_number=order.tracking_number, + tracking_carrier=order.tracking_carrier, + inventory_units=order.inventory_units, + order_date=order.order_date, + created_at=order.created_at, + updated_at=order.updated_at, + raw_order_data=order.raw_order_data, + ) + + @router.post( "/vendors/{vendor_id}/sync", response_model=LetzshopSyncTriggerResponse, @@ -543,22 +606,21 @@ def list_vendor_letzshop_jobs( @router.post( "/vendors/{vendor_id}/import-history", + response_model=LetzshopHistoricalImportStartResponse, ) -def import_historical_orders( +def start_historical_import( vendor_id: int = Path(..., description="Vendor ID"), - state: str = Query("confirmed", description="Shipment state to import"), - max_pages: int | None = Query(None, ge=1, le=100, description="Max pages to fetch"), - match_products: bool = Query(True, description="Match EANs to local products"), + background_tasks: BackgroundTasks = None, db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin_api), ): """ - Import historical orders from Letzshop. + Start historical order import from Letzshop as a background job. - Fetches all shipments with the specified state (default: confirmed) - and imports them into the database. Supports pagination and EAN matching. + Creates a job that imports both confirmed and declined orders with + real-time progress tracking. Poll the status endpoint to track progress. - Returns statistics on imported/updated/skipped orders and product matching. + Returns a job_id for polling the status endpoint. """ order_service = get_order_service(db) creds_service = get_credentials_service(db) @@ -576,51 +638,55 @@ def import_historical_orders( f"Letzshop credentials not configured for vendor {vendor.name}" ) - # Fetch all shipments with pagination - try: - with creds_service.create_client(vendor_id) as client: - logger.info( - f"Starting historical import for vendor {vendor_id}, state={state}, max_pages={max_pages}" - ) + # Check if there's already a running import for this vendor + existing_job = order_service.get_running_historical_import_job(vendor_id) + if existing_job: + raise ValidationException( + f"Historical import already in progress (job_id={existing_job.id})" + ) - shipments = client.get_all_shipments_paginated( - state=state, - page_size=50, - max_pages=max_pages, - ) + # Create job record + job = order_service.create_historical_import_job(vendor_id, current_admin.id) - logger.info(f"Fetched {len(shipments)} {state} shipments from Letzshop") + logger.info(f"Created historical import job {job.id} for vendor {vendor_id}") - # Import shipments - stats = order_service.import_historical_shipments( - vendor_id=vendor_id, - shipments=shipments, - match_products=match_products, - ) + # Queue background task + background_tasks.add_task( + process_historical_import, + job.id, + vendor_id, + ) - db.commit() + return LetzshopHistoricalImportStartResponse( + job_id=job.id, + status="pending", + message="Historical import job started", + ) - # Update sync status - creds_service.update_sync_status( - vendor_id, - "success", - None, - ) - logger.info( - f"Historical import completed: {stats['imported']} imported, " - f"{stats['updated']} updated, {stats['skipped']} skipped" - ) +@router.get( + "/vendors/{vendor_id}/import-history/{job_id}/status", + response_model=LetzshopHistoricalImportJobResponse, +) +def get_historical_import_status( + vendor_id: int = Path(..., description="Vendor ID"), + job_id: int = Path(..., description="Import job ID"), + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_api), +): + """ + Get status of a historical import job. - return { - "success": True, - "message": f"Historical import completed: {stats['imported']} imported, {stats['updated']} updated", - "statistics": stats, - } + Poll this endpoint to track import progress. Returns current phase, + page being fetched, and counts of processed/imported/updated orders. + """ + order_service = get_order_service(db) + job = order_service.get_historical_import_job_by_id(vendor_id, job_id) - except LetzshopClientError as e: - creds_service.update_sync_status(vendor_id, "failed", str(e)) - raise ValidationException(f"Letzshop API error: {e}") + if not job: + raise ResourceNotFoundException("HistoricalImportJob", str(job_id)) + + return LetzshopHistoricalImportJobResponse.model_validate(job) @router.get( diff --git a/app/routes/admin_pages.py b/app/routes/admin_pages.py index 0c08fe1d..af6c5f11 100644 --- a/app/routes/admin_pages.py +++ b/app/routes/admin_pages.py @@ -613,6 +613,30 @@ async def admin_marketplace_letzshop_page( ) +@router.get( + "/letzshop/orders/{order_id}", response_class=HTMLResponse, include_in_schema=False +) +async def admin_letzshop_order_detail_page( + request: Request, + order_id: int = Path(..., description="Letzshop order ID"), + current_user: User = Depends(get_current_admin_from_cookie_or_header), + db: Session = Depends(get_db), +): + """ + Render detailed Letzshop order page. + Shows full order information with shipping address, billing address, + product details, and order history. + """ + return templates.TemplateResponse( + "admin/letzshop-order-detail.html", + { + "request": request, + "user": current_user, + "order_id": order_id, + }, + ) + + # ============================================================================ # PRODUCT CATALOG ROUTES # ============================================================================ diff --git a/app/services/letzshop/order_service.py b/app/services/letzshop/order_service.py index 486a0272..cf74fe4a 100644 --- a/app/services/letzshop/order_service.py +++ b/app/services/letzshop/order_service.py @@ -8,13 +8,14 @@ architecture rules (API-002: endpoints should not contain business logic). import logging from datetime import UTC, datetime -from typing import Any +from typing import Any, Callable -from sqlalchemy import func +from sqlalchemy import String, func from sqlalchemy.orm import Session from models.database.letzshop import ( LetzshopFulfillmentQueue, + LetzshopHistoricalImportJob, LetzshopOrder, LetzshopSyncLog, VendorLetzshopCredentials, @@ -166,6 +167,14 @@ class LetzshopOrderService: .first() ) + def get_order_by_id(self, order_id: int) -> LetzshopOrder | None: + """Get a Letzshop order by its database ID.""" + return ( + self.db.query(LetzshopOrder) + .filter(LetzshopOrder.id == order_id) + .first() + ) + def list_orders( self, vendor_id: int, @@ -173,12 +182,26 @@ class LetzshopOrderService: limit: int = 50, sync_status: str | None = None, letzshop_state: str | None = None, + has_declined_items: bool | None = None, + search: str | None = None, ) -> tuple[list[LetzshopOrder], int]: """ List Letzshop orders for a vendor. + Args: + vendor_id: Vendor ID to filter by. + skip: Number of records to skip. + limit: Maximum number of records to return. + sync_status: Filter by order sync status (pending, confirmed, etc.) + letzshop_state: Filter by Letzshop shipment state. + has_declined_items: If True, only return orders with at least one + declined/unavailable item (confirmed_unavailable state). + search: Search by order number, customer name, or customer email. + Returns a tuple of (orders, total_count). """ + from sqlalchemy import or_ + query = self.db.query(LetzshopOrder).filter( LetzshopOrder.vendor_id == vendor_id ) @@ -188,6 +211,25 @@ class LetzshopOrderService: if letzshop_state: query = query.filter(LetzshopOrder.letzshop_state == letzshop_state) + # Search by order number, customer name, or email + if search: + search_term = f"%{search}%" + query = query.filter( + or_( + LetzshopOrder.letzshop_order_number.ilike(search_term), + LetzshopOrder.customer_name.ilike(search_term), + LetzshopOrder.customer_email.ilike(search_term), + ) + ) + + # Filter for orders with declined items (confirmed_unavailable state) + if has_declined_items is True: + # Use JSON contains check for SQLite/PostgreSQL + query = query.filter( + LetzshopOrder.inventory_units.isnot(None), + LetzshopOrder.inventory_units.cast(String).contains("confirmed_unavailable"), + ) + total = query.count() orders = ( query.order_by(LetzshopOrder.created_at.desc()) @@ -203,7 +245,8 @@ class LetzshopOrderService: Get order counts by sync_status for a vendor. Returns: - Dict with counts for each status: pending, confirmed, rejected, shipped + Dict with counts for each status: pending, confirmed, rejected, shipped, + and has_declined_items (orders with at least one declined item). """ status_counts = ( self.db.query( @@ -221,6 +264,19 @@ class LetzshopOrderService: if status in stats: stats[status] = count + # Count orders with declined items (confirmed_unavailable state) + declined_items_count = ( + self.db.query(func.count(LetzshopOrder.id)) + .filter( + LetzshopOrder.vendor_id == vendor_id, + LetzshopOrder.inventory_units.isnot(None), + LetzshopOrder.inventory_units.cast(String).contains("confirmed_unavailable"), + ) + .scalar() + or 0 + ) + stats["has_declined_items"] = declined_items_count + return stats def create_order( @@ -246,6 +302,20 @@ class LetzshopOrderService: # Extract customer locale (language preference for invoicing) customer_locale = order_data.get("locale") + # Extract order date (completedAt from Letzshop) + order_date = None + completed_at_str = order_data.get("completedAt") + if completed_at_str: + try: + from datetime import datetime + + # Handle ISO format with timezone + if completed_at_str.endswith("Z"): + completed_at_str = completed_at_str[:-1] + "+00:00" + order_date = datetime.fromisoformat(completed_at_str) + except (ValueError, TypeError): + pass # Keep None if parsing fails + # Extract country codes ship_country = ship_address.get("country", {}) or {} shipping_country_iso = ship_country.get("iso") @@ -316,6 +386,7 @@ class LetzshopOrderService: billing_country_iso=billing_country_iso, total_amount=total_amount, currency=currency, + order_date=order_date, raw_order_data=shipment_data, inventory_units=enriched_units, sync_status=sync_status, @@ -347,6 +418,19 @@ class LetzshopOrderService: if not order.customer_locale and order_data.get("locale"): order.customer_locale = order_data.get("locale") + # Update order_date if not already set + if not order.order_date: + completed_at_str = order_data.get("completedAt") + if completed_at_str: + try: + from datetime import datetime + + if completed_at_str.endswith("Z"): + completed_at_str = completed_at_str[:-1] + "+00:00" + order.order_date = datetime.fromisoformat(completed_at_str) + except (ValueError, TypeError): + pass + # Update country codes if not already set if not order.shipping_country_iso: ship_address = order_data.get("shipAddress", {}) or {} @@ -632,6 +716,7 @@ class LetzshopOrderService: vendor_id: int, shipments: list[dict[str, Any]], match_products: bool = True, + progress_callback: Callable[[int, int, int, int], None] | None = None, ) -> dict[str, Any]: """ Import historical shipments into the database. @@ -640,6 +725,8 @@ class LetzshopOrderService: vendor_id: Vendor ID to import for. shipments: List of shipment data from Letzshop API. match_products: Whether to match EAN to local products. + progress_callback: Optional callback(processed, imported, updated, skipped) + for progress updates during import. Returns: Dict with import statistics: @@ -662,7 +749,7 @@ class LetzshopOrderService: "eans_not_found": set(), } - for shipment in shipments: + for i, shipment in enumerate(shipments): shipment_id = shipment.get("id") if not shipment_id: continue @@ -671,11 +758,13 @@ class LetzshopOrderService: existing_order = self.get_order_by_shipment_id(vendor_id, shipment_id) if existing_order: - # Check if we need to update (e.g., state changed) + # Check if we need to update (e.g., state changed or missing data) shipment_state = shipment.get("state") + needs_update = False + if existing_order.letzshop_state != shipment_state: self.update_order_from_shipment(existing_order, shipment) - stats["updated"] += 1 + needs_update = True else: # Also fix sync_status if it's out of sync with letzshop_state state_mapping = { @@ -686,9 +775,27 @@ class LetzshopOrderService: expected_sync_status = state_mapping.get(shipment_state, "confirmed") if existing_order.sync_status != expected_sync_status: existing_order.sync_status = expected_sync_status - stats["updated"] += 1 - else: - stats["skipped"] += 1 + needs_update = True + + # Populate order_date if missing (for orders imported before this field existed) + if not existing_order.order_date: + order_data = shipment.get("order", {}) + completed_at_str = order_data.get("completedAt") + if completed_at_str: + try: + from datetime import datetime + + if completed_at_str.endswith("Z"): + completed_at_str = completed_at_str[:-1] + "+00:00" + existing_order.order_date = datetime.fromisoformat(completed_at_str) + needs_update = True + except (ValueError, TypeError): + pass + + if needs_update: + stats["updated"] += 1 + else: + stats["skipped"] += 1 else: # Create new order self.create_order(vendor_id, shipment) @@ -705,6 +812,15 @@ class LetzshopOrderService: if ean: stats["eans_processed"].add(ean) + # Report progress every 10 shipments or at the end + if progress_callback and ((i + 1) % 10 == 0 or i == len(shipments) - 1): + progress_callback( + i + 1, + stats["imported"], + stats["updated"], + stats["skipped"], + ) + # Match EANs to local products if match_products and stats["eans_processed"]: matched, not_found = self._match_eans_to_products( @@ -853,3 +969,80 @@ class LetzshopOrderService: "orders_by_locale": {locale or "unknown": count for locale, count in locale_counts}, "orders_by_country": {country or "unknown": count for country, count in country_counts}, } + + # ========================================================================= + # Historical Import Job Operations + # ========================================================================= + + def get_running_historical_import_job( + self, + vendor_id: int, + ) -> LetzshopHistoricalImportJob | None: + """ + Get any running historical import job for a vendor. + + Args: + vendor_id: Vendor ID to check. + + Returns: + Running job or None if no active job. + """ + return ( + self.db.query(LetzshopHistoricalImportJob) + .filter( + LetzshopHistoricalImportJob.vendor_id == vendor_id, + LetzshopHistoricalImportJob.status.in_( + ["pending", "fetching", "processing"] + ), + ) + .first() + ) + + def create_historical_import_job( + self, + vendor_id: int, + user_id: int, + ) -> LetzshopHistoricalImportJob: + """ + Create a new historical import job. + + Args: + vendor_id: Vendor ID to import for. + user_id: User ID who initiated the import. + + Returns: + Created job record. + """ + job = LetzshopHistoricalImportJob( + vendor_id=vendor_id, + user_id=user_id, + status="pending", + ) + self.db.add(job) + self.db.commit() + self.db.refresh(job) + return job + + def get_historical_import_job_by_id( + self, + vendor_id: int, + job_id: int, + ) -> LetzshopHistoricalImportJob | None: + """ + Get a historical import job by ID. + + Args: + vendor_id: Vendor ID to verify ownership. + job_id: Job ID to fetch. + + Returns: + Job record or None if not found. + """ + return ( + self.db.query(LetzshopHistoricalImportJob) + .filter( + LetzshopHistoricalImportJob.id == job_id, + LetzshopHistoricalImportJob.vendor_id == vendor_id, + ) + .first() + )