Refactoring code for modular approach
This commit is contained in:
@@ -0,0 +1,235 @@
|
||||
from sqlalchemy.orm import Session
|
||||
from models.database_models import Stock, Product
|
||||
from models.api_models import StockCreate, StockAdd, StockUpdate, StockLocationResponse, StockSummaryResponse
|
||||
from utils.data_processing import GTINProcessor
|
||||
from typing import Optional, List, Tuple
|
||||
from datetime import datetime
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class StockService:
|
||||
def __init__(self):
|
||||
self.gtin_processor = GTINProcessor()
|
||||
|
||||
def normalize_gtin(self, gtin_value) -> Optional[str]:
|
||||
"""Normalize GTIN format using the GTINProcessor"""
|
||||
return self.gtin_processor.normalize(gtin_value)
|
||||
|
||||
def set_stock(self, db: Session, stock_data: StockCreate) -> Stock:
|
||||
"""Set exact stock quantity for a GTIN at a specific location (replaces existing quantity)"""
|
||||
normalized_gtin = self.normalize_gtin(stock_data.gtin)
|
||||
if not normalized_gtin:
|
||||
raise ValueError("Invalid GTIN format")
|
||||
|
||||
location = stock_data.location.strip().upper()
|
||||
|
||||
# Check if stock entry already exists for this GTIN and location
|
||||
existing_stock = db.query(Stock).filter(
|
||||
Stock.gtin == normalized_gtin,
|
||||
Stock.location == location
|
||||
).first()
|
||||
|
||||
if existing_stock:
|
||||
# Update existing stock (SET to exact quantity)
|
||||
old_quantity = existing_stock.quantity
|
||||
existing_stock.quantity = stock_data.quantity
|
||||
existing_stock.updated_at = datetime.utcnow()
|
||||
db.commit()
|
||||
db.refresh(existing_stock)
|
||||
logger.info(
|
||||
f"Updated stock for GTIN {normalized_gtin} at {location}: {old_quantity} → {stock_data.quantity}")
|
||||
return existing_stock
|
||||
else:
|
||||
# Create new stock entry
|
||||
new_stock = Stock(
|
||||
gtin=normalized_gtin,
|
||||
location=location,
|
||||
quantity=stock_data.quantity
|
||||
)
|
||||
db.add(new_stock)
|
||||
db.commit()
|
||||
db.refresh(new_stock)
|
||||
logger.info(f"Created new stock for GTIN {normalized_gtin} at {location}: {stock_data.quantity}")
|
||||
return new_stock
|
||||
|
||||
def add_stock(self, db: Session, stock_data: StockAdd) -> Stock:
|
||||
"""Add quantity to existing stock for a GTIN at a specific location (adds to existing quantity)"""
|
||||
normalized_gtin = self.normalize_gtin(stock_data.gtin)
|
||||
if not normalized_gtin:
|
||||
raise ValueError("Invalid GTIN format")
|
||||
|
||||
location = stock_data.location.strip().upper()
|
||||
|
||||
# Check if stock entry already exists for this GTIN and location
|
||||
existing_stock = db.query(Stock).filter(
|
||||
Stock.gtin == normalized_gtin,
|
||||
Stock.location == location
|
||||
).first()
|
||||
|
||||
if existing_stock:
|
||||
# Add to existing stock
|
||||
old_quantity = existing_stock.quantity
|
||||
existing_stock.quantity += stock_data.quantity
|
||||
existing_stock.updated_at = datetime.utcnow()
|
||||
db.commit()
|
||||
db.refresh(existing_stock)
|
||||
logger.info(
|
||||
f"Added stock for GTIN {normalized_gtin} at {location}: {old_quantity} + {stock_data.quantity} = {existing_stock.quantity}")
|
||||
return existing_stock
|
||||
else:
|
||||
# Create new stock entry with the quantity
|
||||
new_stock = Stock(
|
||||
gtin=normalized_gtin,
|
||||
location=location,
|
||||
quantity=stock_data.quantity
|
||||
)
|
||||
db.add(new_stock)
|
||||
db.commit()
|
||||
db.refresh(new_stock)
|
||||
logger.info(f"Created new stock for GTIN {normalized_gtin} at {location}: {stock_data.quantity}")
|
||||
return new_stock
|
||||
|
||||
def remove_stock(self, db: Session, stock_data: StockAdd) -> Stock:
|
||||
"""Remove quantity from existing stock for a GTIN at a specific location"""
|
||||
normalized_gtin = self.normalize_gtin(stock_data.gtin)
|
||||
if not normalized_gtin:
|
||||
raise ValueError("Invalid GTIN format")
|
||||
|
||||
location = stock_data.location.strip().upper()
|
||||
|
||||
# Find existing stock entry
|
||||
existing_stock = db.query(Stock).filter(
|
||||
Stock.gtin == normalized_gtin,
|
||||
Stock.location == location
|
||||
).first()
|
||||
|
||||
if not existing_stock:
|
||||
raise ValueError(f"No stock found for GTIN {normalized_gtin} at location {location}")
|
||||
|
||||
# Check if we have enough stock to remove
|
||||
if existing_stock.quantity < stock_data.quantity:
|
||||
raise ValueError(
|
||||
f"Insufficient stock. Available: {existing_stock.quantity}, Requested to remove: {stock_data.quantity}")
|
||||
|
||||
# Remove from existing stock
|
||||
old_quantity = existing_stock.quantity
|
||||
existing_stock.quantity -= stock_data.quantity
|
||||
existing_stock.updated_at = datetime.utcnow()
|
||||
db.commit()
|
||||
db.refresh(existing_stock)
|
||||
logger.info(
|
||||
f"Removed stock for GTIN {normalized_gtin} at {location}: {old_quantity} - {stock_data.quantity} = {existing_stock.quantity}")
|
||||
return existing_stock
|
||||
|
||||
def get_stock_by_gtin(self, db: Session, gtin: str) -> StockSummaryResponse:
|
||||
"""Get all stock locations and total quantity for a specific GTIN"""
|
||||
normalized_gtin = self.normalize_gtin(gtin)
|
||||
if not normalized_gtin:
|
||||
raise ValueError("Invalid GTIN format")
|
||||
|
||||
# Get all stock entries for this GTIN
|
||||
stock_entries = db.query(Stock).filter(Stock.gtin == normalized_gtin).all()
|
||||
|
||||
if not stock_entries:
|
||||
raise ValueError(f"No stock found for GTIN: {gtin}")
|
||||
|
||||
# Calculate total quantity and build locations list
|
||||
total_quantity = 0
|
||||
locations = []
|
||||
|
||||
for entry in stock_entries:
|
||||
total_quantity += entry.quantity
|
||||
locations.append(StockLocationResponse(
|
||||
location=entry.location,
|
||||
quantity=entry.quantity
|
||||
))
|
||||
|
||||
# Try to get product title for reference
|
||||
product = db.query(Product).filter(Product.gtin == normalized_gtin).first()
|
||||
product_title = product.title if product else None
|
||||
|
||||
return StockSummaryResponse(
|
||||
gtin=normalized_gtin,
|
||||
total_quantity=total_quantity,
|
||||
locations=locations,
|
||||
product_title=product_title
|
||||
)
|
||||
|
||||
def get_total_stock(self, db: Session, gtin: str) -> dict:
|
||||
"""Get total quantity in stock for a specific GTIN"""
|
||||
normalized_gtin = self.normalize_gtin(gtin)
|
||||
if not normalized_gtin:
|
||||
raise ValueError("Invalid GTIN format")
|
||||
|
||||
# Calculate total stock
|
||||
total_stock = db.query(Stock).filter(Stock.gtin == normalized_gtin).all()
|
||||
total_quantity = sum(entry.quantity for entry in total_stock)
|
||||
|
||||
# Get product info for context
|
||||
product = db.query(Product).filter(Product.gtin == normalized_gtin).first()
|
||||
|
||||
return {
|
||||
"gtin": normalized_gtin,
|
||||
"total_quantity": total_quantity,
|
||||
"product_title": product.title if product else None,
|
||||
"locations_count": len(total_stock)
|
||||
}
|
||||
|
||||
def get_all_stock(
|
||||
self,
|
||||
db: Session,
|
||||
skip: int = 0,
|
||||
limit: int = 100,
|
||||
location: Optional[str] = None,
|
||||
gtin: Optional[str] = None
|
||||
) -> List[Stock]:
|
||||
"""Get all stock entries with optional filtering"""
|
||||
query = db.query(Stock)
|
||||
|
||||
if location:
|
||||
query = query.filter(Stock.location.ilike(f"%{location}%"))
|
||||
|
||||
if gtin:
|
||||
normalized_gtin = self.normalize_gtin(gtin)
|
||||
if normalized_gtin:
|
||||
query = query.filter(Stock.gtin == normalized_gtin)
|
||||
|
||||
return query.offset(skip).limit(limit).all()
|
||||
|
||||
def update_stock(self, db: Session, stock_id: int, stock_update: StockUpdate) -> Stock:
|
||||
"""Update stock quantity for a specific stock entry"""
|
||||
stock_entry = db.query(Stock).filter(Stock.id == stock_id).first()
|
||||
if not stock_entry:
|
||||
raise ValueError("Stock entry not found")
|
||||
|
||||
stock_entry.quantity = stock_update.quantity
|
||||
stock_entry.updated_at = datetime.utcnow()
|
||||
db.commit()
|
||||
db.refresh(stock_entry)
|
||||
|
||||
logger.info(f"Updated stock entry {stock_id} to quantity {stock_update.quantity}")
|
||||
return stock_entry
|
||||
|
||||
def delete_stock(self, db: Session, stock_id: int) -> bool:
|
||||
"""Delete a stock entry"""
|
||||
stock_entry = db.query(Stock).filter(Stock.id == stock_id).first()
|
||||
if not stock_entry:
|
||||
raise ValueError("Stock entry not found")
|
||||
|
||||
gtin = stock_entry.gtin
|
||||
location = stock_entry.location
|
||||
db.delete(stock_entry)
|
||||
db.commit()
|
||||
|
||||
logger.info(f"Deleted stock entry {stock_id} for GTIN {gtin} at {location}")
|
||||
return True
|
||||
|
||||
def get_stock_by_id(self, db: Session, stock_id: int) -> Optional[Stock]:
|
||||
"""Get a stock entry by its ID"""
|
||||
return db.query(Stock).filter(Stock.id == stock_id).first()
|
||||
|
||||
|
||||
# Create service instance
|
||||
stock_service = StockService()
|
||||
|
||||
Reference in New Issue
Block a user