Files
orion/app/services/inventory_transaction_service.py
Samir Boulahtit 159243066c feat: add vendor API endpoints for inventory transaction history
Adds three new endpoints for viewing stock movement history:
- GET /inventory/transactions - paginated list with filters
- GET /inventory/transactions/product/{id} - product-specific history
- GET /inventory/transactions/order/{id} - order-specific history

Creates InventoryTransactionService to encapsulate query logic
following architecture patterns. Includes response schemas with
product details for rich UI display.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-01 18:20:12 +01:00

294 lines
9.5 KiB
Python

# app/services/inventory_transaction_service.py
"""
Inventory Transaction Service.
Provides query operations for inventory transaction history.
All transaction WRITES are handled by OrderInventoryService.
This service handles transaction READS for reporting and auditing.
"""
import logging
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.exceptions import OrderNotFoundException, ProductNotFoundException
from models.database.inventory import Inventory
from models.database.inventory_transaction import InventoryTransaction
from models.database.order import Order
from models.database.product import Product
logger = logging.getLogger(__name__)
class InventoryTransactionService:
"""Service for querying inventory transaction history."""
def get_vendor_transactions(
self,
db: Session,
vendor_id: int,
skip: int = 0,
limit: int = 50,
product_id: int | None = None,
transaction_type: str | None = None,
) -> tuple[list[dict], int]:
"""
Get inventory transactions for a vendor with optional filters.
Args:
db: Database session
vendor_id: Vendor ID
skip: Pagination offset
limit: Pagination limit
product_id: Optional product filter
transaction_type: Optional transaction type filter
Returns:
Tuple of (transactions with product details, total count)
"""
# Build query
query = db.query(InventoryTransaction).filter(
InventoryTransaction.vendor_id == vendor_id
)
# Apply filters
if product_id:
query = query.filter(InventoryTransaction.product_id == product_id)
if transaction_type:
query = query.filter(
InventoryTransaction.transaction_type == transaction_type
)
# Get total count
total = query.count()
# Get transactions with pagination (newest first)
transactions = (
query.order_by(InventoryTransaction.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
# Build result with product details
result = []
for tx in transactions:
product = db.query(Product).filter(Product.id == tx.product_id).first()
product_title = None
product_sku = None
if product:
product_sku = product.vendor_sku
if product.marketplace_product:
product_title = product.marketplace_product.get_title()
result.append(
{
"id": tx.id,
"vendor_id": tx.vendor_id,
"product_id": tx.product_id,
"inventory_id": tx.inventory_id,
"transaction_type": (
tx.transaction_type.value if tx.transaction_type else None
),
"quantity_change": tx.quantity_change,
"quantity_after": tx.quantity_after,
"reserved_after": tx.reserved_after,
"location": tx.location,
"warehouse": tx.warehouse,
"order_id": tx.order_id,
"order_number": tx.order_number,
"reason": tx.reason,
"created_by": tx.created_by,
"created_at": tx.created_at,
"product_title": product_title,
"product_sku": product_sku,
}
)
return result, total
def get_product_history(
self,
db: Session,
vendor_id: int,
product_id: int,
limit: int = 50,
) -> dict:
"""
Get transaction history for a specific product.
Args:
db: Database session
vendor_id: Vendor ID
product_id: Product ID
limit: Max transactions to return
Returns:
Dict with product info, current inventory, and transactions
Raises:
ProductNotFoundException: If product not found or doesn't belong to vendor
"""
# Get product details
product = (
db.query(Product)
.filter(Product.id == product_id, Product.vendor_id == vendor_id)
.first()
)
if not product:
raise ProductNotFoundException(
f"Product {product_id} not found in vendor catalog"
)
product_title = None
product_sku = product.vendor_sku
if product.marketplace_product:
product_title = product.marketplace_product.get_title()
# Get current inventory
inventory = (
db.query(Inventory)
.filter(Inventory.product_id == product_id, Inventory.vendor_id == vendor_id)
.first()
)
current_quantity = inventory.quantity if inventory else 0
current_reserved = inventory.reserved_quantity if inventory else 0
# Get transactions
transactions = (
db.query(InventoryTransaction)
.filter(
InventoryTransaction.vendor_id == vendor_id,
InventoryTransaction.product_id == product_id,
)
.order_by(InventoryTransaction.created_at.desc())
.limit(limit)
.all()
)
total = (
db.query(func.count(InventoryTransaction.id))
.filter(
InventoryTransaction.vendor_id == vendor_id,
InventoryTransaction.product_id == product_id,
)
.scalar()
or 0
)
return {
"product_id": product_id,
"product_title": product_title,
"product_sku": product_sku,
"current_quantity": current_quantity,
"current_reserved": current_reserved,
"transactions": [
{
"id": tx.id,
"vendor_id": tx.vendor_id,
"product_id": tx.product_id,
"inventory_id": tx.inventory_id,
"transaction_type": (
tx.transaction_type.value if tx.transaction_type else None
),
"quantity_change": tx.quantity_change,
"quantity_after": tx.quantity_after,
"reserved_after": tx.reserved_after,
"location": tx.location,
"warehouse": tx.warehouse,
"order_id": tx.order_id,
"order_number": tx.order_number,
"reason": tx.reason,
"created_by": tx.created_by,
"created_at": tx.created_at,
}
for tx in transactions
],
"total": total,
}
def get_order_history(
self,
db: Session,
vendor_id: int,
order_id: int,
) -> dict:
"""
Get all inventory transactions for a specific order.
Args:
db: Database session
vendor_id: Vendor ID
order_id: Order ID
Returns:
Dict with order info and transactions
Raises:
OrderNotFoundException: If order not found or doesn't belong to vendor
"""
# Verify order belongs to vendor
order = (
db.query(Order)
.filter(Order.id == order_id, Order.vendor_id == vendor_id)
.first()
)
if not order:
raise OrderNotFoundException(f"Order {order_id} not found")
# Get transactions for this order
transactions = (
db.query(InventoryTransaction)
.filter(InventoryTransaction.order_id == order_id)
.order_by(InventoryTransaction.created_at.asc())
.all()
)
# Build result with product details
result = []
for tx in transactions:
product = db.query(Product).filter(Product.id == tx.product_id).first()
product_title = None
product_sku = None
if product:
product_sku = product.vendor_sku
if product.marketplace_product:
product_title = product.marketplace_product.get_title()
result.append(
{
"id": tx.id,
"vendor_id": tx.vendor_id,
"product_id": tx.product_id,
"inventory_id": tx.inventory_id,
"transaction_type": (
tx.transaction_type.value if tx.transaction_type else None
),
"quantity_change": tx.quantity_change,
"quantity_after": tx.quantity_after,
"reserved_after": tx.reserved_after,
"location": tx.location,
"warehouse": tx.warehouse,
"order_id": tx.order_id,
"order_number": tx.order_number,
"reason": tx.reason,
"created_by": tx.created_by,
"created_at": tx.created_at,
"product_title": product_title,
"product_sku": product_sku,
}
)
return {
"order_id": order_id,
"order_number": order.order_number,
"transactions": result,
}
# Create service instance
inventory_transaction_service = InventoryTransactionService()