Files
orion/app/services/order_inventory_service.py
Samir Boulahtit 5a3f2bce57 feat: add partial shipment support (Phase 3)
- Add shipped_quantity field to OrderItem for tracking partial fulfillment
- Add partially_shipped order status for orders with partial shipments
- Add fulfill_item method for shipping individual items with quantities
- Add get_shipment_status method for detailed shipment tracking
- Add vendor API endpoints for partial shipment operations:
  - GET /orders/{id}/shipment-status - Get item-level shipment status
  - POST /orders/{id}/items/{item_id}/ship - Ship specific item quantity
- Automatic status updates: partially_shipped when some items shipped,
  shipped when all items fully shipped
- Migration to add shipped_quantity column with upgrade for existing data
- Update documentation with partial shipment usage examples

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-01 18:28:54 +01:00

736 lines
25 KiB
Python

# app/services/order_inventory_service.py
"""
Order-Inventory Integration Service.

This service orchestrates inventory operations for orders:
- Reserve inventory when orders are confirmed
- Fulfill (deduct) inventory when orders are shipped
- Release reservations when orders are cancelled

This is the critical link between the order and inventory systems
that ensures stock accuracy.

All operations are logged to the inventory_transactions table for an audit trail.
"""
import logging
from sqlalchemy.orm import Session
from app.exceptions import (
InsufficientInventoryException,
InventoryNotFoundException,
OrderNotFoundException,
ValidationException,
)
from app.services.inventory_service import inventory_service
from models.database.inventory import Inventory
from models.database.inventory_transaction import InventoryTransaction, TransactionType
from models.database.order import Order, OrderItem
from models.schema.inventory import InventoryReserve
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Default location for inventory operations.
# NOTE(review): not referenced by any code visible in this file - confirm it is still needed.
DEFAULT_LOCATION = "DEFAULT"
class OrderInventoryService:
    """
    Orchestrate order and inventory operations together.

    This service ensures that:
    1. When orders are confirmed, inventory is reserved
    2. When orders are shipped, inventory is fulfilled (deducted)
    3. When orders are cancelled, reservations are released

    Note: Letzshop orders with unmatched products (placeholder) skip
    inventory operations for those items.
    """
def get_order_with_items(
    self, db: Session, vendor_id: int, order_id: int
) -> Order:
    """
    Load an order that belongs to the given vendor.

    The lookup filters on both the order ID and the vendor ID, so an order
    owned by another vendor is reported as not found. Items are not loaded
    explicitly here; callers reach them through ``order.items``.

    Args:
        db: Database session
        vendor_id: Vendor ID the order must belong to
        order_id: Order ID

    Returns:
        The matching Order

    Raises:
        OrderNotFoundException: If no matching order exists
    """
    lookup = db.query(Order).filter(
        Order.id == order_id, Order.vendor_id == vendor_id
    )
    found = lookup.first()
    if found:
        return found
    raise OrderNotFoundException(f"Order {order_id} not found")
def _find_inventory_location(
    self, db: Session, product_id: int, vendor_id: int
) -> str | None:
    """
    Pick a location that still has unreserved stock for a product.

    A row qualifies when its quantity is strictly greater than its reserved
    quantity. No ordering is applied to the query, so when several rows
    qualify the one returned is whichever the database yields first.

    Args:
        db: Database session
        product_id: Product ID
        vendor_id: Vendor ID

    Returns:
        The location of the first qualifying inventory row, or None when no
        row qualifies.
    """
    has_free_stock = Inventory.quantity > Inventory.reserved_quantity
    candidates = db.query(Inventory).filter(
        Inventory.product_id == product_id,
        Inventory.vendor_id == vendor_id,
        has_free_stock,
    )
    row = candidates.first()
    if not row:
        return None
    return row.location
def _is_placeholder_product(self, order_item: OrderItem) -> bool:
    """
    Tell whether an order item should be excluded from inventory handling.

    An item counts as a placeholder when it has no product attached, or when
    its product carries the placeholder GTIN (0000000000000).
    """
    product = order_item.product
    if product:
        return product.gtin == "0000000000000"
    # No product linked at all: treat the item like a placeholder.
    return True
