# app/services/letzshop/order_service.py
"""
Letzshop order service for handling order-related database operations.

This service moves database queries out of the API layer to comply with
architecture rules (API-002: endpoints should not contain business logic).
"""

import logging
from datetime import UTC, datetime
from typing import Any

from sqlalchemy import func
from sqlalchemy.orm import Session

from models.database.letzshop import (
    LetzshopFulfillmentQueue,
    LetzshopOrder,
    LetzshopSyncLog,
    VendorLetzshopCredentials,
)
from models.database.marketplace_import_job import MarketplaceImportJob
from models.database.product import Product
from models.database.vendor import Vendor

logger = logging.getLogger(__name__)

# Letzshop shipment states (from docs):
# - unconfirmed: needs to be confirmed/rejected
# - confirmed: at least one product confirmed
# - declined: all products rejected
# Note: "shipped" is not a Letzshop state - tracking is set separately via the
# tracking fields, and only the local sync_status ever becomes "shipped".
_SYNC_STATUS_BY_LETZSHOP_STATE = {
    "unconfirmed": "pending",
    "confirmed": "confirmed",
    "declined": "rejected",
}

# sync_status used for any Letzshop state missing from the mapping above.
_DEFAULT_SYNC_STATUS = "confirmed"

# Inventory unit states that count as "processed" when deciding whether an
# order as a whole can be marked confirmed or rejected.
_PROCESSED_UNIT_STATES = frozenset(
    {"confirmed_available", "confirmed_unavailable", "returned"}
)


def _sync_status_for_state(letzshop_state: str | None) -> str:
    """Map a Letzshop shipment state to the local sync_status value."""
    return _SYNC_STATUS_BY_LETZSHOP_STATE.get(letzshop_state, _DEFAULT_SYNC_STATUS)


def _extract_inventory_units(shipment_data: dict[str, Any]) -> list[dict[str, Any]]:
    """
    Return the inventory units of a shipment as a plain list.

    ``inventoryUnits`` is normally a direct array; the legacy format wraps it
    in a ``{"nodes": [...]}`` dict. A missing or null value yields ``[]``.
    """
    units = shipment_data.get("inventoryUnits") or []
    if isinstance(units, dict):
        # Handle legacy format with nodes wrapper
        units = units.get("nodes") or []
    return units


