Files
orion/app/modules/inventory/routes/api/admin.py
Samir Boulahtit aad18c27ab
Some checks failed
CI / ruff (push) Successful in 11s
CI / validate (push) Has been cancelled
CI / dependency-scanning (push) Has been cancelled
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / pytest (push) Has started running
refactor: remove all backward compatibility code across 70 files
Clean up 28 backward compatibility instances identified in the codebase.
The app is not live, so all shims are replaced with the target architecture:

- Remove legacy Inventory.location column (use bin_location exclusively)
- Remove dashboard _extract_metric_value helper (use flat metrics dict)
- Remove legacy stat field duplicates (total_stores, total_imports, etc.)
- Remove 13 re-export shims and class aliases across modules
- Remove module-enabling JSON fallback (use PlatformModule junction table)
- Remove menu_to_legacy_format() conversion (return dataclasses directly)
- Remove title/description from MarketplaceProductBase schema
- Clean billing convenience method docstrings
- Clean test fixtures and backward-compat comments
- Add PlatformModule seeding to init_production.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 13:20:29 +01:00

442 lines
14 KiB
Python

# app/modules/inventory/routes/api/admin.py
"""
Admin inventory management endpoints.
Provides inventory management capabilities for administrators:
- View inventory across all stores
- View store-specific inventory
- Set/adjust inventory on behalf of stores
- Low stock alerts and reporting
Admin Context: Uses admin JWT authentication.
Store selection is passed as a request parameter.
"""
import logging
from fastapi import APIRouter, Depends, File, Form, Query, UploadFile
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.api.deps import get_current_admin_api, require_module_access
from app.core.database import get_db
from app.modules.enums import FrontendType
from app.modules.inventory.schemas import (
AdminInventoryAdjust,
AdminInventoryCreate,
AdminInventoryListResponse,
AdminInventoryLocationsResponse,
AdminInventoryStats,
AdminInventoryTransactionItem,
AdminInventoryTransactionListResponse,
AdminLowStockItem,
AdminStoresWithInventoryResponse,
AdminTransactionStatsResponse,
InventoryAdjust,
InventoryCreate,
InventoryMessageResponse,
InventoryResponse,
InventoryUpdate,
ProductInventorySummary,
)
from app.modules.inventory.services.inventory_import_service import (
inventory_import_service,
)
from app.modules.inventory.services.inventory_service import inventory_service
from app.modules.inventory.services.inventory_transaction_service import (
inventory_transaction_service,
)
from models.schema.auth import UserContext
admin_router = APIRouter(
prefix="/inventory",
dependencies=[Depends(require_module_access("inventory", FrontendType.ADMIN))],
)
logger = logging.getLogger(__name__)
# ============================================================================
# List & Statistics Endpoints
# ============================================================================
@admin_router.get("", response_model=AdminInventoryListResponse)
def get_all_inventory(
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=500),
store_id: int | None = Query(None, description="Filter by store"),
location: str | None = Query(None, description="Filter by location"),
low_stock: int | None = Query(None, ge=0, description="Filter items below threshold"),
search: str | None = Query(None, description="Search by product title or SKU"),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Get inventory across all stores with filtering.
Allows admins to view and filter inventory across the platform.
"""
return inventory_service.get_all_inventory_admin(
db=db,
skip=skip,
limit=limit,
store_id=store_id,
location=location,
low_stock=low_stock,
search=search,
)
@admin_router.get("/stats", response_model=AdminInventoryStats)
def get_inventory_stats(
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Get platform-wide inventory statistics."""
return inventory_service.get_inventory_stats_admin(db)
@admin_router.get("/low-stock", response_model=list[AdminLowStockItem])
def get_low_stock_items(
threshold: int = Query(10, ge=0, description="Stock threshold"),
store_id: int | None = Query(None, description="Filter by store"),
limit: int = Query(50, ge=1, le=200),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Get items with low stock levels."""
return inventory_service.get_low_stock_items_admin(
db=db,
threshold=threshold,
store_id=store_id,
limit=limit,
)
@admin_router.get("/stores", response_model=AdminStoresWithInventoryResponse)
def get_stores_with_inventory(
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Get list of stores that have inventory entries."""
return inventory_service.get_stores_with_inventory_admin(db)
@admin_router.get("/locations", response_model=AdminInventoryLocationsResponse)
def get_inventory_locations(
store_id: int | None = Query(None, description="Filter by store"),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Get list of unique inventory locations."""
return inventory_service.get_inventory_locations_admin(db, store_id)
# ============================================================================
# Store-Specific Endpoints
# ============================================================================
@admin_router.get("/stores/{store_id}", response_model=AdminInventoryListResponse)
def get_store_inventory(
store_id: int,
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=500),
location: str | None = Query(None, description="Filter by location"),
low_stock: int | None = Query(None, ge=0, description="Filter items below threshold"),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Get inventory for a specific store."""
return inventory_service.get_store_inventory_admin(
db=db,
store_id=store_id,
skip=skip,
limit=limit,
location=location,
low_stock=low_stock,
)
@admin_router.get("/products/{product_id}", response_model=ProductInventorySummary)
def get_product_inventory(
product_id: int,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Get inventory summary for a specific product across all locations."""
return inventory_service.get_product_inventory_admin(db, product_id)
# ============================================================================
# Inventory Modification Endpoints
# ============================================================================
@admin_router.post("/set", response_model=InventoryResponse)
def set_inventory(
inventory_data: AdminInventoryCreate,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Set exact inventory quantity for a product at a location.
Admin version - requires explicit store_id in request body.
"""
# Verify store exists
inventory_service.verify_store_exists(db, inventory_data.store_id)
# Convert to standard schema for service
service_data = InventoryCreate(
product_id=inventory_data.product_id,
location=inventory_data.location,
quantity=inventory_data.quantity,
)
result = inventory_service.set_inventory(
db=db,
store_id=inventory_data.store_id,
inventory_data=service_data,
)
logger.info(
f"Admin {current_admin.email} set inventory for product {inventory_data.product_id} "
f"at {inventory_data.location}: {inventory_data.quantity} units"
)
db.commit()
return result
@admin_router.post("/adjust", response_model=InventoryResponse)
def adjust_inventory(
adjustment: AdminInventoryAdjust,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Adjust inventory by adding or removing quantity.
Positive quantity = add stock, negative = remove stock.
Admin version - requires explicit store_id in request body.
"""
# Verify store exists
inventory_service.verify_store_exists(db, adjustment.store_id)
# Convert to standard schema for service
service_data = InventoryAdjust(
product_id=adjustment.product_id,
location=adjustment.location,
quantity=adjustment.quantity,
)
result = inventory_service.adjust_inventory(
db=db,
store_id=adjustment.store_id,
inventory_data=service_data,
)
sign = "+" if adjustment.quantity >= 0 else ""
logger.info(
f"Admin {current_admin.email} adjusted inventory for product {adjustment.product_id} "
f"at {adjustment.location}: {sign}{adjustment.quantity} units"
f"{f' (reason: {adjustment.reason})' if adjustment.reason else ''}"
)
db.commit()
return result
@admin_router.put("/{inventory_id}", response_model=InventoryResponse)
def update_inventory(
inventory_id: int,
inventory_update: InventoryUpdate,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Update inventory entry fields."""
# Get inventory to find store_id
inventory = inventory_service.get_inventory_by_id_admin(db, inventory_id)
result = inventory_service.update_inventory(
db=db,
store_id=inventory.store_id,
inventory_id=inventory_id,
inventory_update=inventory_update,
)
logger.info(f"Admin {current_admin.email} updated inventory {inventory_id}")
db.commit()
return result
@admin_router.delete("/{inventory_id}", response_model=InventoryMessageResponse)
def delete_inventory(
inventory_id: int,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Delete inventory entry."""
# Get inventory to find store_id and log details
inventory = inventory_service.get_inventory_by_id_admin(db, inventory_id)
store_id = inventory.store_id
product_id = inventory.product_id
location = inventory.bin_location
inventory_service.delete_inventory(
db=db,
store_id=store_id,
inventory_id=inventory_id,
)
logger.info(
f"Admin {current_admin.email} deleted inventory {inventory_id} "
f"(product {product_id} at {location})"
)
db.commit()
return InventoryMessageResponse(message="Inventory deleted successfully")
# ============================================================================
# Import Endpoints
# ============================================================================
class UnmatchedGtin(BaseModel):
"""GTIN that couldn't be matched to a product."""
gtin: str
quantity: int
product_name: str
class InventoryImportResponse(BaseModel):
"""Response from inventory import."""
success: bool
total_rows: int
entries_created: int
entries_updated: int
quantity_imported: int
unmatched_gtins: list[UnmatchedGtin]
errors: list[str]
@admin_router.post("/import", response_model=InventoryImportResponse)
async def import_inventory(
file: UploadFile = File(..., description="TSV/CSV file with BIN, EAN, PRODUCT, QUANTITY columns"),
store_id: int = Form(..., description="Store ID"),
warehouse: str = Form("strassen", description="Warehouse name"),
clear_existing: bool = Form(False, description="Clear existing inventory before import"),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Import inventory from a TSV/CSV file.
File format (TSV recommended):
- Required columns: BIN, EAN
- Optional columns: PRODUCT (for display), QUANTITY (defaults to 1 per row)
If QUANTITY column is present, each row represents the quantity specified.
If QUANTITY is absent, each row counts as 1 unit (rows with same EAN+BIN are summed).
Products are matched by GTIN/EAN. Unmatched GTINs are reported in the response.
"""
# Verify store exists
inventory_service.verify_store_exists(db, store_id)
# Read file content
content = await file.read()
try:
content_str = content.decode("utf-8")
except UnicodeDecodeError:
content_str = content.decode("latin-1")
# Detect delimiter
first_line = content_str.split("\n")[0] if content_str else ""
delimiter = "\t" if "\t" in first_line else ","
# Run import
result = inventory_import_service.import_from_text(
db=db,
content=content_str,
store_id=store_id,
warehouse=warehouse,
delimiter=delimiter,
clear_existing=clear_existing,
)
if result.success:
db.commit()
logger.info(
f"Admin {current_admin.email} imported inventory: "
f"{result.entries_created} created, {result.entries_updated} updated, "
f"{result.quantity_imported} total units"
)
else:
db.rollback()
logger.error(f"Inventory import failed: {result.errors}")
return InventoryImportResponse(
success=result.success,
total_rows=result.total_rows,
entries_created=result.entries_created,
entries_updated=result.entries_updated,
quantity_imported=result.quantity_imported,
unmatched_gtins=[UnmatchedGtin(**g) for g in result.unmatched_gtins],
errors=result.errors,
)
# ============================================================================
# Transaction History Endpoints
# ============================================================================
@admin_router.get("/transactions", response_model=AdminInventoryTransactionListResponse)
def get_all_transactions(
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
store_id: int | None = Query(None, description="Filter by store"),
product_id: int | None = Query(None, description="Filter by product"),
transaction_type: str | None = Query(None, description="Filter by type"),
order_id: int | None = Query(None, description="Filter by order"),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Get inventory transaction history across all stores.
Returns a paginated list of all stock movements with store and product details.
"""
transactions, total = inventory_transaction_service.get_all_transactions_admin(
db=db,
skip=skip,
limit=limit,
store_id=store_id,
product_id=product_id,
transaction_type=transaction_type,
order_id=order_id,
)
return AdminInventoryTransactionListResponse(
transactions=[AdminInventoryTransactionItem(**tx) for tx in transactions],
total=total,
skip=skip,
limit=limit,
)
@admin_router.get("/transactions/stats", response_model=AdminTransactionStatsResponse)
def get_transaction_stats(
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Get transaction statistics for the platform."""
stats = inventory_transaction_service.get_transaction_stats_admin(db)
return AdminTransactionStatsResponse(**stats)