def _log_transaction(
    self,
    db: Session,
    vendor_id: int,
    product_id: int,
    inventory: Inventory,
    transaction_type: TransactionType,
    quantity_change: int,
    order: Order,
    reason: str | None = None,
) -> InventoryTransaction:
    """
    Record one inventory movement for an order in the audit trail.

    The transaction is built from the inventory row's current state and
    added to the session. This method itself does not flush or commit.

    Args:
        db: Database session
        vendor_id: Vendor ID
        product_id: Product ID
        inventory: Inventory record after the operation (a falsy value is
            tolerated and recorded with zero quantities and no location)
        transaction_type: Type of transaction
        quantity_change: Change in quantity (positive = add, negative = remove)
        order: Order associated with this transaction
        reason: Optional reason for the transaction

    Returns:
        Created InventoryTransaction
    """
    # Snapshot the post-operation inventory state, or neutral defaults.
    if inventory:
        inventory_id = inventory.id
        quantity_after = inventory.quantity
        reserved_after = inventory.reserved_quantity
        location = inventory.location
        warehouse = inventory.warehouse
    else:
        inventory_id = None
        quantity_after = 0
        reserved_after = 0
        location = None
        warehouse = None

    entry = InventoryTransaction.create_transaction(
        vendor_id=vendor_id,
        product_id=product_id,
        inventory_id=inventory_id,
        transaction_type=transaction_type,
        quantity_change=quantity_change,
        quantity_after=quantity_after,
        reserved_after=reserved_after,
        location=location,
        warehouse=warehouse,
        order_id=order.id,
        order_number=order.order_number,
        reason=reason,
        created_by="system",
    )
    db.add(entry)
    return entry
def reserve_for_order(
    self,
    db: Session,
    vendor_id: int,
    order_id: int,
    skip_missing: bool = True,
) -> dict:
    """
    Reserve stock for every item of an order.

    Placeholder items are always skipped. For the rest, a location with
    unreserved stock is looked up and the full ordered quantity is reserved
    there through the inventory service. Each successful reservation is
    written to the audit trail.

    Args:
        db: Database session
        vendor_id: Vendor ID
        order_id: Order ID
        skip_missing: If True, items with no or insufficient inventory are
            collected in ``skipped_items`` instead of raising

    Returns:
        Dict with ``order_id``, ``order_number``, ``reserved_count`` and
        ``skipped_items``

    Raises:
        OrderNotFoundException: If the order does not exist for this vendor
        InventoryNotFoundException: If skip_missing=False and no location
            with unreserved stock exists for an item
        InsufficientInventoryException: If skip_missing=False and the
            inventory service cannot reserve the requested quantity
    """
    order = self.get_order_with_items(db, vendor_id, order_id)
    skipped: list[dict] = []
    reserved = 0

    for line in order.items:
        if self._is_placeholder_product(line):
            skipped.append({
                "item_id": line.id,
                "reason": "placeholder_product",
            })
            continue

        location = self._find_inventory_location(db, line.product_id, vendor_id)
        if not location:
            if not skip_missing:
                raise InventoryNotFoundException(
                    f"No inventory found for product {line.product_id}"
                )
            skipped.append({
                "item_id": line.id,
                "product_id": line.product_id,
                "reason": "no_inventory",
            })
            continue

        try:
            request = InventoryReserve(
                product_id=line.product_id,
                location=location,
                quantity=line.quantity,
            )
            inventory_after = inventory_service.reserve_inventory(
                db, vendor_id, request
            )
            reserved += 1
            # Reserving leaves the on-hand quantity untouched, hence a change of 0.
            self._log_transaction(
                db=db,
                vendor_id=vendor_id,
                product_id=line.product_id,
                inventory=inventory_after,
                transaction_type=TransactionType.RESERVE,
                quantity_change=0,
                order=order,
                reason=f"Reserved for order {order.order_number}",
            )
            logger.info(
                f"Reserved {line.quantity} units of product {line.product_id} "
                f"for order {order.order_number}"
            )
        except InsufficientInventoryException:
            if not skip_missing:
                raise
            skipped.append({
                "item_id": line.id,
                "product_id": line.product_id,
                "reason": "insufficient_inventory",
            })

    logger.info(
        f"Order {order.order_number}: reserved {reserved} items, "
        f"skipped {len(skipped)}"
    )
    return {
        "order_id": order_id,
        "order_number": order.order_number,
        "reserved_count": reserved,
        "skipped_items": skipped,
    }
