Files
orion/app/modules/inventory/services/inventory_service.py
Samir Boulahtit aad18c27ab
Some checks failed
CI / ruff (push) Successful in 11s
CI / validate (push) Has been cancelled
CI / dependency-scanning (push) Has been cancelled
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / pytest (push) Has started running
refactor: remove all backward compatibility code across 70 files
Clean up 28 backward compatibility instances identified in the codebase.
The app is not live, so all shims are replaced with the target architecture:

- Remove legacy Inventory.location column (use bin_location exclusively)
- Remove dashboard _extract_metric_value helper (use flat metrics dict)
- Remove legacy stat field duplicates (total_stores, total_imports, etc.)
- Remove 13 re-export shims and class aliases across modules
- Remove module-enabling JSON fallback (use PlatformModule junction table)
- Remove menu_to_legacy_format() conversion (return dataclasses directly)
- Remove title/description from MarketplaceProductBase schema
- Clean billing convenience method docstrings
- Clean test fixtures and backward-compat comments
- Add PlatformModule seeding to init_production.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 13:20:29 +01:00

975 lines
34 KiB
Python

# app/modules/inventory/services/inventory_service.py
import logging
from datetime import UTC, datetime
from sqlalchemy import func
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from app.modules.catalog.exceptions import ProductNotFoundException
from app.modules.catalog.models import Product
from app.modules.inventory.exceptions import (
InsufficientInventoryException,
InvalidInventoryOperationException,
InvalidQuantityException,
InventoryNotFoundException,
InventoryValidationException,
)
from app.modules.inventory.models.inventory import Inventory
from app.modules.inventory.schemas.inventory import (
AdminInventoryItem,
AdminInventoryListResponse,
AdminInventoryLocationsResponse,
AdminInventoryStats,
AdminLowStockItem,
AdminStoresWithInventoryResponse,
AdminStoreWithInventory,
InventoryAdjust,
InventoryCreate,
InventoryLocationResponse,
InventoryReserve,
InventoryUpdate,
ProductInventorySummary,
)
from app.modules.tenancy.exceptions import StoreNotFoundException
from app.modules.tenancy.models import Store
logger = logging.getLogger(__name__)
class InventoryService:
"""Service for inventory operations with store isolation."""
def set_inventory(
self, db: Session, store_id: int, inventory_data: InventoryCreate
) -> Inventory:
"""
Set exact inventory quantity for a product at a location (replaces existing).
Args:
db: Database session
store_id: Store ID (from middleware)
inventory_data: Inventory data
Returns:
Inventory object
"""
try:
# Validate product belongs to store
product = self._get_store_product(db, store_id, inventory_data.product_id)
# Validate location
location = self._validate_location(inventory_data.location)
# Validate quantity
self._validate_quantity(inventory_data.quantity, allow_zero=True)
# Check if inventory entry exists
existing = self._get_inventory_entry(
db, inventory_data.product_id, location
)
if existing:
old_qty = existing.quantity
existing.quantity = inventory_data.quantity
existing.updated_at = datetime.now(UTC)
db.flush()
db.refresh(existing)
logger.info(
f"Set inventory for product {inventory_data.product_id} at {location}: "
f"{old_qty}{inventory_data.quantity}"
)
return existing
# Create new inventory entry
new_inventory = Inventory(
product_id=inventory_data.product_id,
store_id=store_id,
warehouse="strassen", # Default warehouse
bin_location=location,
quantity=inventory_data.quantity,
gtin=product.marketplace_product.gtin, # Optional reference
)
db.add(new_inventory)
db.flush()
db.refresh(new_inventory)
logger.info(
f"Created inventory for product {inventory_data.product_id} at {location}: "
f"{inventory_data.quantity}"
)
return new_inventory
except (
ProductNotFoundException,
InvalidQuantityException,
InventoryValidationException,
):
db.rollback()
raise
except SQLAlchemyError as e:
db.rollback()
logger.error(f"Error setting inventory: {str(e)}")
raise InvalidInventoryOperationException(
"Failed to set inventory", operation="set_inventory"
)
def adjust_inventory(
self, db: Session, store_id: int, inventory_data: InventoryAdjust
) -> Inventory:
"""
Adjust inventory by adding or removing quantity.
Positive quantity = add, negative = remove.
Args:
db: Database session
store_id: Store ID
inventory_data: Adjustment data
Returns:
Updated Inventory object
"""
try:
# Validate product belongs to store
product = self._get_store_product(db, store_id, inventory_data.product_id)
# Validate location
location = self._validate_location(inventory_data.location)
# Check if inventory exists
existing = self._get_inventory_entry(
db, inventory_data.product_id, location
)
if not existing:
# Create new if adding, error if removing
if inventory_data.quantity < 0:
raise InventoryNotFoundException(
f"No inventory found for product {inventory_data.product_id} at {location}"
)
# Create with positive quantity
new_inventory = Inventory(
product_id=inventory_data.product_id,
store_id=store_id,
warehouse="strassen", # Default warehouse
bin_location=location,
quantity=inventory_data.quantity,
gtin=product.marketplace_product.gtin,
)
db.add(new_inventory)
db.flush()
db.refresh(new_inventory)
logger.info(
f"Created inventory for product {inventory_data.product_id} at {location}: "
f"+{inventory_data.quantity}"
)
return new_inventory
# Adjust existing inventory
old_qty = existing.quantity
new_qty = old_qty + inventory_data.quantity
# Validate resulting quantity
if new_qty < 0:
raise InsufficientInventoryException(
gtin=getattr(product.marketplace_product, "gtin", str(inventory_data.product_id)),
location=location,
requested=abs(inventory_data.quantity),
available=old_qty,
)
existing.quantity = new_qty
existing.updated_at = datetime.now(UTC)
db.flush()
db.refresh(existing)
logger.info(
f"Adjusted inventory for product {inventory_data.product_id} at {location}: "
f"{old_qty} {'+' if inventory_data.quantity >= 0 else ''}{inventory_data.quantity} = {new_qty}"
)
return existing
except (
ProductNotFoundException,
InventoryNotFoundException,
InsufficientInventoryException,
InventoryValidationException,
):
db.rollback()
raise
except SQLAlchemyError as e:
db.rollback()
logger.error(f"Error adjusting inventory: {str(e)}")
raise InvalidInventoryOperationException(
"Failed to adjust inventory", operation="adjust_inventory"
)
def reserve_inventory(
self, db: Session, store_id: int, reserve_data: InventoryReserve
) -> Inventory:
"""
Reserve inventory for an order (increases reserved_quantity).
Args:
db: Database session
store_id: Store ID
reserve_data: Reservation data
Returns:
Updated Inventory object
"""
try:
# Validate product
self._get_store_product(db, store_id, reserve_data.product_id)
# Validate location and quantity
location = self._validate_location(reserve_data.location)
self._validate_quantity(reserve_data.quantity, allow_zero=False)
# Get inventory entry
inventory = self._get_inventory_entry(db, reserve_data.product_id, location)
if not inventory:
raise InventoryNotFoundException(
f"No inventory found for product {reserve_data.product_id} at {location}"
)
# Check available quantity
available = inventory.quantity - inventory.reserved_quantity
if available < reserve_data.quantity:
raise InsufficientInventoryException(
gtin=getattr(inventory, "gtin", str(reserve_data.product_id)),
location=location,
requested=reserve_data.quantity,
available=available,
)
# Reserve inventory
inventory.reserved_quantity += reserve_data.quantity
inventory.updated_at = datetime.now(UTC)
db.flush()
db.refresh(inventory)
logger.info(
f"Reserved {reserve_data.quantity} units for product {reserve_data.product_id} "
f"at {location}"
)
return inventory
except (
ProductNotFoundException,
InventoryNotFoundException,
InsufficientInventoryException,
InvalidQuantityException,
):
db.rollback()
raise
except SQLAlchemyError as e:
db.rollback()
logger.error(f"Error reserving inventory: {str(e)}")
raise InvalidInventoryOperationException(
"Failed to reserve inventory", operation="reserve_inventory"
)
def release_reservation(
self, db: Session, store_id: int, reserve_data: InventoryReserve
) -> Inventory:
"""
Release reserved inventory (decreases reserved_quantity).
Args:
db: Database session
store_id: Store ID
reserve_data: Reservation data
Returns:
Updated Inventory object
"""
try:
# Validate product
self._get_store_product(db, store_id, reserve_data.product_id)
location = self._validate_location(reserve_data.location)
self._validate_quantity(reserve_data.quantity, allow_zero=False)
inventory = self._get_inventory_entry(db, reserve_data.product_id, location)
if not inventory:
raise InventoryNotFoundException(
f"No inventory found for product {reserve_data.product_id} at {location}"
)
# Validate reserved quantity
if inventory.reserved_quantity < reserve_data.quantity:
logger.warning(
f"Attempting to release more than reserved. Reserved: {inventory.reserved_quantity}, "
f"Requested: {reserve_data.quantity}"
)
inventory.reserved_quantity = 0
else:
inventory.reserved_quantity -= reserve_data.quantity
inventory.updated_at = datetime.now(UTC)
db.flush()
db.refresh(inventory)
logger.info(
f"Released {reserve_data.quantity} units for product {reserve_data.product_id} "
f"at {location}"
)
return inventory
except (
ProductNotFoundException,
InventoryNotFoundException,
InvalidQuantityException,
):
db.rollback()
raise
except SQLAlchemyError as e:
db.rollback()
logger.error(f"Error releasing reservation: {str(e)}")
raise InvalidInventoryOperationException(
"Failed to release reservation", operation="release_reservation"
)
def fulfill_reservation(
self, db: Session, store_id: int, reserve_data: InventoryReserve
) -> Inventory:
"""
Fulfill a reservation (decreases both quantity and reserved_quantity).
Use when order is shipped/completed.
Args:
db: Database session
store_id: Store ID
reserve_data: Reservation data
Returns:
Updated Inventory object
"""
try:
self._get_store_product(db, store_id, reserve_data.product_id)
location = self._validate_location(reserve_data.location)
self._validate_quantity(reserve_data.quantity, allow_zero=False)
inventory = self._get_inventory_entry(db, reserve_data.product_id, location)
if not inventory:
raise InventoryNotFoundException(
f"No inventory found for product {reserve_data.product_id} at {location}"
)
# Validate quantities
if inventory.quantity < reserve_data.quantity:
raise InsufficientInventoryException(
gtin=getattr(inventory, "gtin", str(reserve_data.product_id)),
location=location,
requested=reserve_data.quantity,
available=inventory.quantity,
)
if inventory.reserved_quantity < reserve_data.quantity:
logger.warning(
f"Fulfilling more than reserved. Reserved: {inventory.reserved_quantity}, "
f"Fulfilling: {reserve_data.quantity}"
)
# Fulfill (remove from both quantity and reserved)
inventory.quantity -= reserve_data.quantity
inventory.reserved_quantity = max(
0, inventory.reserved_quantity - reserve_data.quantity
)
inventory.updated_at = datetime.now(UTC)
db.flush()
db.refresh(inventory)
logger.info(
f"Fulfilled {reserve_data.quantity} units for product {reserve_data.product_id} "
f"at {location}"
)
return inventory
except (
ProductNotFoundException,
InventoryNotFoundException,
InsufficientInventoryException,
InvalidQuantityException,
):
db.rollback()
raise
except SQLAlchemyError as e:
db.rollback()
logger.error(f"Error fulfilling reservation: {str(e)}")
raise InvalidInventoryOperationException(
"Failed to fulfill reservation", operation="fulfill_reservation"
)
def get_product_inventory(
self, db: Session, store_id: int, product_id: int
) -> ProductInventorySummary:
"""
Get inventory summary for a product across all locations.
Args:
db: Database session
store_id: Store ID
product_id: Product ID
Returns:
ProductInventorySummary
"""
try:
product = self._get_store_product(db, store_id, product_id)
inventory_entries = (
db.query(Inventory).filter(Inventory.product_id == product_id).all()
)
if not inventory_entries:
return ProductInventorySummary(
product_id=product_id,
store_id=store_id,
product_sku=product.store_sku,
product_title=product.marketplace_product.get_title() or "",
total_quantity=0,
total_reserved=0,
total_available=0,
locations=[],
)
total_qty = sum(inv.quantity for inv in inventory_entries)
total_reserved = sum(inv.reserved_quantity for inv in inventory_entries)
total_available = sum(inv.available_quantity for inv in inventory_entries)
locations = [
InventoryLocationResponse(
location=inv.bin_location,
quantity=inv.quantity,
reserved_quantity=inv.reserved_quantity,
available_quantity=inv.available_quantity,
)
for inv in inventory_entries
]
return ProductInventorySummary(
product_id=product_id,
store_id=store_id,
product_sku=product.store_sku,
product_title=product.marketplace_product.get_title() or "",
total_quantity=total_qty,
total_reserved=total_reserved,
total_available=total_available,
locations=locations,
)
except ProductNotFoundException:
raise
except SQLAlchemyError as e:
logger.error(f"Error getting product inventory: {str(e)}")
raise InvalidInventoryOperationException(
"Failed to retrieve product inventory",
operation="get_product_inventory",
)
def get_store_inventory(
self,
db: Session,
store_id: int,
skip: int = 0,
limit: int = 100,
location: str | None = None,
low_stock_threshold: int | None = None,
) -> list[Inventory]:
"""
Get all inventory for a store with filtering.
Args:
db: Database session
store_id: Store ID
skip: Pagination offset
limit: Pagination limit
location: Filter by location
low_stock_threshold: Filter items below threshold
Returns:
List of Inventory objects
"""
try:
query = db.query(Inventory).filter(Inventory.store_id == store_id)
if location:
query = query.filter(Inventory.bin_location.ilike(f"%{location}%"))
if low_stock_threshold is not None:
query = query.filter(Inventory.quantity <= low_stock_threshold)
return query.offset(skip).limit(limit).all()
except SQLAlchemyError as e:
logger.error(f"Error getting store inventory: {str(e)}")
raise InvalidInventoryOperationException(
"Failed to retrieve store inventory",
operation="get_store_inventory",
)
def update_inventory(
self,
db: Session,
store_id: int,
inventory_id: int,
inventory_update: InventoryUpdate,
) -> Inventory:
"""Update inventory entry."""
try:
inventory = self._get_inventory_by_id(db, inventory_id)
# Verify ownership
if inventory.store_id != store_id:
raise InventoryNotFoundException(f"Inventory {inventory_id} not found")
# Update fields
if inventory_update.quantity is not None:
self._validate_quantity(inventory_update.quantity, allow_zero=True)
inventory.quantity = inventory_update.quantity
if inventory_update.reserved_quantity is not None:
self._validate_quantity(
inventory_update.reserved_quantity, allow_zero=True
)
inventory.reserved_quantity = inventory_update.reserved_quantity
if inventory_update.location:
inventory.bin_location = self._validate_location(inventory_update.location)
inventory.updated_at = datetime.now(UTC)
db.flush()
db.refresh(inventory)
logger.info(f"Updated inventory {inventory_id}")
return inventory
except (
InventoryNotFoundException,
InvalidQuantityException,
InventoryValidationException,
):
db.rollback()
raise
except SQLAlchemyError as e:
db.rollback()
logger.error(f"Error updating inventory: {str(e)}")
raise InvalidInventoryOperationException(
"Failed to update inventory", operation="update_inventory"
)
def delete_inventory(self, db: Session, store_id: int, inventory_id: int) -> bool:
"""Delete inventory entry."""
try:
inventory = self._get_inventory_by_id(db, inventory_id)
# Verify ownership
if inventory.store_id != store_id:
raise InventoryNotFoundException(f"Inventory {inventory_id} not found")
db.delete(inventory)
db.flush()
logger.info(f"Deleted inventory {inventory_id}")
return True
except InventoryNotFoundException:
raise
except SQLAlchemyError as e:
db.rollback()
logger.error(f"Error deleting inventory: {str(e)}")
raise InvalidInventoryOperationException(
"Failed to delete inventory", operation="delete_inventory"
)
# =========================================================================
# Admin Methods (cross-store operations)
# =========================================================================
def get_all_inventory_admin(
self,
db: Session,
skip: int = 0,
limit: int = 50,
store_id: int | None = None,
location: str | None = None,
low_stock: int | None = None,
search: str | None = None,
) -> AdminInventoryListResponse:
"""
Get inventory across all stores with filtering (admin only).
Args:
db: Database session
skip: Pagination offset
limit: Pagination limit
store_id: Filter by store
location: Filter by location
low_stock: Filter items below threshold
search: Search by product title or SKU
Returns:
AdminInventoryListResponse
"""
query = db.query(Inventory).join(Product).join(Store)
# Apply filters
if store_id is not None:
query = query.filter(Inventory.store_id == store_id)
if location:
query = query.filter(Inventory.bin_location.ilike(f"%{location}%"))
if low_stock is not None:
query = query.filter(Inventory.quantity <= low_stock)
if search:
from app.modules.marketplace.models import ( # IMPORT-002
MarketplaceProduct,
MarketplaceProductTranslation,
)
query = (
query.join(MarketplaceProduct)
.outerjoin(MarketplaceProductTranslation)
.filter(
(MarketplaceProductTranslation.title.ilike(f"%{search}%"))
| (Product.store_sku.ilike(f"%{search}%"))
)
)
# Get total count before pagination
total = query.count()
# Apply pagination
inventories = query.offset(skip).limit(limit).all()
# Build response with store/product info
items = []
for inv in inventories:
product = inv.product
store = inv.store
title = None
if product and product.marketplace_product:
title = product.marketplace_product.get_title()
items.append(
AdminInventoryItem(
id=inv.id,
product_id=inv.product_id,
store_id=inv.store_id,
store_name=store.name if store else None,
store_code=store.store_code if store else None,
product_title=title,
product_sku=product.store_sku if product else None,
location=inv.bin_location,
quantity=inv.quantity,
reserved_quantity=inv.reserved_quantity,
available_quantity=inv.available_quantity,
gtin=inv.gtin,
created_at=inv.created_at,
updated_at=inv.updated_at,
)
)
return AdminInventoryListResponse(
inventories=items,
total=total,
skip=skip,
limit=limit,
store_filter=store_id,
location_filter=location,
)
def get_inventory_stats_admin(self, db: Session) -> AdminInventoryStats:
"""Get platform-wide inventory statistics (admin only)."""
# Total entries
total_entries = db.query(func.count(Inventory.id)).scalar() or 0
# Aggregate quantities
totals = db.query(
func.sum(Inventory.quantity).label("total_qty"),
func.sum(Inventory.reserved_quantity).label("total_reserved"),
).first()
total_quantity = totals.total_qty or 0
total_reserved = totals.total_reserved or 0
total_available = total_quantity - total_reserved
# Low stock count (default threshold: 10)
low_stock_count = (
db.query(func.count(Inventory.id))
.filter(Inventory.quantity <= 10)
.scalar()
or 0
)
# Stores with inventory
stores_with_inventory = (
db.query(func.count(func.distinct(Inventory.store_id))).scalar() or 0
)
# Unique locations
unique_locations = (
db.query(func.count(func.distinct(Inventory.bin_location))).scalar() or 0
)
return AdminInventoryStats(
total_entries=total_entries,
total_quantity=total_quantity,
total_reserved=total_reserved,
total_available=total_available,
low_stock_count=low_stock_count,
stores_with_inventory=stores_with_inventory,
unique_locations=unique_locations,
)
def get_low_stock_items_admin(
self,
db: Session,
threshold: int = 10,
store_id: int | None = None,
limit: int = 50,
) -> list[AdminLowStockItem]:
"""Get items with low stock levels (admin only)."""
query = (
db.query(Inventory)
.join(Product)
.join(Store)
.filter(Inventory.quantity <= threshold)
)
if store_id is not None:
query = query.filter(Inventory.store_id == store_id)
# Order by quantity ascending (most critical first)
query = query.order_by(Inventory.quantity.asc())
inventories = query.limit(limit).all()
items = []
for inv in inventories:
product = inv.product
store = inv.store
title = None
if product and product.marketplace_product:
title = product.marketplace_product.get_title()
items.append(
AdminLowStockItem(
id=inv.id,
product_id=inv.product_id,
store_id=inv.store_id,
store_name=store.name if store else None,
product_title=title,
location=inv.bin_location,
quantity=inv.quantity,
reserved_quantity=inv.reserved_quantity,
available_quantity=inv.available_quantity,
)
)
return items
def get_stores_with_inventory_admin(
self, db: Session
) -> AdminStoresWithInventoryResponse:
"""Get list of stores that have inventory entries (admin only)."""
# SVC-005 - Admin function, intentionally cross-store
# Use subquery to avoid DISTINCT on JSON columns (PostgreSQL can't compare JSON)
store_ids_subquery = (
db.query(Inventory.store_id)
.distinct()
.subquery()
)
stores = (
db.query(Store)
.filter(Store.id.in_(db.query(store_ids_subquery.c.store_id)))
.order_by(Store.name)
.all()
)
return AdminStoresWithInventoryResponse(
stores=[
AdminStoreWithInventory(
id=v.id, name=v.name, store_code=v.store_code
)
for v in stores
]
)
def get_inventory_locations_admin(
self, db: Session, store_id: int | None = None
) -> AdminInventoryLocationsResponse:
"""Get list of unique inventory locations (admin only)."""
query = db.query(func.distinct(Inventory.bin_location))
if store_id is not None:
query = query.filter(Inventory.store_id == store_id)
locations = [loc[0] for loc in query.all()]
return AdminInventoryLocationsResponse(locations=sorted(locations))
def get_store_inventory_admin(
self,
db: Session,
store_id: int,
skip: int = 0,
limit: int = 50,
location: str | None = None,
low_stock: int | None = None,
) -> AdminInventoryListResponse:
"""Get inventory for a specific store (admin only)."""
# Verify store exists
store = db.query(Store).filter(Store.id == store_id).first()
if not store:
raise StoreNotFoundException(f"Store {store_id} not found")
# Use the existing method
inventories = self.get_store_inventory(
db=db,
store_id=store_id,
skip=skip,
limit=limit,
location=location,
low_stock_threshold=low_stock,
)
# Build response with product info
items = []
for inv in inventories:
product = inv.product
title = None
if product and product.marketplace_product:
title = product.marketplace_product.get_title()
items.append(
AdminInventoryItem(
id=inv.id,
product_id=inv.product_id,
store_id=inv.store_id,
store_name=store.name,
store_code=store.store_code,
product_title=title,
product_sku=product.store_sku if product else None,
location=inv.bin_location,
quantity=inv.quantity,
reserved_quantity=inv.reserved_quantity,
available_quantity=inv.available_quantity,
gtin=inv.gtin,
created_at=inv.created_at,
updated_at=inv.updated_at,
)
)
# Get total count for pagination
total_query = db.query(func.count(Inventory.id)).filter(
Inventory.store_id == store_id
)
if location:
total_query = total_query.filter(Inventory.bin_location.ilike(f"%{location}%"))
if low_stock is not None:
total_query = total_query.filter(Inventory.quantity <= low_stock)
total = total_query.scalar() or 0
return AdminInventoryListResponse(
inventories=items,
total=total,
skip=skip,
limit=limit,
store_filter=store_id,
location_filter=location,
)
def get_product_inventory_admin(
self, db: Session, product_id: int
) -> ProductInventorySummary:
"""Get inventory summary for a product (admin only - no store check)."""
product = db.query(Product).filter(Product.id == product_id).first()
if not product:
raise ProductNotFoundException(f"Product {product_id} not found")
# Use existing method with the product's store_id
return self.get_product_inventory(db, product.store_id, product_id)
def verify_store_exists(self, db: Session, store_id: int) -> Store:
"""Verify store exists and return it."""
store = db.query(Store).filter(Store.id == store_id).first()
if not store:
raise StoreNotFoundException(f"Store {store_id} not found")
return store
def get_inventory_by_id_admin(self, db: Session, inventory_id: int) -> Inventory:
"""Get inventory by ID (admin only - returns inventory with store_id)."""
inventory = db.query(Inventory).filter(Inventory.id == inventory_id).first()
if not inventory:
raise InventoryNotFoundException(f"Inventory {inventory_id} not found")
return inventory
# =========================================================================
# Private helper methods
# =========================================================================
def _get_store_product(
self, db: Session, store_id: int, product_id: int
) -> Product:
"""Get product and verify it belongs to store."""
product = (
db.query(Product)
.filter(Product.id == product_id, Product.store_id == store_id)
.first()
)
if not product:
raise ProductNotFoundException(
f"Product {product_id} not found in your catalog"
)
return product
def _get_inventory_entry(
self, db: Session, product_id: int, location: str
) -> Inventory | None:
"""Get inventory entry by product and location."""
return (
db.query(Inventory)
.filter(Inventory.product_id == product_id, Inventory.bin_location == location)
.first()
)
def _get_inventory_by_id(self, db: Session, inventory_id: int) -> Inventory:
"""Get inventory by ID or raise exception."""
inventory = db.query(Inventory).filter(Inventory.id == inventory_id).first()
if not inventory:
raise InventoryNotFoundException(f"Inventory {inventory_id} not found")
return inventory
def _validate_location(self, location: str) -> str:
"""Validate and normalize location."""
if not location or not location.strip():
raise InventoryValidationException("Location is required")
return location.strip().upper()
def _validate_quantity(self, quantity: int, allow_zero: bool = True) -> None:
"""Validate quantity value."""
if quantity is None:
raise InvalidQuantityException("Quantity is required")
if not isinstance(quantity, int):
raise InvalidQuantityException("Quantity must be an integer")
if quantity < 0:
raise InvalidQuantityException("Quantity cannot be negative")
if not allow_zero and quantity == 0:
raise InvalidQuantityException("Quantity must be positive")
# Create service instance
inventory_service = InventoryService()