Files
orion/app/services/stock_service.py
2025-09-21 13:00:10 +02:00

266 lines
9.4 KiB
Python

# app/services/stock_service.py
"""Summary description ....
This module provides classes and functions for:
- ....
- ....
- ....
"""
import logging
from datetime import datetime
from typing import List, Optional
from sqlalchemy.orm import Session
from models.api.stock import (StockAdd, StockCreate, StockLocationResponse,
StockSummaryResponse, StockUpdate)
from models.database.product import Product
from models.database.stock import Stock
from utils.data_processing import GTINProcessor
logger = logging.getLogger(__name__)
class StockService:
"""Service class for stock operations following the application's service pattern."""
def __init__(self):
"""Class constructor."""
self.gtin_processor = GTINProcessor()
def normalize_gtin(self, gtin_value) -> Optional[str]:
"""Normalize GTIN format using the GTINProcessor."""
return self.gtin_processor.normalize(gtin_value)
def set_stock(self, db: Session, stock_data: StockCreate) -> Stock:
"""Set exact stock quantity for a GTIN at a specific location (replaces existing quantity)."""
normalized_gtin = self.normalize_gtin(stock_data.gtin)
if not normalized_gtin:
raise ValueError("Invalid GTIN format")
location = stock_data.location.strip().upper()
# Check if stock entry already exists for this GTIN and location
existing_stock = (
db.query(Stock)
.filter(Stock.gtin == normalized_gtin, Stock.location == location)
.first()
)
if existing_stock:
# Update existing stock (SET to exact quantity)
old_quantity = existing_stock.quantity
existing_stock.quantity = stock_data.quantity
existing_stock.updated_at = datetime.utcnow()
db.commit()
db.refresh(existing_stock)
logger.info(
f"Updated stock for GTIN {normalized_gtin} at {location}: {old_quantity}{stock_data.quantity}"
)
return existing_stock
else:
# Create new stock entry
new_stock = Stock(
gtin=normalized_gtin, location=location, quantity=stock_data.quantity
)
db.add(new_stock)
db.commit()
db.refresh(new_stock)
logger.info(
f"Created new stock for GTIN {normalized_gtin} at {location}: {stock_data.quantity}"
)
return new_stock
def add_stock(self, db: Session, stock_data: StockAdd) -> Stock:
"""Add quantity to existing stock for a GTIN at a specific location (adds to existing quantity)."""
normalized_gtin = self.normalize_gtin(stock_data.gtin)
if not normalized_gtin:
raise ValueError("Invalid GTIN format")
location = stock_data.location.strip().upper()
# Check if stock entry already exists for this GTIN and location
existing_stock = (
db.query(Stock)
.filter(Stock.gtin == normalized_gtin, Stock.location == location)
.first()
)
if existing_stock:
# Add to existing stock
old_quantity = existing_stock.quantity
existing_stock.quantity += stock_data.quantity
existing_stock.updated_at = datetime.utcnow()
db.commit()
db.refresh(existing_stock)
logger.info(
f"Added stock for GTIN {normalized_gtin} at {location}: "
f"{old_quantity} + {stock_data.quantity} = {existing_stock.quantity}"
)
return existing_stock
else:
# Create new stock entry with the quantity
new_stock = Stock(
gtin=normalized_gtin, location=location, quantity=stock_data.quantity
)
db.add(new_stock)
db.commit()
db.refresh(new_stock)
logger.info(
f"Created new stock for GTIN {normalized_gtin} at {location}: {stock_data.quantity}"
)
return new_stock
def remove_stock(self, db: Session, stock_data: StockAdd) -> Stock:
"""Remove quantity from existing stock for a GTIN at a specific location."""
normalized_gtin = self.normalize_gtin(stock_data.gtin)
if not normalized_gtin:
raise ValueError("Invalid GTIN format")
location = stock_data.location.strip().upper()
# Find existing stock entry
existing_stock = (
db.query(Stock)
.filter(Stock.gtin == normalized_gtin, Stock.location == location)
.first()
)
if not existing_stock:
raise ValueError(
f"No stock found for GTIN {normalized_gtin} at location {location}"
)
# Check if we have enough stock to remove
if existing_stock.quantity < stock_data.quantity:
raise ValueError(
f"Insufficient stock. Available: {existing_stock.quantity}, Requested to remove: {stock_data.quantity}"
)
# Remove from existing stock
old_quantity = existing_stock.quantity
existing_stock.quantity -= stock_data.quantity
existing_stock.updated_at = datetime.utcnow()
db.commit()
db.refresh(existing_stock)
logger.info(
f"Removed stock for GTIN {normalized_gtin} at {location}: "
f"{old_quantity} - {stock_data.quantity} = {existing_stock.quantity}"
)
return existing_stock
def get_stock_by_gtin(self, db: Session, gtin: str) -> StockSummaryResponse:
"""Get all stock locations and total quantity for a specific GTIN."""
normalized_gtin = self.normalize_gtin(gtin)
if not normalized_gtin:
raise ValueError("Invalid GTIN format")
# Get all stock entries for this GTIN
stock_entries = db.query(Stock).filter(Stock.gtin == normalized_gtin).all()
if not stock_entries:
raise ValueError(f"No stock found for GTIN: {gtin}")
# Calculate total quantity and build locations list
total_quantity = 0
locations = []
for entry in stock_entries:
total_quantity += entry.quantity
locations.append(
StockLocationResponse(location=entry.location, quantity=entry.quantity)
)
# Try to get product title for reference
product = db.query(Product).filter(Product.gtin == normalized_gtin).first()
product_title = product.title if product else None
return StockSummaryResponse(
gtin=normalized_gtin,
total_quantity=total_quantity,
locations=locations,
product_title=product_title,
)
def get_total_stock(self, db: Session, gtin: str) -> dict:
"""Get total quantity in stock for a specific GTIN."""
normalized_gtin = self.normalize_gtin(gtin)
if not normalized_gtin:
raise ValueError("Invalid GTIN format")
# Calculate total stock
total_stock = db.query(Stock).filter(Stock.gtin == normalized_gtin).all()
total_quantity = sum(entry.quantity for entry in total_stock)
# Get product info for context
product = db.query(Product).filter(Product.gtin == normalized_gtin).first()
return {
"gtin": normalized_gtin,
"total_quantity": total_quantity,
"product_title": product.title if product else None,
"locations_count": len(total_stock),
}
def get_all_stock(
self,
db: Session,
skip: int = 0,
limit: int = 100,
location: Optional[str] = None,
gtin: Optional[str] = None,
) -> List[Stock]:
"""Get all stock entries with optional filtering."""
query = db.query(Stock)
if location:
query = query.filter(Stock.location.ilike(f"%{location}%"))
if gtin:
normalized_gtin = self.normalize_gtin(gtin)
if normalized_gtin:
query = query.filter(Stock.gtin == normalized_gtin)
return query.offset(skip).limit(limit).all()
def update_stock(
self, db: Session, stock_id: int, stock_update: StockUpdate
) -> Stock:
"""Update stock quantity for a specific stock entry."""
stock_entry = db.query(Stock).filter(Stock.id == stock_id).first()
if not stock_entry:
raise ValueError("Stock entry not found")
stock_entry.quantity = stock_update.quantity
stock_entry.updated_at = datetime.utcnow()
db.commit()
db.refresh(stock_entry)
logger.info(
f"Updated stock entry {stock_id} to quantity {stock_update.quantity}"
)
return stock_entry
def delete_stock(self, db: Session, stock_id: int) -> bool:
"""Delete a stock entry."""
stock_entry = db.query(Stock).filter(Stock.id == stock_id).first()
if not stock_entry:
raise ValueError("Stock entry not found")
gtin = stock_entry.gtin
location = stock_entry.location
db.delete(stock_entry)
db.commit()
logger.info(f"Deleted stock entry {stock_id} for GTIN {gtin} at {location}")
return True
def get_stock_by_id(self, db: Session, stock_id: int) -> Optional[Stock]:
"""Get a stock entry by its ID."""
return db.query(Stock).filter(Stock.id == stock_id).first()
# Create service instance
stock_service = StockService()