def fulfill_order(
    self,
    db: Session,
    vendor_id: int,
    order_id: int,
    skip_missing: bool = True,
) -> dict:
    """
    Deduct stock for everything in an order that has not shipped yet.

    Fully shipped items are passed over silently and placeholder items are
    recorded as skipped. For each remaining item the outstanding quantity is
    fulfilled through the inventory service, the item is marked as fully
    shipped, and the movement is written to the audit trail.

    Args:
        db: Database session
        vendor_id: Vendor ID
        order_id: Order ID
        skip_missing: If True, items whose inventory is missing or
            insufficient are collected in ``skipped_items`` instead of
            raising

    Returns:
        Dict with ``order_id``, ``order_number``, ``fulfilled_count`` and
        ``skipped_items``

    Raises:
        OrderNotFoundException: If the order does not exist for this vendor
        InventoryNotFoundException: If skip_missing=False and no inventory
            row exists for an item
        InsufficientInventoryException: If skip_missing=False and the
            inventory service rejects the fulfilment
    """
    order = self.get_order_with_items(db, vendor_id, order_id)
    skipped: list[dict] = []
    fulfilled = 0

    for line in order.items:
        if line.is_fully_shipped:
            continue

        if self._is_placeholder_product(line):
            skipped.append({
                "item_id": line.id,
                "reason": "placeholder_product",
            })
            continue

        # Earlier partial shipments are accounted for: only the rest is deducted.
        outstanding = line.remaining_quantity

        location = self._find_inventory_location(db, line.product_id, vendor_id)
        if not location:
            # Fall back to any inventory row for the product, even one whose
            # stock is entirely reserved.
            any_row = (
                db.query(Inventory)
                .filter(
                    Inventory.product_id == line.product_id,
                    Inventory.vendor_id == vendor_id,
                )
                .first()
            )
            if any_row:
                location = any_row.location

        if not location:
            if not skip_missing:
                raise InventoryNotFoundException(
                    f"No inventory found for product {line.product_id}"
                )
            skipped.append({
                "item_id": line.id,
                "product_id": line.product_id,
                "reason": "no_inventory",
            })
            continue

        try:
            request = InventoryReserve(
                product_id=line.product_id,
                location=location,
                quantity=outstanding,
            )
            inventory_after = inventory_service.fulfill_reservation(
                db, vendor_id, request
            )
            fulfilled += 1

            # The whole item has now left the warehouse.
            line.shipped_quantity = line.quantity
            line.inventory_fulfilled = True

            # Negative change: stock is consumed.
            self._log_transaction(
                db=db,
                vendor_id=vendor_id,
                product_id=line.product_id,
                inventory=inventory_after,
                transaction_type=TransactionType.FULFILL,
                quantity_change=-outstanding,
                order=order,
                reason=f"Fulfilled for order {order.order_number}",
            )
            logger.info(
                f"Fulfilled {outstanding} units of product {line.product_id} "
                f"for order {order.order_number}"
            )
        except (InsufficientInventoryException, InventoryNotFoundException) as exc:
            if not skip_missing:
                raise
            skipped.append({
                "item_id": line.id,
                "product_id": line.product_id,
                "reason": str(exc),
            })

    logger.info(
        f"Order {order.order_number}: fulfilled {fulfilled} items, "
        f"skipped {len(skipped)}"
    )
    return {
        "order_id": order_id,
        "order_number": order.order_number,
        "fulfilled_count": fulfilled,
        "skipped_items": skipped,
    }
