from typing import Optional from fastapi import APIRouter, Depends, HTTPException, Query from fastapi.responses import StreamingResponse from sqlalchemy.orm import Session from app.core.database import get_db from app.api.deps import get_current_user from models.api_models import (ProductListResponse, ProductResponse, ProductCreate, ProductDetailResponse, StockLocationResponse, StockSummaryResponse, ProductUpdate) from models.database_models import User, Product, Stock from datetime import datetime import logging from utils.data_processing import GTINProcessor, PriceProcessor router = APIRouter() logger = logging.getLogger(__name__) # Initialize processors gtin_processor = GTINProcessor() price_processor = PriceProcessor() # Enhanced Product Routes with Marketplace Support @router.get("/products", response_model=ProductListResponse) def get_products( skip: int = Query(0, ge=0), limit: int = Query(100, ge=1, le=1000), brand: Optional[str] = Query(None), category: Optional[str] = Query(None), availability: Optional[str] = Query(None), marketplace: Optional[str] = Query(None, description="Filter by marketplace"), shop_name: Optional[str] = Query(None, description="Filter by shop name"), search: Optional[str] = Query(None), db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Get products with advanced filtering including marketplace and shop (Protected)""" query = db.query(Product) # Apply filters if brand: query = query.filter(Product.brand.ilike(f"%{brand}%")) if category: query = query.filter(Product.google_product_category.ilike(f"%{category}%")) if availability: query = query.filter(Product.availability == availability) if marketplace: query = query.filter(Product.marketplace.ilike(f"%{marketplace}%")) if shop_name: query = query.filter(Product.shop_name.ilike(f"%{shop_name}%")) if search: # Search in title, description, and marketplace search_term = f"%{search}%" query = query.filter( (Product.title.ilike(search_term)) | (Product.description.ilike(search_term)) | (Product.marketplace.ilike(search_term)) | (Product.shop_name.ilike(search_term)) ) # Get total count for pagination total = query.count() # Apply pagination products = query.offset(skip).limit(limit).all() return ProductListResponse( products=products, total=total, skip=skip, limit=limit ) @router.post("/products", response_model=ProductResponse) def create_product( product: ProductCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Create a new product with validation and marketplace support (Protected)""" # Check if product_id already exists existing = db.query(Product).filter(Product.product_id == product.product_id).first() if existing: raise HTTPException(status_code=400, detail="Product with this ID already exists") # Process and validate GTIN if provided if product.gtin: normalized_gtin = gtin_processor.normalize(product.gtin) if not normalized_gtin: raise HTTPException(status_code=400, detail="Invalid GTIN format") product.gtin = normalized_gtin # Process price if provided if product.price: parsed_price, currency = price_processor.parse_price_currency(product.price) if parsed_price: product.price = parsed_price product.currency = currency # Set default marketplace if not provided if not product.marketplace: product.marketplace = "Letzshop" db_product = Product(**product.dict()) db.add(db_product) db.commit() db.refresh(db_product) logger.info( f"Created product {db_product.product_id} for marketplace {db_product.marketplace}, " f"shop {db_product.shop_name}") return db_product @router.get("/products/{product_id}", response_model=ProductDetailResponse) def get_product(product_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)): """Get product with stock information (Protected)""" product = db.query(Product).filter(Product.product_id == product_id).first() if not product: raise HTTPException(status_code=404, detail="Product not found") # Get stock information if GTIN exists stock_info = None if product.gtin: stock_entries = db.query(Stock).filter(Stock.gtin == product.gtin).all() if stock_entries: total_quantity = sum(entry.quantity for entry in stock_entries) locations = [ StockLocationResponse(location=entry.location, quantity=entry.quantity) for entry in stock_entries ] stock_info = StockSummaryResponse( gtin=product.gtin, total_quantity=total_quantity, locations=locations ) return ProductDetailResponse( product=product, stock_info=stock_info ) @router.put("/products/{product_id}", response_model=ProductResponse) def update_product( product_id: str, product_update: ProductUpdate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Update product with validation and marketplace support (Protected)""" product = db.query(Product).filter(Product.product_id == product_id).first() if not product: raise HTTPException(status_code=404, detail="Product not found") # Update fields update_data = product_update.dict(exclude_unset=True) # Validate GTIN if being updated if "gtin" in update_data and update_data["gtin"]: normalized_gtin = gtin_processor.normalize(update_data["gtin"]) if not normalized_gtin: raise HTTPException(status_code=400, detail="Invalid GTIN format") update_data["gtin"] = normalized_gtin # Process price if being updated if "price" in update_data and update_data["price"]: parsed_price, currency = price_processor.parse_price_currency(update_data["price"]) if parsed_price: update_data["price"] = parsed_price update_data["currency"] = currency for key, value in update_data.items(): setattr(product, key, value) product.updated_at = datetime.utcnow() db.commit() db.refresh(product) return product @router.delete("/products/{product_id}") def delete_product( product_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Delete product and associated stock (Protected)""" product = db.query(Product).filter(Product.product_id == product_id).first() if not product: raise HTTPException(status_code=404, detail="Product not found") # Delete associated stock entries if GTIN exists if product.gtin: db.query(Stock).filter(Stock.gtin == product.gtin).delete() db.delete(product) db.commit() return {"message": "Product and associated stock deleted successfully"} # Export with streaming for large datasets (Protected) @router.get("/export-csv") async def export_csv( marketplace: Optional[str] = Query(None, description="Filter by marketplace"), shop_name: Optional[str] = Query(None, description="Filter by shop name"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Export products as CSV with streaming and marketplace filtering (Protected)""" def generate_csv(): # Stream CSV generation for memory efficiency yield "product_id,title,description,link,image_link,availability,price,currency,brand,gtin,marketplace,shop_name\n" batch_size = 1000 offset = 0 while True: query = db.query(Product) # Apply marketplace filters if marketplace: query = query.filter(Product.marketplace.ilike(f"%{marketplace}%")) if shop_name: query = query.filter(Product.shop_name.ilike(f"%{shop_name}%")) products = query.offset(offset).limit(batch_size).all() if not products: break for product in products: # Create CSV row with marketplace fields row = (f'"{product.product_id}","{product.title or ""}","{product.description or ""}",' f'"{product.link or ""}","{product.image_link or ""}","{product.availability or ""}",' f'"{product.price or ""}","{product.currency or ""}","{product.brand or ""}",' f'"{product.gtin or ""}","{product.marketplace or ""}","{product.shop_name or ""}"\n') yield row offset += batch_size filename = "products_export" if marketplace: filename += f"_{marketplace}" if shop_name: filename += f"_{shop_name}" filename += ".csv" return StreamingResponse( generate_csv(), media_type="text/csv", headers={"Content-Disposition": f"attachment; filename={filename}"} )