def _enrich_inventory_units(units: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Flatten raw inventory units into the dicts stored on the order."""
    enriched_units = []
    for unit in units:
        # Each nested object may be absent or null, so fall back to {}.
        variant = unit.get("variant") or {}
        product = variant.get("product") or {}
        trade_id = variant.get("tradeId") or {}
        product_name = product.get("name") or {}

        enriched_units.append(
            {
                "id": unit.get("id"),
                "state": unit.get("state"),
                # Product identifiers
                "ean": trade_id.get("number"),
                "ean_type": trade_id.get("parser"),
                "sku": variant.get("sku"),
                "mpn": variant.get("mpn"),
                # Product info: first available translation, en -> fr -> de
                "product_name": (
                    product_name.get("en")
                    or product_name.get("fr")
                    or product_name.get("de")
                ),
                "product_name_translations": product_name,
                # Pricing
                "price": variant.get("price"),
                "variant_id": variant.get("id"),
            }
        )
    return enriched_units


class VendorNotFoundError(Exception):
    """Raised when a vendor is not found."""


class OrderNotFoundError(Exception):
    """Raised when a Letzshop order is not found."""


class LetzshopOrderService:
    """
    Service for Letzshop order database operations.

    Methods add and modify ORM objects on the supplied session; nothing in
    this class calls ``commit()``.
    """

    def __init__(self, db: Session):
        self.db = db

    # =========================================================================
    # Vendor Operations
    # =========================================================================

    def get_vendor(self, vendor_id: int) -> Vendor | None:
        """Get vendor by ID, or None when no such vendor exists."""
        return self.db.query(Vendor).filter(Vendor.id == vendor_id).first()

    def get_vendor_or_raise(self, vendor_id: int) -> Vendor:
        """Get vendor by ID or raise VendorNotFoundError."""
        vendor = self.get_vendor(vendor_id)
        if vendor is None:
            raise VendorNotFoundError(f"Vendor with ID {vendor_id} not found")
        return vendor

    def list_vendors_with_letzshop_status(
        self,
        skip: int = 0,
        limit: int = 100,
        configured_only: bool = False,
    ) -> tuple[list[dict[str, Any]], int]:
        """
        List active vendors with their Letzshop integration status.

        Args:
            skip: Pagination offset.
            limit: Pagination limit.
            configured_only: Only include vendors that have Letzshop credentials.

        Returns:
            A tuple of (vendor_overviews, total_count). total_count is the
            number of matching vendors before pagination.
        """
        query = self.db.query(Vendor).filter(Vendor.is_active == True)  # noqa: E712

        if configured_only:
            query = query.join(
                VendorLetzshopCredentials,
                Vendor.id == VendorLetzshopCredentials.vendor_id,
            )

        total = query.count()
        vendors = query.order_by(Vendor.name).offset(skip).limit(limit).all()

        # Load credentials for the whole page in one query instead of one
        # query per vendor.
        vendor_ids = [vendor.id for vendor in vendors]
        credentials_by_vendor: dict[int, VendorLetzshopCredentials] = {}
        if vendor_ids:
            page_credentials = (
                self.db.query(VendorLetzshopCredentials)
                .filter(VendorLetzshopCredentials.vendor_id.in_(vendor_ids))
                .all()
            )
            for credentials in page_credentials:
                # Keep the first row seen per vendor.
                credentials_by_vendor.setdefault(credentials.vendor_id, credentials)

        # Order counts are only reported for configured vendors, so restrict
        # the aggregate to vendors that have credentials.
        pending_by_vendor: dict[int, int] = {}
        total_by_vendor: dict[int, int] = {}
        if credentials_by_vendor:
            count_rows = (
                self.db.query(
                    LetzshopOrder.vendor_id,
                    LetzshopOrder.sync_status,
                    func.count(LetzshopOrder.id),
                )
                .filter(LetzshopOrder.vendor_id.in_(list(credentials_by_vendor)))
                .group_by(LetzshopOrder.vendor_id, LetzshopOrder.sync_status)
                .all()
            )
            for vendor_id, sync_status, count in count_rows:
                total_by_vendor[vendor_id] = total_by_vendor.get(vendor_id, 0) + count
                if sync_status == "pending":
                    pending_by_vendor[vendor_id] = (
                        pending_by_vendor.get(vendor_id, 0) + count
                    )

        vendor_overviews = []
        for vendor in vendors:
            credentials = credentials_by_vendor.get(vendor.id)
            vendor_overviews.append(
                {
                    "vendor_id": vendor.id,
                    "vendor_name": vendor.name,
                    "vendor_code": vendor.vendor_code,
                    "is_configured": credentials is not None,
                    "auto_sync_enabled": credentials.auto_sync_enabled
                    if credentials
                    else False,
                    "last_sync_at": credentials.last_sync_at if credentials else None,
                    "last_sync_status": credentials.last_sync_status
                    if credentials
                    else None,
                    "pending_orders": pending_by_vendor.get(vendor.id, 0),
                    "total_orders": total_by_vendor.get(vendor.id, 0),
                }
            )

        return vendor_overviews, total

    # =========================================================================
    # Order Operations
    # =========================================================================

    def get_order(self, vendor_id: int, order_id: int) -> LetzshopOrder | None:
        """Get a Letzshop order by ID for a specific vendor."""
        return (
            self.db.query(LetzshopOrder)
            .filter(
                LetzshopOrder.id == order_id,
                LetzshopOrder.vendor_id == vendor_id,
            )
            .first()
        )

    def get_order_or_raise(self, vendor_id: int, order_id: int) -> LetzshopOrder:
        """Get a Letzshop order or raise OrderNotFoundError."""
        order = self.get_order(vendor_id, order_id)
        if order is None:
            raise OrderNotFoundError(f"Letzshop order {order_id} not found")
        return order

    def get_order_by_shipment_id(
        self, vendor_id: int, shipment_id: str
    ) -> LetzshopOrder | None:
        """Get a Letzshop order by shipment ID for a specific vendor."""
        return (
            self.db.query(LetzshopOrder)
            .filter(
                LetzshopOrder.vendor_id == vendor_id,
                LetzshopOrder.letzshop_shipment_id == shipment_id,
            )
            .first()
        )

    def list_orders(
        self,
        vendor_id: int,
        skip: int = 0,
        limit: int = 50,
        sync_status: str | None = None,
        letzshop_state: str | None = None,
    ) -> tuple[list[LetzshopOrder], int]:
        """
        List Letzshop orders for a vendor, newest first.

        Args:
            vendor_id: Vendor ID.
            skip: Pagination offset.
            limit: Pagination limit.
            sync_status: Optional filter on the local sync_status.
            letzshop_state: Optional filter on the Letzshop state.

        Returns:
            A tuple of (orders, total_count). total_count is counted before
            pagination is applied.
        """
        query = self.db.query(LetzshopOrder).filter(
            LetzshopOrder.vendor_id == vendor_id
        )

        if sync_status:
            query = query.filter(LetzshopOrder.sync_status == sync_status)
        if letzshop_state:
            query = query.filter(LetzshopOrder.letzshop_state == letzshop_state)

        total = query.count()
        orders = (
            query.order_by(LetzshopOrder.created_at.desc())
            .offset(skip)
            .limit(limit)
            .all()
        )

        return orders, total

    def get_order_stats(self, vendor_id: int) -> dict[str, int]:
        """
        Get order counts by sync_status for a vendor.

        Returns:
            Dict with counts for each status: pending, confirmed, rejected,
            shipped. Statuses without orders are reported as 0; any other
            status value found in the database is ignored.
        """
        status_counts = (
            self.db.query(
                LetzshopOrder.sync_status,
                func.count(LetzshopOrder.id).label("count"),
            )
            .filter(LetzshopOrder.vendor_id == vendor_id)
            .group_by(LetzshopOrder.sync_status)
            .all()
        )

        # Convert to dict with default 0 for missing statuses
        stats = {"pending": 0, "confirmed": 0, "rejected": 0, "shipped": 0}
        for status, count in status_counts:
            if status in stats:
                stats[status] = count

        return stats

    def create_order(
        self,
        vendor_id: int,
        shipment_data: dict[str, Any],
    ) -> LetzshopOrder:
        """
        Create a new Letzshop order from shipment data.

        The order is added to the session but not committed.

        Args:
            vendor_id: Vendor the order belongs to.
            shipment_data: Shipment payload; must contain an "id" key.

        Returns:
            The newly created order.

        Raises:
            KeyError: If shipment_data has no "id".
        """
        # "order" may be missing or explicitly null in the payload.
        order_data = shipment_data.get("order") or {}

        # Handle total - can be a string like "99.99 EUR" or just a number
        total = order_data.get("total", "")
        total_amount = str(total) if total else ""
        # Default currency to EUR (Letzshop is Luxembourg-based)
        currency = "EUR"

        # Extract customer name from shipping address
        ship_address = order_data.get("shipAddress") or {}
        first_name = ship_address.get("firstName") or ""
        last_name = ship_address.get("lastName") or ""
        customer_name = f"{first_name} {last_name}".strip() or None

        # Extract customer locale (language preference for invoicing)
        customer_locale = order_data.get("locale")

        # Extract country codes
        ship_country = ship_address.get("country") or {}
        shipping_country_iso = ship_country.get("iso")

        bill_address = order_data.get("billAddress") or {}
        bill_country = bill_address.get("country") or {}
        billing_country_iso = bill_country.get("iso")

        # Extract enriched inventory unit data with product details
        enriched_units = _enrich_inventory_units(
            _extract_inventory_units(shipment_data)
        )

        letzshop_state = shipment_data.get("state", "unconfirmed")
        sync_status = _sync_status_for_state(letzshop_state)

        order = LetzshopOrder(
            vendor_id=vendor_id,
            letzshop_order_id=order_data.get("id", ""),
            letzshop_shipment_id=shipment_data["id"],
            letzshop_order_number=order_data.get("number"),
            letzshop_state=letzshop_state,
            customer_email=order_data.get("email"),
            customer_name=customer_name,
            customer_locale=customer_locale,
            shipping_country_iso=shipping_country_iso,
            billing_country_iso=billing_country_iso,
            total_amount=total_amount,
            currency=currency,
            raw_order_data=shipment_data,
            inventory_units=enriched_units,
            sync_status=sync_status,
        )
        self.db.add(order)
        return order

    def update_order_from_shipment(
        self,
        order: LetzshopOrder,
        shipment_data: dict[str, Any],
    ) -> LetzshopOrder:
        """
        Update an existing order from shipment data.

        State, sync_status, raw data and inventory units are always
        overwritten; locale and country codes are only filled in when the
        order does not have them yet.
        """
        # "order" may be missing or explicitly null in the payload.
        order_data = shipment_data.get("order") or {}

        # Update letzshop_state and sync_status
        letzshop_state = shipment_data.get("state", "unconfirmed")
        order.letzshop_state = letzshop_state
        order.sync_status = _sync_status_for_state(letzshop_state)
        order.raw_order_data = shipment_data

        # Update locale if not already set
        if not order.customer_locale and order_data.get("locale"):
            order.customer_locale = order_data.get("locale")

        # Update country codes if not already set
        if not order.shipping_country_iso:
            ship_address = order_data.get("shipAddress") or {}
            ship_country = ship_address.get("country") or {}
            order.shipping_country_iso = ship_country.get("iso")

        if not order.billing_country_iso:
            bill_address = order_data.get("billAddress") or {}
            bill_country = bill_address.get("country") or {}
            order.billing_country_iso = bill_country.get("iso")

        # Update enriched inventory units
        order.inventory_units = _enrich_inventory_units(
            _extract_inventory_units(shipment_data)
        )

        return order

    def mark_order_confirmed(self, order: LetzshopOrder) -> LetzshopOrder:
        """Mark an order as confirmed and stamp confirmed_at with the current UTC time."""
        order.confirmed_at = datetime.now(UTC)
        order.sync_status = "confirmed"
        return order

    def mark_order_rejected(self, order: LetzshopOrder) -> LetzshopOrder:
        """Mark an order as rejected and stamp rejected_at with the current UTC time."""
        order.rejected_at = datetime.now(UTC)
        order.sync_status = "rejected"
        return order

    def update_inventory_unit_state(
        self, order: LetzshopOrder, item_id: str, state: str
    ) -> LetzshopOrder:
        """
        Update the state of a single inventory unit in an order.

        Once every unit is processed (confirmed_available,
        confirmed_unavailable or returned) the order itself is marked
        rejected when all units are unavailable, or confirmed when at least
        one unit is available.

        Args:
            order: The order containing the inventory unit.
            item_id: The inventory unit ID to update.
            state: The new state (confirmed_available, confirmed_unavailable).

        Returns:
            The updated order.
        """
        if not order.inventory_units:
            return order

        # Build fresh dicts instead of mutating the stored ones in place:
        # in-place edits would also change the previously loaded value, so
        # the reassignment below could compare equal to it and not be
        # detected as a change (depends on the column's mutation tracking).
        updated_units = [
            {**unit, "state": state} if unit.get("id") == item_id else dict(unit)
            for unit in order.inventory_units
        ]
        order.inventory_units = updated_units

        # Only touch the order status once every unit has been processed.
        unit_states = [unit.get("state") for unit in updated_units]
        if not all(s in _PROCESSED_UNIT_STATES for s in unit_states):
            return order

        now = datetime.now(UTC)
        if all(s == "confirmed_unavailable" for s in unit_states):
            order.sync_status = "rejected"
            order.rejected_at = now
        elif any(s == "confirmed_available" for s in unit_states):
            order.sync_status = "confirmed"
            order.confirmed_at = now

        return order

    def set_order_tracking(
        self,
        order: LetzshopOrder,
        tracking_number: str,
        tracking_carrier: str,
    ) -> LetzshopOrder:
        """Set tracking information for an order and mark it as shipped."""
        order.tracking_number = tracking_number
        order.tracking_carrier = tracking_carrier
        order.tracking_set_at = datetime.now(UTC)
        order.sync_status = "shipped"
        return order

    # =========================================================================
    # Sync Log Operations
    # =========================================================================

    def list_sync_logs(
        self,
        vendor_id: int,
        skip: int = 0,
        limit: int = 50,
    ) -> tuple[list[LetzshopSyncLog], int]:
        """
        List sync logs for a vendor, most recently started first.

        Returns a tuple of (logs, total_count).
        """
        query = self.db.query(LetzshopSyncLog).filter(
            LetzshopSyncLog.vendor_id == vendor_id
        )
        total = query.count()
        logs = (
            query.order_by(LetzshopSyncLog.started_at.desc())
            .offset(skip)
            .limit(limit)
            .all()
        )
        return logs, total

    # =========================================================================
    # Fulfillment Queue Operations
    # =========================================================================

    def list_fulfillment_queue(
        self,
        vendor_id: int,
        skip: int = 0,
        limit: int = 50,
        status: str | None = None,
    ) -> tuple[list[LetzshopFulfillmentQueue], int]:
        """
        List fulfillment queue items for a vendor, newest first.

        Returns a tuple of (items, total_count).
        """
        query = self.db.query(LetzshopFulfillmentQueue).filter(
            LetzshopFulfillmentQueue.vendor_id == vendor_id
        )

        if status:
            query = query.filter(LetzshopFulfillmentQueue.status == status)

        total = query.count()
        items = (
            query.order_by(LetzshopFulfillmentQueue.created_at.desc())
            .offset(skip)
            .limit(limit)
            .all()
        )
        return items, total

    # =========================================================================
    # Unified Jobs Operations
    # =========================================================================

    def list_letzshop_jobs(
        self,
        vendor_id: int,
        job_type: str | None = None,
        status: str | None = None,
        skip: int = 0,
        limit: int = 20,
    ) -> tuple[list[dict[str, Any]], int]:
        """
        List unified Letzshop-related jobs for a vendor.

        Combines product imports from marketplace_import_jobs and
        order syncs from letzshop_sync_logs.

        Args:
            vendor_id: Vendor ID
            job_type: Filter by type ('import', 'order_sync', or None for all)
            status: Filter by status
            skip: Pagination offset
            limit: Pagination limit

        Returns:
            Tuple of (jobs_list, total_count) where jobs_list contains dicts
            with id, type, status, created_at, started_at, completed_at,
            records_processed, records_succeeded, records_failed.
        """
        jobs: list[dict[str, Any]] = []

        # Product imports from marketplace_import_jobs
        if job_type in (None, "import"):
            import_query = self.db.query(MarketplaceImportJob).filter(
                MarketplaceImportJob.vendor_id == vendor_id,
                MarketplaceImportJob.marketplace == "Letzshop",
            )
            if status:
                import_query = import_query.filter(
                    MarketplaceImportJob.status == status
                )

            import_jobs = import_query.order_by(
                MarketplaceImportJob.created_at.desc()
            ).all()

            jobs.extend(
                {
                    "id": job.id,
                    "type": "import",
                    "status": job.status,
                    "created_at": job.created_at,
                    "started_at": job.started_at,
                    "completed_at": job.completed_at,
                    "records_processed": job.total_processed or 0,
                    "records_succeeded": (job.imported_count or 0)
                    + (job.updated_count or 0),
                    "records_failed": job.error_count or 0,
                }
                for job in import_jobs
            )

        # Order syncs from letzshop_sync_logs
        if job_type in (None, "order_sync"):
            sync_query = self.db.query(LetzshopSyncLog).filter(
                LetzshopSyncLog.vendor_id == vendor_id,
                LetzshopSyncLog.operation_type == "order_import",
            )
            if status:
                sync_query = sync_query.filter(LetzshopSyncLog.status == status)

            sync_logs = sync_query.order_by(LetzshopSyncLog.created_at.desc()).all()

            jobs.extend(
                {
                    "id": log.id,
                    "type": "order_sync",
                    "status": log.status,
                    "created_at": log.created_at,
                    "started_at": log.started_at,
                    "completed_at": log.completed_at,
                    "records_processed": log.records_processed or 0,
                    "records_succeeded": log.records_succeeded or 0,
                    "records_failed": log.records_failed or 0,
                }
                for log in sync_logs
            )

        # The two sources live in different tables, so the merged list is
        # sorted and paginated in memory rather than in SQL.
        jobs.sort(key=lambda x: x["created_at"], reverse=True)

        total = len(jobs)
        jobs = jobs[skip : skip + limit]

        return jobs, total

    # =========================================================================
    # Historical Import Operations
    # =========================================================================

    def import_historical_shipments(
        self,
        vendor_id: int,
        shipments: list[dict[str, Any]],
        match_products: bool = True,
    ) -> dict[str, Any]:
        """
        Import historical shipments into the database.

        Shipments without an "id" are ignored (they still count towards
        "total"). Orders are added/modified on the session but not committed.

        Args:
            vendor_id: Vendor ID to import for.
            shipments: List of shipment data from Letzshop API.
            match_products: Whether to match EAN to local products.

        Returns:
            Dict with import statistics:
            - total: Total shipments processed
            - imported: New orders created
            - updated: Existing orders updated
            - skipped: Already up-to-date orders
            - products_matched: Products matched by EAN
            - products_not_found: Products not found in local catalog
            - eans_processed / eans_matched / eans_not_found: lists of EANs
        """
        stats: dict[str, Any] = {
            "total": len(shipments),
            "imported": 0,
            "updated": 0,
            "skipped": 0,
            "products_matched": 0,
            "products_not_found": 0,
            "eans_processed": set(),
            "eans_matched": set(),
            "eans_not_found": set(),
        }

        for shipment in shipments:
            shipment_id = shipment.get("id")
            if not shipment_id:
                continue

            existing_order = self.get_order_by_shipment_id(vendor_id, shipment_id)

            if existing_order is None:
                self.create_order(vendor_id, shipment)
                stats["imported"] += 1
            else:
                shipment_state = shipment.get("state")
                if existing_order.letzshop_state != shipment_state:
                    # Remote state changed: refresh the order from the payload.
                    self.update_order_from_shipment(existing_order, shipment)
                    stats["updated"] += 1
                else:
                    # Also fix sync_status if it's out of sync with letzshop_state
                    expected_sync_status = _sync_status_for_state(shipment_state)
                    current_sync_status = existing_order.sync_status
                    # "shipped" is a local-only status set on top of a
                    # confirmed shipment; it is not out of sync and must not
                    # be reset to "confirmed" on re-import.
                    locally_shipped = (
                        current_sync_status == "shipped"
                        and expected_sync_status == "confirmed"
                    )
                    if current_sync_status != expected_sync_status and not locally_shipped:
                        existing_order.sync_status = expected_sync_status
                        stats["updated"] += 1
                    else:
                        stats["skipped"] += 1

            # Collect EANs for matching (accepts both the direct-array and
            # the legacy nodes-wrapped inventoryUnits formats).
            if match_products:
                for unit in _extract_inventory_units(shipment):
                    variant = unit.get("variant") or {}
                    trade_id = variant.get("tradeId") or {}
                    ean = trade_id.get("number")
                    if ean:
                        stats["eans_processed"].add(ean)

        # Match EANs to local products
        if match_products and stats["eans_processed"]:
            matched, not_found = self._match_eans_to_products(
                vendor_id, list(stats["eans_processed"])
            )
            stats["eans_matched"] = matched
            stats["eans_not_found"] = not_found
            stats["products_matched"] = len(matched)
            stats["products_not_found"] = len(not_found)

        # Convert sets to lists for JSON serialization
        stats["eans_processed"] = list(stats["eans_processed"])
        stats["eans_matched"] = list(stats["eans_matched"])
        stats["eans_not_found"] = list(stats["eans_not_found"])

        return stats

    def _match_eans_to_products(
        self,
        vendor_id: int,
        eans: list[str],
    ) -> tuple[set[str], set[str]]:
        """
        Match EAN codes to local products.

        Args:
            vendor_id: Vendor ID to search products for.
            eans: List of EAN codes to match.

        Returns:
            Tuple of (matched_eans, not_found_eans).
        """
        if not eans:
            return set(), set()

        # Query products by GTIN for this vendor
        products = (
            self.db.query(Product)
            .filter(
                Product.vendor_id == vendor_id,
                Product.gtin.in_(eans),
            )
            .all()
        )

        matched_eans = {p.gtin for p in products if p.gtin}
        not_found_eans = set(eans) - matched_eans

        logger.info(
            "EAN matching: %d matched, %d not found",
            len(matched_eans),
            len(not_found_eans),
        )
        return matched_eans, not_found_eans

    def get_products_by_eans(
        self,
        vendor_id: int,
        eans: list[str],
    ) -> dict[str, Product]:
        """
        Get products by their EAN codes.

        Args:
            vendor_id: Vendor ID to search products for.
            eans: List of EAN codes to search.

        Returns:
            Dict mapping EAN to Product. When several products share a GTIN,
            the last one returned by the query wins.
        """
        if not eans:
            return {}

        products = (
            self.db.query(Product)
            .filter(
                Product.vendor_id == vendor_id,
                Product.gtin.in_(eans),
            )
            .all()
        )

        return {p.gtin: p for p in products if p.gtin}

    def get_historical_import_summary(
        self,
        vendor_id: int,
    ) -> dict[str, Any]:
        """
        Get summary of historical order data for a vendor.

        Returns:
            Dict with total_orders, unique_customers, orders_by_state,
            orders_by_locale and orders_by_country. Missing locale/country
            values are grouped under "unknown".
        """
        # Count orders by state
        order_counts = (
            self.db.query(
                LetzshopOrder.letzshop_state,
                func.count(LetzshopOrder.id).label("count"),
            )
            .filter(LetzshopOrder.vendor_id == vendor_id)
            .group_by(LetzshopOrder.letzshop_state)
            .all()
        )

        # Count orders by locale
        locale_counts = (
            self.db.query(
                LetzshopOrder.customer_locale,
                func.count(LetzshopOrder.id).label("count"),
            )
            .filter(LetzshopOrder.vendor_id == vendor_id)
            .group_by(LetzshopOrder.customer_locale)
            .all()
        )

        # Count orders by country
        country_counts = (
            self.db.query(
                LetzshopOrder.shipping_country_iso,
                func.count(LetzshopOrder.id).label("count"),
            )
            .filter(LetzshopOrder.vendor_id == vendor_id)
            .group_by(LetzshopOrder.shipping_country_iso)
            .all()
        )

        # Total number of orders
        total_orders = (
            self.db.query(func.count(LetzshopOrder.id))
            .filter(LetzshopOrder.vendor_id == vendor_id)
            .scalar()
            or 0
        )

        # Unique customers (distinct customer e-mail addresses)
        unique_customers = (
            self.db.query(func.count(func.distinct(LetzshopOrder.customer_email)))
            .filter(LetzshopOrder.vendor_id == vendor_id)
            .scalar()
            or 0
        )

        return {
            "total_orders": total_orders,
            "unique_customers": unique_customers,
            "orders_by_state": {state: count for state, count in order_counts},
            "orders_by_locale": {
                locale or "unknown": count for locale, count in locale_counts
            },
            "orders_by_country": {
                country or "unknown": count for country, count in country_counts
            },
        }