def fulfill_item(
    self,
    db: Session,
    vendor_id: int,
    order_id: int,
    item_id: int,
    quantity: int | None = None,
    skip_missing: bool = True,
) -> dict:
    """
    Fulfill (deduct) inventory for a specific order item.

    Supports partial fulfillment - ship some units now, rest later.

    Args:
        db: Database session
        vendor_id: Vendor ID
        order_id: Order ID
        item_id: Order item ID
        quantity: Quantity to ship. ``None`` means "ship everything that is
            still outstanding". An explicit ``0`` (or a negative number)
            ships nothing and returns a "Nothing to fulfill" result.
        skip_missing: If True, missing or insufficient inventory produces a
            zero-quantity result instead of raising

    Returns:
        Dict with the fulfillment result. On success it contains
        ``order_id``, ``item_id``, ``fulfilled_quantity``,
        ``shipped_quantity``, ``remaining_quantity`` and
        ``is_fully_shipped``. When nothing was shipped it contains
        ``order_id``, ``item_id``, ``fulfilled_quantity`` (0) and a
        ``message`` explaining why.

    Raises:
        OrderNotFoundException: If the order does not exist for this vendor
        ValidationException: If the item is not part of the order, or the
            quantity exceeds the remaining quantity
        InventoryNotFoundException: If skip_missing=False and no inventory
            row exists for the product
        InsufficientInventoryException: If skip_missing=False and the
            inventory service rejects the fulfilment
    """
    order = self.get_order_with_items(db, vendor_id, order_id)

    item = next(
        (candidate for candidate in order.items if candidate.id == item_id),
        None,
    )
    if item is None:
        raise ValidationException(f"Item {item_id} not found in order {order_id}")

    def _nothing_shipped(message: str) -> dict:
        # Uniform shape for every outcome that leaves inventory untouched.
        return {
            "order_id": order_id,
            "item_id": item_id,
            "fulfilled_quantity": 0,
            "message": message,
        }

    if item.is_fully_shipped:
        return _nothing_shipped("Item already fully shipped")

    remaining = item.remaining_quantity
    # Only None selects the default. `quantity or remaining` would turn an
    # explicit 0 into "ship everything", which is the opposite of the request.
    quantity_to_fulfill = remaining if quantity is None else quantity

    if quantity_to_fulfill > remaining:
        raise ValidationException(
            f"Cannot ship {quantity_to_fulfill} units - only {remaining} remaining"
        )
    if quantity_to_fulfill <= 0:
        return _nothing_shipped("Nothing to fulfill")

    # Placeholder products have no inventory to deduct.
    if self._is_placeholder_product(item):
        return _nothing_shipped("Placeholder product - skipped")

    # Prefer a location with unreserved stock; otherwise fall back to any
    # inventory row for the product, even one whose stock is fully reserved.
    location = self._find_inventory_location(db, item.product_id, vendor_id)
    if not location:
        inventory = (
            db.query(Inventory)
            .filter(
                Inventory.product_id == item.product_id,
                Inventory.vendor_id == vendor_id,
            )
            .first()
        )
        if inventory:
            location = inventory.location

    if not location:
        if skip_missing:
            return _nothing_shipped("No inventory found")
        raise InventoryNotFoundException(
            f"No inventory found for product {item.product_id}"
        )

    try:
        reserve_data = InventoryReserve(
            product_id=item.product_id,
            location=location,
            quantity=quantity_to_fulfill,
        )
        updated_inventory = inventory_service.fulfill_reservation(
            db, vendor_id, reserve_data
        )

        # Treat a missing (NULL) shipped_quantity as 0 so the addition cannot
        # fail. NOTE(review): presumably only possible for rows that predate
        # the shipped_quantity column - confirm against the migration.
        item.shipped_quantity = (item.shipped_quantity or 0) + quantity_to_fulfill

        # Mark as fulfilled only once nothing is left to ship.
        if item.is_fully_shipped:
            item.inventory_fulfilled = True

        # Negative change: stock is consumed.
        self._log_transaction(
            db=db,
            vendor_id=vendor_id,
            product_id=item.product_id,
            inventory=updated_inventory,
            transaction_type=TransactionType.FULFILL,
            quantity_change=-quantity_to_fulfill,
            order=order,
            reason=f"Partial shipment for order {order.order_number}",
        )
        logger.info(
            f"Fulfilled {quantity_to_fulfill} of {item.quantity} units "
            f"for item {item_id} in order {order.order_number}"
        )
        return {
            "order_id": order_id,
            "item_id": item_id,
            "fulfilled_quantity": quantity_to_fulfill,
            "shipped_quantity": item.shipped_quantity,
            "remaining_quantity": item.remaining_quantity,
            "is_fully_shipped": item.is_fully_shipped,
        }
    except (InsufficientInventoryException, InventoryNotFoundException) as e:
        if skip_missing:
            return _nothing_shipped(str(e))
        raise
