Exception handling enhancement

This commit is contained in:
2025-09-23 22:42:26 +02:00
parent b1a76cdb57
commit 98285aa8aa
35 changed files with 3283 additions and 1743 deletions

View File

@@ -1,19 +1,28 @@
# app/services/product_service.py
"""Summary description ....
"""
Product service for managing product operations and data processing.
This module provides classes and functions for:
- ....
- ....
- ....
- Product CRUD operations with validation
- Advanced product filtering and search
- Stock information integration
- CSV export functionality
"""
import logging
from datetime import datetime
from typing import Generator, List, Optional
from typing import Generator, List, Optional, Tuple
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from app.exceptions import (
ProductNotFoundException,
ProductAlreadyExistsException,
InvalidProductDataException,
ProductValidationException,
ValidationException,
)
from models.schemas.product import ProductCreate, ProductUpdate
from models.schemas.stock import StockLocationResponse, StockSummaryResponse
from models.database.product import Product
@@ -32,28 +41,52 @@ class ProductService:
self.price_processor = PriceProcessor()
def create_product(self, db: Session, product_data: ProductCreate) -> Product:
"""Create a new product with validation."""
"""
Create a new product with validation.
Args:
db: Database session
product_data: Product creation data
Returns:
Created Product object
Raises:
ProductAlreadyExistsException: If product with ID already exists
InvalidProductDataException: If product data is invalid
ProductValidationException: If validation fails
"""
try:
# Process and validate GTIN if provided
if product_data.gtin:
normalized_gtin = self.gtin_processor.normalize(product_data.gtin)
if not normalized_gtin:
raise ValueError("Invalid GTIN format")
raise InvalidProductDataException("Invalid GTIN format", field="gtin")
product_data.gtin = normalized_gtin
# Process price if provided
if product_data.price:
parsed_price, currency = self.price_processor.parse_price_currency(
product_data.price
)
if parsed_price:
product_data.price = parsed_price
product_data.currency = currency
try:
parsed_price, currency = self.price_processor.parse_price_currency(
product_data.price
)
if parsed_price:
product_data.price = parsed_price
product_data.currency = currency
except Exception as e:
raise InvalidProductDataException(f"Invalid price format: {str(e)}", field="price")
# Set default marketplace if not provided
if not product_data.marketplace:
product_data.marketplace = "Letzshop"
# Validate required fields
if not product_data.product_id or not product_data.product_id.strip():
raise ProductValidationException("Product ID is required", field="product_id")
if not product_data.title or not product_data.title.strip():
raise ProductValidationException("Product title is required", field="title")
db_product = Product(**product_data.model_dump())
db.add(db_product)
db.commit()
@@ -62,176 +95,327 @@ class ProductService:
logger.info(f"Created product {db_product.product_id}")
return db_product
except (InvalidProductDataException, ProductValidationException):
db.rollback()
raise # Re-raise custom exceptions
except IntegrityError as e:
db.rollback()
logger.error(f"Database integrity error: {str(e)}")
raise ValueError("Product with this ID already exists")
if "product_id" in str(e).lower() or "unique" in str(e).lower():
raise ProductAlreadyExistsException(product_data.product_id)
else:
raise ProductValidationException("Data integrity constraint violation")
except Exception as e:
db.rollback()
logger.error(f"Error creating product: {str(e)}")
raise
raise ValidationException("Failed to create product")
def get_product_by_id(self, db: Session, product_id: str) -> Optional[Product]:
"""Get a product by its ID."""
return db.query(Product).filter(Product.product_id == product_id).first()
def get_products_with_filters(
self,
db: Session,
skip: int = 0,
limit: int = 100,
brand: Optional[str] = None,
category: Optional[str] = None,
availability: Optional[str] = None,
marketplace: Optional[str] = None,
shop_name: Optional[str] = None,
search: Optional[str] = None,
) -> tuple[List[Product], int]:
"""Get products with filtering and pagination."""
query = db.query(Product)
# Apply filters
if brand:
query = query.filter(Product.brand.ilike(f"%{brand}%"))
if category:
query = query.filter(Product.google_product_category.ilike(f"%{category}%"))
if availability:
query = query.filter(Product.availability == availability)
if marketplace:
query = query.filter(Product.marketplace.ilike(f"%{marketplace}%"))
if shop_name:
query = query.filter(Product.shop_name.ilike(f"%{shop_name}%"))
if search:
# Search in title, description, marketplace, and shop_name
search_term = f"%{search}%"
query = query.filter(
(Product.title.ilike(search_term))
| (Product.description.ilike(search_term))
| (Product.marketplace.ilike(search_term))
| (Product.shop_name.ilike(search_term))
)
total = query.count()
products = query.offset(skip).limit(limit).all()
return products, total
def update_product(
self, db: Session, product_id: str, product_update: ProductUpdate
) -> Product:
"""Update product with validation."""
product = db.query(Product).filter(Product.product_id == product_id).first()
if not product:
raise ValueError("Product not found")
# Update fields
update_data = product_update.model_dump(exclude_unset=True)
# Validate GTIN if being updated
if "gtin" in update_data and update_data["gtin"]:
normalized_gtin = self.gtin_processor.normalize(update_data["gtin"])
if not normalized_gtin:
raise ValueError("Invalid GTIN format")
update_data["gtin"] = normalized_gtin
# Process price if being updated
if "price" in update_data and update_data["price"]:
parsed_price, currency = self.price_processor.parse_price_currency(
update_data["price"]
)
if parsed_price:
update_data["price"] = parsed_price
update_data["currency"] = currency
for key, value in update_data.items():
setattr(product, key, value)
product.updated_at = datetime.utcnow()
db.commit()
db.refresh(product)
logger.info(f"Updated product {product_id}")
return product
def delete_product(self, db: Session, product_id: str) -> bool:
"""Delete product and associated stock."""
product = db.query(Product).filter(Product.product_id == product_id).first()
if not product:
raise ValueError("Product not found")
# Delete associated stock entries if GTIN exists
if product.gtin:
db.query(Stock).filter(Stock.gtin == product.gtin).delete()
db.delete(product)
db.commit()
logger.info(f"Deleted product {product_id}")
return True
def get_stock_info(self, db: Session, gtin: str) -> Optional[StockSummaryResponse]:
"""Get stock information for a product by GTIN."""
stock_entries = db.query(Stock).filter(Stock.gtin == gtin).all()
if not stock_entries:
try:
return db.query(Product).filter(Product.product_id == product_id).first()
except Exception as e:
logger.error(f"Error getting product {product_id}: {str(e)}")
return None
total_quantity = sum(entry.quantity for entry in stock_entries)
locations = [
StockLocationResponse(location=entry.location, quantity=entry.quantity)
for entry in stock_entries
]
def get_product_by_id_or_raise(self, db: Session, product_id: str) -> Product:
"""
Get a product by its ID or raise exception.
return StockSummaryResponse(
gtin=gtin, total_quantity=total_quantity, locations=locations
)
Args:
db: Database session
product_id: Product ID to find
def generate_csv_export(
self,
db: Session,
marketplace: Optional[str] = None,
shop_name: Optional[str] = None,
) -> Generator[str, None, None]:
"""Generate CSV export with streaming for memory efficiency."""
# CSV header
yield (
"product_id,title,description,link,image_link,availability,price,currency,brand,"
"gtin,marketplace,shop_name\n"
)
Returns:
Product object
batch_size = 1000
offset = 0
Raises:
ProductNotFoundException: If product doesn't exist
"""
product = self.get_product_by_id(db, product_id)
if not product:
raise ProductNotFoundException(product_id)
return product
while True:
def get_products_with_filters(
self,
db: Session,
skip: int = 0,
limit: int = 100,
brand: Optional[str] = None,
category: Optional[str] = None,
availability: Optional[str] = None,
marketplace: Optional[str] = None,
shop_name: Optional[str] = None,
search: Optional[str] = None,
) -> Tuple[List[Product], int]:
"""
Get products with filtering and pagination.
Args:
db: Database session
skip: Number of records to skip
limit: Maximum records to return
brand: Brand filter
category: Category filter
availability: Availability filter
marketplace: Marketplace filter
shop_name: Shop name filter
search: Search term
Returns:
Tuple of (products_list, total_count)
"""
try:
query = db.query(Product)
# Apply marketplace filters
# Apply filters
if brand:
query = query.filter(Product.brand.ilike(f"%{brand}%"))
if category:
query = query.filter(Product.google_product_category.ilike(f"%{category}%"))
if availability:
query = query.filter(Product.availability == availability)
if marketplace:
query = query.filter(Product.marketplace.ilike(f"%{marketplace}%"))
if shop_name:
query = query.filter(Product.shop_name.ilike(f"%{shop_name}%"))
products = query.offset(offset).limit(batch_size).all()
if not products:
break
for product in products:
# Create CSV row with marketplace fields
row = (
f'"{product.product_id}","{product.title or ""}","{product.description or ""}",'
f'"{product.link or ""}","{product.image_link or ""}","{product.availability or ""}",'
f'"{product.price or ""}","{product.currency or ""}","{product.brand or ""}",'
f'"{product.gtin or ""}","{product.marketplace or ""}","{product.shop_name or ""}"\n'
if search:
# Search in title, description, marketplace, and shop_name
search_term = f"%{search}%"
query = query.filter(
(Product.title.ilike(search_term))
| (Product.description.ilike(search_term))
| (Product.marketplace.ilike(search_term))
| (Product.shop_name.ilike(search_term))
)
yield row
offset += batch_size
total = query.count()
products = query.offset(skip).limit(limit).all()
return products, total
except Exception as e:
logger.error(f"Error getting products with filters: {str(e)}")
raise ValidationException("Failed to retrieve products")
def update_product(
self, db: Session, product_id: str, product_update: ProductUpdate
) -> Product:
"""
Update product with validation.
Args:
db: Database session
product_id: Product ID to update
product_update: Update data
Returns:
Updated Product object
Raises:
ProductNotFoundException: If product doesn't exist
InvalidProductDataException: If update data is invalid
ProductValidationException: If validation fails
"""
try:
product = self.get_product_by_id_or_raise(db, product_id)
# Update fields
update_data = product_update.model_dump(exclude_unset=True)
# Validate GTIN if being updated
if "gtin" in update_data and update_data["gtin"]:
normalized_gtin = self.gtin_processor.normalize(update_data["gtin"])
if not normalized_gtin:
raise InvalidProductDataException("Invalid GTIN format", field="gtin")
update_data["gtin"] = normalized_gtin
# Process price if being updated
if "price" in update_data and update_data["price"]:
try:
parsed_price, currency = self.price_processor.parse_price_currency(
update_data["price"]
)
if parsed_price:
update_data["price"] = parsed_price
update_data["currency"] = currency
except Exception as e:
raise InvalidProductDataException(f"Invalid price format: {str(e)}", field="price")
# Validate required fields if being updated
if "title" in update_data and (not update_data["title"] or not update_data["title"].strip()):
raise ProductValidationException("Product title cannot be empty", field="title")
for key, value in update_data.items():
setattr(product, key, value)
product.updated_at = datetime.utcnow()
db.commit()
db.refresh(product)
logger.info(f"Updated product {product_id}")
return product
except (ProductNotFoundException, InvalidProductDataException, ProductValidationException):
db.rollback()
raise # Re-raise custom exceptions
except Exception as e:
db.rollback()
logger.error(f"Error updating product {product_id}: {str(e)}")
raise ValidationException("Failed to update product")
def delete_product(self, db: Session, product_id: str) -> bool:
"""
Delete product and associated stock.
Args:
db: Database session
product_id: Product ID to delete
Returns:
True if deletion successful
Raises:
ProductNotFoundException: If product doesn't exist
"""
try:
product = self.get_product_by_id_or_raise(db, product_id)
# Delete associated stock entries if GTIN exists
if product.gtin:
db.query(Stock).filter(Stock.gtin == product.gtin).delete()
db.delete(product)
db.commit()
logger.info(f"Deleted product {product_id}")
return True
except ProductNotFoundException:
raise # Re-raise custom exceptions
except Exception as e:
db.rollback()
logger.error(f"Error deleting product {product_id}: {str(e)}")
raise ValidationException("Failed to delete product")
def get_stock_info(self, db: Session, gtin: str) -> Optional[StockSummaryResponse]:
"""
Get stock information for a product by GTIN.
Args:
db: Database session
gtin: GTIN to look up stock for
Returns:
StockSummaryResponse if stock found, None otherwise
"""
try:
stock_entries = db.query(Stock).filter(Stock.gtin == gtin).all()
if not stock_entries:
return None
total_quantity = sum(entry.quantity for entry in stock_entries)
locations = [
StockLocationResponse(location=entry.location, quantity=entry.quantity)
for entry in stock_entries
]
return StockSummaryResponse(
gtin=gtin, total_quantity=total_quantity, locations=locations
)
except Exception as e:
logger.error(f"Error getting stock info for GTIN {gtin}: {str(e)}")
return None
def generate_csv_export(
self,
db: Session,
marketplace: Optional[str] = None,
shop_name: Optional[str] = None,
) -> Generator[str, None, None]:
"""
Generate CSV export with streaming for memory efficiency.
Args:
db: Database session
marketplace: Optional marketplace filter
shop_name: Optional shop name filter
Yields:
CSV content as strings
"""
try:
# CSV header
yield (
"product_id,title,description,link,image_link,availability,price,currency,brand,"
"gtin,marketplace,shop_name\n"
)
batch_size = 1000
offset = 0
while True:
query = db.query(Product)
# Apply marketplace filters
if marketplace:
query = query.filter(Product.marketplace.ilike(f"%{marketplace}%"))
if shop_name:
query = query.filter(Product.shop_name.ilike(f"%{shop_name}%"))
products = query.offset(offset).limit(batch_size).all()
if not products:
break
for product in products:
# Create CSV row with marketplace fields
row = (
f'"{product.product_id}","{product.title or ""}","{product.description or ""}",'
f'"{product.link or ""}","{product.image_link or ""}","{product.availability or ""}",'
f'"{product.price or ""}","{product.currency or ""}","{product.brand or ""}",'
f'"{product.gtin or ""}","{product.marketplace or ""}","{product.shop_name or ""}"\n'
)
yield row
offset += batch_size
except Exception as e:
logger.error(f"Error generating CSV export: {str(e)}")
raise ValidationException("Failed to generate CSV export")
def product_exists(self, db: Session, product_id: str) -> bool:
"""Check if product exists by ID."""
return (
db.query(Product).filter(Product.product_id == product_id).first()
is not None
)
try:
return (
db.query(Product).filter(Product.product_id == product_id).first()
is not None
)
except Exception as e:
logger.error(f"Error checking if product exists: {str(e)}")
return False
# Private helper methods
def _validate_product_data(self, product_data: dict) -> None:
"""Validate product data structure."""
required_fields = ['product_id', 'title']
for field in required_fields:
if field not in product_data or not product_data[field]:
raise ProductValidationException(f"{field} is required", field=field)
def _normalize_product_data(self, product_data: dict) -> dict:
"""Normalize and clean product data."""
normalized = product_data.copy()
# Trim whitespace from string fields
string_fields = ['product_id', 'title', 'description', 'brand', 'marketplace', 'shop_name']
for field in string_fields:
if field in normalized and normalized[field]:
normalized[field] = normalized[field].strip()
return normalized
# Create service instance