def release_order_reservation(
    self,
    db: Session,
    vendor_id: int,
    order_id: int,
    skip_missing: bool = True,
) -> dict:
    """
    Release reserved inventory when an order is cancelled.

    This decreases the reserved quantity, making the stock available again.

    Only the still-unshipped part of each item is released. Units that were
    already shipped had their reservation consumed at fulfilment time
    (fulfilment decreases both total and reserved quantity), so releasing
    the full ordered quantity again would free stock that is reserved for
    other orders. Items that are already fully shipped are therefore
    reported as skipped with reason ``already_shipped``.

    Args:
        db: Database session
        vendor_id: Vendor ID
        order_id: Order ID
        skip_missing: If True, items without inventory (or whose release
            fails) are collected in ``skipped_items`` instead of raising

    Returns:
        Dict with ``order_id``, ``order_number``, ``released_count`` and
        ``skipped_items``

    Raises:
        OrderNotFoundException: If the order does not exist for this vendor
        InventoryNotFoundException: If skip_missing=False and no inventory
            row exists for an item
        Exception: If skip_missing=False, any error raised by the inventory
            service while releasing is propagated unchanged
    """
    order = self.get_order_with_items(db, vendor_id, order_id)
    released_count = 0
    skipped_items = []

    for item in order.items:
        # Skip placeholder products
        if self._is_placeholder_product(item):
            skipped_items.append({
                "item_id": item.id,
                "reason": "placeholder_product",
            })
            continue

        # Nothing is left reserved for an item that has shipped completely.
        if item.is_fully_shipped:
            skipped_items.append({
                "item_id": item.id,
                "product_id": item.product_id,
                "reason": "already_shipped",
            })
            continue

        # After a partial shipment only the outstanding units are reserved.
        quantity_to_release = item.remaining_quantity

        # Find inventory - look for any inventory for this product
        inventory = (
            db.query(Inventory)
            .filter(
                Inventory.product_id == item.product_id,
                Inventory.vendor_id == vendor_id,
            )
            .first()
        )
        if not inventory:
            if skip_missing:
                skipped_items.append({
                    "item_id": item.id,
                    "product_id": item.product_id,
                    "reason": "no_inventory",
                })
                continue
            raise InventoryNotFoundException(
                f"No inventory found for product {item.product_id}"
            )

        try:
            reserve_data = InventoryReserve(
                product_id=item.product_id,
                location=inventory.location,
                quantity=quantity_to_release,
            )
            updated_inventory = inventory_service.release_reservation(
                db, vendor_id, reserve_data
            )
            released_count += 1

            # Release doesn't change quantity, only reserved_quantity.
            self._log_transaction(
                db=db,
                vendor_id=vendor_id,
                product_id=item.product_id,
                inventory=updated_inventory,
                transaction_type=TransactionType.RELEASE,
                quantity_change=0,
                order=order,
                reason=f"Released for cancelled order {order.order_number}",
            )
            logger.info(
                f"Released {quantity_to_release} units of product {item.product_id} "
                f"for cancelled order {order.order_number}"
            )
        except Exception as e:
            # Cancellation is best-effort per item, so the broad catch is kept,
            # but the failure is logged instead of disappearing silently.
            if not skip_missing:
                raise
            logger.warning(
                "Could not release reservation for product %s in order %s: %s",
                item.product_id,
                order.order_number,
                e,
            )
            skipped_items.append({
                "item_id": item.id,
                "product_id": item.product_id,
                "reason": str(e),
            })

    logger.info(
        f"Order {order.order_number}: released {released_count} items, "
        f"skipped {len(skipped_items)}"
    )
    return {
        "order_id": order_id,
        "order_number": order.order_number,
        "released_count": released_count,
        "skipped_items": skipped_items,
    }
def handle_status_change(
    self,
    db: Session,
    vendor_id: int,
    order_id: int,
    old_status: str | None,
    new_status: str,
) -> dict | None:
    """
    Run the inventory operation that belongs to an order status transition.

    Dispatch is on the new status only:
    - processing: reserve inventory for the order
    - shipped: fulfill whatever has not been shipped yet
    - partially_shipped: no inventory change here; item-level fulfilment is
      expected to have happened through fulfill_item
    - cancelled: release reservations, unless there was no previous status
      or the order was already cancelled/refunded

    Every delegated operation is called with skip_missing=True.

    NOTE(review): no check for an existing reservation is made before
    reserving, and a shipped -> cancelled transition also triggers a
    release. Confirm both are intended.

    Args:
        db: Database session
        vendor_id: Vendor ID
        order_id: Order ID
        old_status: Previous status (can be None for new orders)
        new_status: New status

    Returns:
        Result of the inventory operation, or None if the status did not
        change or the transition needs no inventory operation
    """
    if new_status == old_status:
        return None

    if new_status == "processing":
        outcome = self.reserve_for_order(db, vendor_id, order_id, skip_missing=True)
        logger.info(f"Order {order_id} confirmed: inventory reserved")
        return outcome

    if new_status == "shipped":
        outcome = self.fulfill_order(db, vendor_id, order_id, skip_missing=True)
        logger.info(f"Order {order_id} shipped: inventory fulfilled")
        return outcome

    if new_status == "partially_shipped":
        logger.info(
            f"Order {order_id} partially shipped: use fulfill_item for item-level fulfillment"
        )
        return {"order_id": order_id, "status": "partially_shipped"}

    if new_status == "cancelled":
        # A brand-new order, or one already cancelled/refunded, is left alone.
        was_in_progress = bool(old_status) and old_status not in (
            "cancelled",
            "refunded",
        )
        if was_in_progress:
            outcome = self.release_order_reservation(
                db, vendor_id, order_id, skip_missing=True
            )
            logger.info(f"Order {order_id} cancelled: reservations released")
            return outcome

    return None
def get_shipment_status(
    self,
    db: Session,
    vendor_id: int,
    order_id: int,
) -> dict:
    """
    Report how far an order has been shipped, item by item.

    The values are read straight from the order and its items; nothing is
    modified.

    Args:
        db: Database session
        vendor_id: Vendor ID
        order_id: Order ID

    Returns:
        Dict with order-level shipment figures plus an ``items`` list that
        holds one entry per order item (ordered, shipped and remaining
        quantities and the fully/partially shipped flags)

    Raises:
        OrderNotFoundException: If the order does not exist for this vendor
    """
    order = self.get_order_with_items(db, vendor_id, order_id)

    item_rows = [
        {
            "item_id": line.id,
            "product_id": line.product_id,
            "product_name": line.product_name,
            "quantity": line.quantity,
            "shipped_quantity": line.shipped_quantity,
            "remaining_quantity": line.remaining_quantity,
            "is_fully_shipped": line.is_fully_shipped,
            "is_partially_shipped": line.is_partially_shipped,
        }
        for line in order.items
    ]

    summary = {
        "order_id": order_id,
        "order_number": order.order_number,
        "order_status": order.status,
        "is_fully_shipped": order.is_fully_shipped,
        "is_partially_shipped": order.is_partially_shipped,
        "shipped_item_count": order.shipped_item_count,
        "total_item_count": len(order.items),
        "total_shipped_units": order.total_shipped_units,
        "total_ordered_units": order.total_ordered_units,
    }
    summary["items"] = item_rows
    return summary
# Create service instance
# Shared module-level instance. The class shown in this file defines no
# __init__ and its methods store nothing on self.
order_inventory_service = OrderInventoryService()