diff --git a/app/api/v1/marketplace.py b/app/api/v1/marketplace.py index f9849ba9..60445bce 100644 --- a/app/api/v1/marketplace.py +++ b/app/api/v1/marketplace.py @@ -7,8 +7,8 @@ from app.api.deps import get_current_user from app.tasks.background_tasks import process_marketplace_import from middleware.decorators import rate_limit from models.api_models import MarketplaceImportJobResponse, MarketplaceImportRequest -from models.database_models import User, MarketplaceImportJob, Shop -from datetime import datetime +from models.database_models import User +from marketplace_service import marketplace_service import logging router = APIRouter() @@ -25,50 +25,39 @@ async def import_products_from_marketplace( current_user: User = Depends(get_current_user) ): """Import products from marketplace CSV with background processing (Protected)""" + try: + logger.info( + f"Starting marketplace import: {request.marketplace} -> {request.shop_code} by user {current_user.username}") - logger.info( - f"Starting marketplace import: {request.marketplace} -> {request.shop_code} by user {current_user.username}") + # Create import job through service + import_job = marketplace_service.create_import_job(db, request, current_user) - # Verify shop exists and user has access - shop = db.query(Shop).filter(Shop.shop_code == request.shop_code).first() - if not shop: - raise HTTPException(status_code=404, detail="Shop not found") + # Process in background + background_tasks.add_task( + process_marketplace_import, + import_job.id, + request.url, + request.marketplace, + request.shop_code, + request.batch_size or 1000 + ) - # Check permissions: admin can import for any shop, others only for their own - if current_user.role != "admin" and shop.owner_id != current_user.id: - raise HTTPException(status_code=403, detail="Access denied to this shop") + return MarketplaceImportJobResponse( + job_id=import_job.id, + status="pending", + marketplace=request.marketplace, + shop_code=request.shop_code, + message=f"Marketplace import started from {request.marketplace}. Check status with " + f"/marketplace-import-status/{import_job.id}" + ) - # Create marketplace import job record - import_job = MarketplaceImportJob( - status="pending", - source_url=request.url, - marketplace=request.marketplace, - shop_code=request.shop_code, - user_id=current_user.id, - created_at=datetime.utcnow() - ) - db.add(import_job) - db.commit() - db.refresh(import_job) - - # Process in background - background_tasks.add_task( - process_marketplace_import, - import_job.id, - request.url, - request.marketplace, - request.shop_code, - request.batch_size or 1000 - ) - - return MarketplaceImportJobResponse( - job_id=import_job.id, - status="pending", - marketplace=request.marketplace, - shop_code=request.shop_code, - message=f"Marketplace import started from {request.marketplace}. Check status with " - f"/marketplace-import-status/{import_job.id}" - ) + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + except PermissionError as e: + raise HTTPException(status_code=403, detail=str(e)) + except Exception as e: + logger.error(f"Error starting marketplace import: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") @router.get("/marketplace-import-status/{job_id}", response_model=MarketplaceImportJobResponse) @@ -78,28 +67,17 @@ def get_marketplace_import_status( current_user: User = Depends(get_current_user) ): """Get status of marketplace import job (Protected)""" - job = db.query(MarketplaceImportJob).filter(MarketplaceImportJob.id == job_id).first() - if not job: - raise HTTPException(status_code=404, detail="Marketplace import job not found") + try: + job = marketplace_service.get_import_job_by_id(db, job_id, current_user) + return marketplace_service.convert_to_response_model(job) - # Users can only see their own jobs, admins can see all - if current_user.role != "admin" and job.user_id != current_user.id: - raise HTTPException(status_code=403, detail="Access denied to this import job") - - return MarketplaceImportJobResponse( - job_id=job.id, - status=job.status, - marketplace=job.marketplace, - shop_name=job.shop_name, - imported=job.imported_count or 0, - updated=job.updated_count or 0, - total_processed=job.total_processed or 0, - error_count=job.error_count or 0, - error_message=job.error_message, - created_at=job.created_at, - started_at=job.started_at, - completed_at=job.completed_at - ) + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + except PermissionError as e: + raise HTTPException(status_code=403, detail=str(e)) + except Exception as e: + logger.error(f"Error getting import job status {job_id}: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") @router.get("/marketplace-import-jobs", response_model=List[MarketplaceImportJobResponse]) @@ -112,35 +90,73 @@ def get_marketplace_import_jobs( current_user: User = Depends(get_current_user) ): """Get marketplace import jobs with filtering (Protected)""" + try: + jobs = marketplace_service.get_import_jobs( + db=db, + user=current_user, + marketplace=marketplace, + shop_name=shop_name, + skip=skip, + limit=limit + ) - query = db.query(MarketplaceImportJob) + return [marketplace_service.convert_to_response_model(job) for job in jobs] - # Users can only see their own jobs, admins can see all - if current_user.role != "admin": - query = query.filter(MarketplaceImportJob.user_id == current_user.id) + except Exception as e: + logger.error(f"Error getting import jobs: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") - # Apply filters - if marketplace: - query = query.filter(MarketplaceImportJob.marketplace.ilike(f"%{marketplace}%")) - if shop_name: - query = query.filter(MarketplaceImportJob.shop_name.ilike(f"%{shop_name}%")) - # Order by creation date (newest first) and apply pagination - jobs = query.order_by(MarketplaceImportJob.created_at.desc()).offset(skip).limit(limit).all() +@router.get("/marketplace-import-stats") +def get_marketplace_import_stats( + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """Get statistics about marketplace import jobs (Protected)""" + try: + stats = marketplace_service.get_job_stats(db, current_user) + return stats - return [ - MarketplaceImportJobResponse( - job_id=job.id, - status=job.status, - marketplace=job.marketplace, - shop_name=job.shop_name, - imported=job.imported_count or 0, - updated=job.updated_count or 0, - total_processed=job.total_processed or 0, - error_count=job.error_count or 0, - error_message=job.error_message, - created_at=job.created_at, - started_at=job.started_at, - completed_at=job.completed_at - ) for job in jobs - ] + except Exception as e: + logger.error(f"Error getting import stats: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.put("/marketplace-import-jobs/{job_id}/cancel", response_model=MarketplaceImportJobResponse) +def cancel_marketplace_import_job( + job_id: int, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """Cancel a pending or running marketplace import job (Protected)""" + try: + job = marketplace_service.cancel_import_job(db, job_id, current_user) + return marketplace_service.convert_to_response_model(job) + + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except PermissionError as e: + raise HTTPException(status_code=403, detail=str(e)) + except Exception as e: + logger.error(f"Error cancelling import job {job_id}: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.delete("/marketplace-import-jobs/{job_id}") +def delete_marketplace_import_job( + job_id: int, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """Delete a completed marketplace import job (Protected)""" + try: + marketplace_service.delete_import_job(db, job_id, current_user) + return {"message": "Marketplace import job deleted successfully"} + + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except PermissionError as e: + raise HTTPException(status_code=403, detail=str(e)) + except Exception as e: + logger.error(f"Error deleting import job {job_id}: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") diff --git a/app/api/v1/products.py b/app/api/v1/products.py index d2147d4f..43d8c1f9 100644 --- a/app/api/v1/products.py +++ b/app/api/v1/products.py @@ -11,15 +11,12 @@ from models.database_models import User, Product, Stock from datetime import datetime import logging -from utils.data_processing import GTINProcessor, PriceProcessor +from app.services.product_service import product_service + router = APIRouter() logger = logging.getLogger(__name__) -# Initialize processors -gtin_processor = GTINProcessor() -price_processor = PriceProcessor() - # Enhanced Product Routes with Marketplace Support @router.get("/products", response_model=ProductListResponse) @@ -37,41 +34,28 @@ def get_products( ): """Get products with advanced filtering including marketplace and shop (Protected)""" - query = db.query(Product) - - # Apply filters - if brand: - query = query.filter(Product.brand.ilike(f"%{brand}%")) - if category: - query = query.filter(Product.google_product_category.ilike(f"%{category}%")) - if availability: - query = query.filter(Product.availability == availability) - if marketplace: - query = query.filter(Product.marketplace.ilike(f"%{marketplace}%")) - if shop_name: - query = query.filter(Product.shop_name.ilike(f"%{shop_name}%")) - if search: - # Search in title, description, and marketplace - search_term = f"%{search}%" - query = query.filter( - (Product.title.ilike(search_term)) | - (Product.description.ilike(search_term)) | - (Product.marketplace.ilike(search_term)) | - (Product.shop_name.ilike(search_term)) + try: + products, total = product_service.get_products_with_filters( + db=db, + skip=skip, + limit=limit, + brand=brand, + category=category, + availability=availability, + marketplace=marketplace, + shop_name=shop_name, + search=search ) - # Get total count for pagination - total = query.count() - - # Apply pagination - products = query.offset(skip).limit(limit).all() - - return ProductListResponse( - products=products, - total=total, - skip=skip, - limit=limit - ) + return ProductListResponse( + products=products, + total=total, + skip=skip, + limit=limit + ) + except Exception as e: + logger.error(f"Error getting products: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") @router.post("/products", response_model=ProductResponse) @@ -82,68 +66,54 @@ def create_product( ): """Create a new product with validation and marketplace support (Protected)""" - # Check if product_id already exists - existing = db.query(Product).filter(Product.product_id == product.product_id).first() - if existing: - raise HTTPException(status_code=400, detail="Product with this ID already exists") + try: + # Check if product_id already exists + existing = product_service.get_product_by_id(db, product.product_id) + if existing: + raise HTTPException(status_code=400, detail="Product with this ID already exists") - # Process and validate GTIN if provided - if product.gtin: - normalized_gtin = gtin_processor.normalize(product.gtin) - if not normalized_gtin: - raise HTTPException(status_code=400, detail="Invalid GTIN format") - product.gtin = normalized_gtin + db_product = product_service.create_product(db, product) - # Process price if provided - if product.price: - parsed_price, currency = price_processor.parse_price_currency(product.price) - if parsed_price: - product.price = parsed_price - product.currency = currency + logger.info( + f"Created product {db_product.product_id} for marketplace {db_product.marketplace}, " + f"shop {db_product.shop_name}") + return db_product - # Set default marketplace if not provided - if not product.marketplace: - product.marketplace = "Letzshop" - - db_product = Product(**product.dict()) - db.add(db_product) - db.commit() - db.refresh(db_product) - - logger.info( - f"Created product {db_product.product_id} for marketplace {db_product.marketplace}, " - f"shop {db_product.shop_name}") - return db_product + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error creating product: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") @router.get("/products/{product_id}", response_model=ProductDetailResponse) -def get_product(product_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)): +def get_product( + product_id: str, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +): """Get product with stock information (Protected)""" - product = db.query(Product).filter(Product.product_id == product_id).first() - if not product: - raise HTTPException(status_code=404, detail="Product not found") + try: + product = product_service.get_product_by_id(db, product_id) + if not product: + raise HTTPException(status_code=404, detail="Product not found") - # Get stock information if GTIN exists - stock_info = None - if product.gtin: - stock_entries = db.query(Stock).filter(Stock.gtin == product.gtin).all() - if stock_entries: - total_quantity = sum(entry.quantity for entry in stock_entries) - locations = [ - StockLocationResponse(location=entry.location, quantity=entry.quantity) - for entry in stock_entries - ] - stock_info = StockSummaryResponse( - gtin=product.gtin, - total_quantity=total_quantity, - locations=locations - ) + # Get stock information if GTIN exists + stock_info = None + if product.gtin: + stock_info = product_service.get_stock_info(db, product.gtin) - return ProductDetailResponse( - product=product, - stock_info=stock_info - ) + return ProductDetailResponse( + product=product, + stock_info=stock_info + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting product {product_id}: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") @router.put("/products/{product_id}", response_model=ProductResponse) @@ -155,35 +125,21 @@ def update_product( ): """Update product with validation and marketplace support (Protected)""" - product = db.query(Product).filter(Product.product_id == product_id).first() - if not product: - raise HTTPException(status_code=404, detail="Product not found") + try: + product = product_service.get_product_by_id(db, product_id) + if not product: + raise HTTPException(status_code=404, detail="Product not found") - # Update fields - update_data = product_update.dict(exclude_unset=True) + updated_product = product_service.update_product(db, product_id, product_update) + return updated_product - # Validate GTIN if being updated - if "gtin" in update_data and update_data["gtin"]: - normalized_gtin = gtin_processor.normalize(update_data["gtin"]) - if not normalized_gtin: - raise HTTPException(status_code=400, detail="Invalid GTIN format") - update_data["gtin"] = normalized_gtin - - # Process price if being updated - if "price" in update_data and update_data["price"]: - parsed_price, currency = price_processor.parse_price_currency(update_data["price"]) - if parsed_price: - update_data["price"] = parsed_price - update_data["currency"] = currency - - for key, value in update_data.items(): - setattr(product, key, value) - - product.updated_at = datetime.utcnow() - db.commit() - db.refresh(product) - - return product + except HTTPException: + raise + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error updating product {product_id}: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") @router.delete("/products/{product_id}") @@ -194,18 +150,21 @@ def delete_product( ): """Delete product and associated stock (Protected)""" - product = db.query(Product).filter(Product.product_id == product_id).first() - if not product: - raise HTTPException(status_code=404, detail="Product not found") + try: + product = product_service.get_product_by_id(db, product_id) + if not product: + raise HTTPException(status_code=404, detail="Product not found") - # Delete associated stock entries if GTIN exists - if product.gtin: - db.query(Stock).filter(Stock.gtin == product.gtin).delete() + product_service.delete_product(db, product_id) - db.delete(product) - db.commit() + return {"message": "Product and associated stock deleted successfully"} + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deleting product {product_id}: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") - return {"message": "Product and associated stock deleted successfully"} # Export with streaming for large datasets (Protected) @router.get("/export-csv") @@ -217,45 +176,27 @@ async def export_csv( ): """Export products as CSV with streaming and marketplace filtering (Protected)""" - def generate_csv(): - # Stream CSV generation for memory efficiency - yield "product_id,title,description,link,image_link,availability,price,currency,brand,gtin,marketplace,shop_name\n" + try: + def generate_csv(): + return product_service.generate_csv_export( + db=db, + marketplace=marketplace, + shop_name=shop_name + ) - batch_size = 1000 - offset = 0 + filename = "products_export" + if marketplace: + filename += f"_{marketplace}" + if shop_name: + filename += f"_{shop_name}" + filename += ".csv" - while True: - query = db.query(Product) + return StreamingResponse( + generate_csv(), + media_type="text/csv", + headers={"Content-Disposition": f"attachment; filename={filename}"} + ) - # Apply marketplace filters - if marketplace: - query = query.filter(Product.marketplace.ilike(f"%{marketplace}%")) - if shop_name: - query = query.filter(Product.shop_name.ilike(f"%{shop_name}%")) - - products = query.offset(offset).limit(batch_size).all() - if not products: - break - - for product in products: - # Create CSV row with marketplace fields - row = (f'"{product.product_id}","{product.title or ""}","{product.description or ""}",' - f'"{product.link or ""}","{product.image_link or ""}","{product.availability or ""}",' - f'"{product.price or ""}","{product.currency or ""}","{product.brand or ""}",' - f'"{product.gtin or ""}","{product.marketplace or ""}","{product.shop_name or ""}"\n') - yield row - - offset += batch_size - - filename = "products_export" - if marketplace: - filename += f"_{marketplace}" - if shop_name: - filename += f"_{shop_name}" - filename += ".csv" - - return StreamingResponse( - generate_csv(), - media_type="text/csv", - headers={"Content-Disposition": f"attachment; filename={filename}"} - ) + except Exception as e: + logger.error(f"Error exporting CSV: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") diff --git a/app/api/v1/stock.py b/app/api/v1/stock.py index d60247e0..f57a6ff3 100644 --- a/app/api/v1/stock.py +++ b/app/api/v1/stock.py @@ -6,10 +6,10 @@ from app.core.database import get_db from app.api.deps import get_current_user from app.tasks.background_tasks import process_marketplace_import from middleware.decorators import rate_limit -from models.api_models import MarketplaceImportJobResponse, MarketplaceImportRequest, StockResponse, \ - StockSummaryResponse +from models.api_models import (MarketplaceImportJobResponse, MarketplaceImportRequest, StockResponse, + StockSummaryResponse, StockCreate, StockAdd, StockUpdate) from models.database_models import User, MarketplaceImportJob, Shop -from datetime import datetime +from stock_service import stock_service import logging router = APIRouter() @@ -19,236 +19,88 @@ logger = logging.getLogger(__name__) # Stock Management Routes (Protected) @router.post("/stock", response_model=StockResponse) -def set_stock(stock: StockCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)): +def set_stock( + stock: StockCreate, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +): """Set exact stock quantity for a GTIN at a specific location (replaces existing quantity)""" - - # Normalize GTIN - def normalize_gtin(gtin_value): - if not gtin_value: - return None - gtin_str = str(gtin_value).strip() - if '.' in gtin_str: - gtin_str = gtin_str.split('.')[0] - gtin_clean = ''.join(filter(str.isdigit, gtin_str)) - if len(gtin_clean) in [8, 12, 13, 14]: - return gtin_clean.zfill(13) if len(gtin_clean) == 13 else gtin_clean.zfill(12) - return gtin_clean if gtin_clean else None - - normalized_gtin = normalize_gtin(stock.gtin) - if not normalized_gtin: - raise HTTPException(status_code=400, detail="Invalid GTIN format") - - # Check if stock entry already exists for this GTIN and location - existing_stock = db.query(Stock).filter( - Stock.gtin == normalized_gtin, - Stock.location == stock.location.strip().upper() - ).first() - - if existing_stock: - # Update existing stock (SET to exact quantity) - old_quantity = existing_stock.quantity - existing_stock.quantity = stock.quantity - existing_stock.updated_at = datetime.utcnow() - db.commit() - db.refresh(existing_stock) - logger.info(f"Updated stock for GTIN {normalized_gtin} at {stock.location}: {old_quantity} → {stock.quantity}") - return existing_stock - else: - # Create new stock entry - new_stock = Stock( - gtin=normalized_gtin, - location=stock.location.strip().upper(), - quantity=stock.quantity - ) - db.add(new_stock) - db.commit() - db.refresh(new_stock) - logger.info(f"Created new stock for GTIN {normalized_gtin} at {stock.location}: {stock.quantity}") - return new_stock + try: + result = stock_service.set_stock(db, stock) + return result + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error setting stock: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") @router.post("/stock/add", response_model=StockResponse) -def add_stock(stock: StockAdd, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)): +def add_stock( + stock: StockAdd, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +): """Add quantity to existing stock for a GTIN at a specific location (adds to existing quantity)""" - - # Normalize GTIN - def normalize_gtin(gtin_value): - if not gtin_value: - return None - gtin_str = str(gtin_value).strip() - if '.' in gtin_str: - gtin_str = gtin_str.split('.')[0] - gtin_clean = ''.join(filter(str.isdigit, gtin_str)) - if len(gtin_clean) in [8, 12, 13, 14]: - return gtin_clean.zfill(13) if len(gtin_clean) == 13 else gtin_clean.zfill(12) - return gtin_clean if gtin_clean else None - - normalized_gtin = normalize_gtin(stock.gtin) - if not normalized_gtin: - raise HTTPException(status_code=400, detail="Invalid GTIN format") - - # Check if stock entry already exists for this GTIN and location - existing_stock = db.query(Stock).filter( - Stock.gtin == normalized_gtin, - Stock.location == stock.location.strip().upper() - ).first() - - if existing_stock: - # Add to existing stock - old_quantity = existing_stock.quantity - existing_stock.quantity += stock.quantity - existing_stock.updated_at = datetime.utcnow() - db.commit() - db.refresh(existing_stock) - logger.info( - f"Added stock for GTIN {normalized_gtin} at {stock.location}: {old_quantity} + {stock.quantity} = {existing_stock.quantity}") - return existing_stock - else: - # Create new stock entry with the quantity - new_stock = Stock( - gtin=normalized_gtin, - location=stock.location.strip().upper(), - quantity=stock.quantity - ) - db.add(new_stock) - db.commit() - db.refresh(new_stock) - logger.info(f"Created new stock for GTIN {normalized_gtin} at {stock.location}: {stock.quantity}") - return new_stock + try: + result = stock_service.add_stock(db, stock) + return result + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error adding stock: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") @router.post("/stock/remove", response_model=StockResponse) -def remove_stock(stock: StockAdd, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)): +def remove_stock( + stock: StockAdd, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +): """Remove quantity from existing stock for a GTIN at a specific location""" - - # Normalize GTIN - def normalize_gtin(gtin_value): - if not gtin_value: - return None - gtin_str = str(gtin_value).strip() - if '.' in gtin_str: - gtin_str = gtin_str.split('.')[0] - gtin_clean = ''.join(filter(str.isdigit, gtin_str)) - if len(gtin_clean) in [8, 12, 13, 14]: - return gtin_clean.zfill(13) if len(gtin_clean) == 13 else gtin_clean.zfill(12) - return gtin_clean if gtin_clean else None - - normalized_gtin = normalize_gtin(stock.gtin) - if not normalized_gtin: - raise HTTPException(status_code=400, detail="Invalid GTIN format") - - # Find existing stock entry - existing_stock = db.query(Stock).filter( - Stock.gtin == normalized_gtin, - Stock.location == stock.location.strip().upper() - ).first() - - if not existing_stock: - raise HTTPException( - status_code=404, - detail=f"No stock found for GTIN {normalized_gtin} at location {stock.location}" - ) - - # Check if we have enough stock to remove - if existing_stock.quantity < stock.quantity: - raise HTTPException( - status_code=400, - detail=f"Insufficient stock. Available: {existing_stock.quantity}, Requested to remove: {stock.quantity}" - ) - - # Remove from existing stock - old_quantity = existing_stock.quantity - existing_stock.quantity -= stock.quantity - existing_stock.updated_at = datetime.utcnow() - db.commit() - db.refresh(existing_stock) - logger.info( - f"Removed stock for GTIN {normalized_gtin} at {stock.location}: {old_quantity} - {stock.quantity} = {existing_stock.quantity}") - return existing_stock + try: + result = stock_service.remove_stock(db, stock) + return result + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error removing stock: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") @router.get("/stock/{gtin}", response_model=StockSummaryResponse) -def get_stock_by_gtin(gtin: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)): +def get_stock_by_gtin( + gtin: str, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +): """Get all stock locations and total quantity for a specific GTIN""" - - # Normalize GTIN - def normalize_gtin(gtin_value): - if not gtin_value: - return None - gtin_str = str(gtin_value).strip() - if '.' in gtin_str: - gtin_str = gtin_str.split('.')[0] - gtin_clean = ''.join(filter(str.isdigit, gtin_str)) - if len(gtin_clean) in [8, 12, 13, 14]: - return gtin_clean.zfill(13) if len(gtin_clean) == 13 else gtin_clean.zfill(12) - return gtin_clean if gtin_clean else None - - normalized_gtin = normalize_gtin(gtin) - if not normalized_gtin: - raise HTTPException(status_code=400, detail="Invalid GTIN format") - - # Get all stock entries for this GTIN - stock_entries = db.query(Stock).filter(Stock.gtin == normalized_gtin).all() - - if not stock_entries: - raise HTTPException(status_code=404, detail=f"No stock found for GTIN: {gtin}") - - # Calculate total quantity and build locations list - total_quantity = 0 - locations = [] - - for entry in stock_entries: - total_quantity += entry.quantity - locations.append(StockLocationResponse( - location=entry.location, - quantity=entry.quantity - )) - - # Try to get product title for reference - product = db.query(Product).filter(Product.gtin == normalized_gtin).first() - product_title = product.title if product else None - - return StockSummaryResponse( - gtin=normalized_gtin, - total_quantity=total_quantity, - locations=locations, - product_title=product_title - ) + try: + result = stock_service.get_stock_by_gtin(db, gtin) + return result + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + except Exception as e: + logger.error(f"Error getting stock for GTIN {gtin}: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") @router.get("/stock/{gtin}/total") -def get_total_stock(gtin: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)): +def get_total_stock( + gtin: str, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +): """Get total quantity in stock for a specific GTIN""" - - # Normalize GTIN - def normalize_gtin(gtin_value): - if not gtin_value: - return None - gtin_str = str(gtin_value).strip() - if '.' in gtin_str: - gtin_str = gtin_str.split('.')[0] - gtin_clean = ''.join(filter(str.isdigit, gtin_str)) - if len(gtin_clean) in [8, 12, 13, 14]: - return gtin_clean.zfill(13) if len(gtin_clean) == 13 else gtin_clean.zfill(12) - return gtin_clean if gtin_clean else None - - normalized_gtin = normalize_gtin(gtin) - if not normalized_gtin: - raise HTTPException(status_code=400, detail="Invalid GTIN format") - - # Calculate total stock - total_stock = db.query(Stock).filter(Stock.gtin == normalized_gtin).all() - total_quantity = sum(entry.quantity for entry in total_stock) - - # Get product info for context - product = db.query(Product).filter(Product.gtin == normalized_gtin).first() - - return { - "gtin": normalized_gtin, - "total_quantity": total_quantity, - "product_title": product.title if product else None, - "locations_count": len(total_stock) - } + try: + result = stock_service.get_total_stock(db, gtin) + return result + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error getting total stock for GTIN {gtin}: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") @router.get("/stock", response_model=List[StockResponse]) @@ -261,55 +113,50 @@ def get_all_stock( current_user: User = Depends(get_current_user) ): """Get all stock entries with optional filtering""" - query = db.query(Stock) - - if location: - query = query.filter(Stock.location.ilike(f"%{location}%")) - - if gtin: - # Normalize GTIN for search - def normalize_gtin(gtin_value): - if not gtin_value: - return None - gtin_str = str(gtin_value).strip() - if '.' in gtin_str: - gtin_str = gtin_str.split('.')[0] - gtin_clean = ''.join(filter(str.isdigit, gtin_str)) - if len(gtin_clean) in [8, 12, 13, 14]: - return gtin_clean.zfill(13) if len(gtin_clean) == 13 else gtin_clean.zfill(12) - return gtin_clean if gtin_clean else None - - normalized_gtin = normalize_gtin(gtin) - if normalized_gtin: - query = query.filter(Stock.gtin == normalized_gtin) - - stock_entries = query.offset(skip).limit(limit).all() - return stock_entries + try: + result = stock_service.get_all_stock( + db=db, + skip=skip, + limit=limit, + location=location, + gtin=gtin + ) + return result + except Exception as e: + logger.error(f"Error getting all stock: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") @router.put("/stock/{stock_id}", response_model=StockResponse) -def update_stock(stock_id: int, stock_update: StockUpdate, db: Session = Depends(get_db), - current_user: User = Depends(get_current_user)): +def update_stock( + stock_id: int, + stock_update: StockUpdate, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +): """Update stock quantity for a specific stock entry""" - stock_entry = db.query(Stock).filter(Stock.id == stock_id).first() - if not stock_entry: - raise HTTPException(status_code=404, detail="Stock entry not found") - - stock_entry.quantity = stock_update.quantity - stock_entry.updated_at = datetime.utcnow() - db.commit() - db.refresh(stock_entry) - return stock_entry + try: + result = stock_service.update_stock(db, stock_id, stock_update) + return result + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + except Exception as e: + logger.error(f"Error updating stock {stock_id}: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") @router.delete("/stock/{stock_id}") -def delete_stock(stock_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)): +def delete_stock( + stock_id: int, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +): """Delete a stock entry""" - stock_entry = db.query(Stock).filter(Stock.id == stock_id).first() - if not stock_entry: - raise HTTPException(status_code=404, detail="Stock entry not found") - - db.delete(stock_entry) - db.commit() - return {"message": "Stock entry deleted successfully"} - + try: + stock_service.delete_stock(db, stock_id) + return {"message": "Stock entry deleted successfully"} + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + except Exception as e: + logger.error(f"Error deleting stock {stock_id}: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") diff --git a/app/services/marketplace_service.py b/app/services/marketplace_service.py new file mode 100644 index 00000000..0a790b73 --- /dev/null +++ b/app/services/marketplace_service.py @@ -0,0 +1,202 @@ +from sqlalchemy.orm import Session +from models.database_models import MarketplaceImportJob, Shop, User +from models.api_models import MarketplaceImportRequest, MarketplaceImportJobResponse +from typing import Optional, List +from datetime import datetime +import logging + +logger = logging.getLogger(__name__) + + +class MarketplaceService: + def __init__(self): + pass + + def validate_shop_access(self, db: Session, shop_code: str, user: User) -> Shop: + """Validate that the shop exists and user has access to it""" + shop = db.query(Shop).filter(Shop.shop_code == shop_code).first() + if not shop: + raise ValueError("Shop not found") + + # Check permissions: admin can import for any shop, others only for their own + if user.role != "admin" and shop.owner_id != user.id: + raise PermissionError("Access denied to this shop") + + return shop + + def create_import_job( + self, + db: Session, + request: MarketplaceImportRequest, + user: User + ) -> MarketplaceImportJob: + """Create a new marketplace import job""" + # Validate shop access first + shop = self.validate_shop_access(db, request.shop_code, user) + + # Create marketplace import job record + import_job = MarketplaceImportJob( + status="pending", + source_url=request.url, + marketplace=request.marketplace, + shop_code=request.shop_code, + user_id=user.id, + created_at=datetime.utcnow() + ) + + db.add(import_job) + db.commit() + db.refresh(import_job) + + logger.info( + f"Created marketplace import job {import_job.id}: {request.marketplace} -> {request.shop_code} by user {user.username}") + + return import_job + + def get_import_job_by_id(self, db: Session, job_id: int, user: User) -> MarketplaceImportJob: + """Get a marketplace import job by ID with access control""" + job = db.query(MarketplaceImportJob).filter(MarketplaceImportJob.id == job_id).first() + if not job: + raise ValueError("Marketplace import job not found") + + # Users can only see their own jobs, admins can see all + if user.role != "admin" and job.user_id != user.id: + raise PermissionError("Access denied to this import job") + + return job + + def get_import_jobs( + self, + db: Session, + user: User, + marketplace: Optional[str] = None, + shop_name: Optional[str] = None, + skip: int = 0, + limit: int = 50 + ) -> List[MarketplaceImportJob]: + """Get marketplace import jobs with filtering and access control""" + query = db.query(MarketplaceImportJob) + + # Users can only see their own jobs, admins can see all + if user.role != "admin": + query = query.filter(MarketplaceImportJob.user_id == user.id) + + # Apply filters + if marketplace: + query = query.filter(MarketplaceImportJob.marketplace.ilike(f"%{marketplace}%")) + if shop_name: + query = query.filter(MarketplaceImportJob.shop_name.ilike(f"%{shop_name}%")) + + # Order by creation date (newest first) and apply pagination + jobs = query.order_by(MarketplaceImportJob.created_at.desc()).offset(skip).limit(limit).all() + + return jobs + + def update_job_status( + self, + db: Session, + job_id: int, + status: str, + **kwargs + ) -> MarketplaceImportJob: + """Update marketplace import job status and other fields""" + job = db.query(MarketplaceImportJob).filter(MarketplaceImportJob.id == job_id).first() + if not job: + raise ValueError("Marketplace import job not found") + + job.status = status + + # Update optional fields if provided + if 'imported_count' in kwargs: + job.imported_count = kwargs['imported_count'] + if 'updated_count' in kwargs: + job.updated_count = kwargs['updated_count'] + if 'total_processed' in kwargs: + job.total_processed = kwargs['total_processed'] + if 'error_count' in kwargs: + job.error_count = kwargs['error_count'] + if 'error_message' in kwargs: + job.error_message = kwargs['error_message'] + if 'started_at' in kwargs: + job.started_at = kwargs['started_at'] + if 'completed_at' in kwargs: + job.completed_at = kwargs['completed_at'] + + db.commit() + db.refresh(job) + + logger.info(f"Updated marketplace import job {job_id} status to {status}") + return job + + def get_job_stats(self, db: Session, user: User) -> dict: + """Get statistics about marketplace import jobs for a user""" + query = db.query(MarketplaceImportJob) + + # Users can only see their own jobs, admins can see all + if user.role != "admin": + query = query.filter(MarketplaceImportJob.user_id == user.id) + + total_jobs = query.count() + pending_jobs = query.filter(MarketplaceImportJob.status == "pending").count() + running_jobs = query.filter(MarketplaceImportJob.status == "running").count() + completed_jobs = query.filter(MarketplaceImportJob.status == "completed").count() + failed_jobs = query.filter(MarketplaceImportJob.status == "failed").count() + + return { + "total_jobs": total_jobs, + "pending_jobs": pending_jobs, + "running_jobs": running_jobs, + "completed_jobs": completed_jobs, + "failed_jobs": failed_jobs + } + + def convert_to_response_model(self, job: MarketplaceImportJob) -> MarketplaceImportJobResponse: + """Convert database model to API response model""" + return MarketplaceImportJobResponse( + job_id=job.id, + status=job.status, + marketplace=job.marketplace, + shop_name=job.shop_name, + imported=job.imported_count or 0, + updated=job.updated_count or 0, + total_processed=job.total_processed or 0, + error_count=job.error_count or 0, + error_message=job.error_message, + created_at=job.created_at, + started_at=job.started_at, + completed_at=job.completed_at + ) + + def cancel_import_job(self, db: Session, job_id: int, user: User) -> MarketplaceImportJob: + """Cancel a pending or running import job""" + job = self.get_import_job_by_id(db, job_id, user) + + if job.status not in ["pending", "running"]: + raise ValueError(f"Cannot cancel job with status: {job.status}") + + job.status = "cancelled" + job.completed_at = datetime.utcnow() + + db.commit() + db.refresh(job) + + logger.info(f"Cancelled marketplace import job {job_id}") + return job + + def delete_import_job(self, db: Session, job_id: int, user: User) -> bool: + """Delete a marketplace import job""" + job = self.get_import_job_by_id(db, job_id, user) + + # Only allow deletion of completed, failed, or cancelled jobs + if job.status in ["pending", "running"]: + raise ValueError(f"Cannot delete job with status: {job.status}. Cancel it first.") + + db.delete(job) + db.commit() + + logger.info(f"Deleted marketplace import job {job_id}") + return True + + +# Create service instance +marketplace_service = MarketplaceService() diff --git a/app/services/product_service.py b/app/services/product_service.py index 8a6b9f67..f605d8ee 100644 --- a/app/services/product_service.py +++ b/app/services/product_service.py @@ -1,8 +1,9 @@ from sqlalchemy.orm import Session -from models.database_models import Product -from models.api_models import ProductCreate +from models.database_models import Product, Stock +from models.api_models import ProductCreate, ProductUpdate, StockLocationResponse, StockSummaryResponse from utils.data_processing import GTINProcessor, PriceProcessor -from typing import Optional, List +from typing import Optional, List, Generator +from datetime import datetime import logging logger = logging.getLogger(__name__) @@ -41,6 +42,10 @@ class ProductService: logger.info(f"Created product {db_product.product_id}") return db_product + def get_product_by_id(self, db: Session, product_id: str) -> Optional[Product]: + """Get a product by its ID""" + return db.query(Product).filter(Product.product_id == product_id).first() + def get_products_with_filters( self, db: Session, @@ -48,7 +53,9 @@ class ProductService: limit: int = 100, brand: Optional[str] = None, category: Optional[str] = None, + availability: Optional[str] = None, marketplace: Optional[str] = None, + shop_name: Optional[str] = None, search: Optional[str] = None ) -> tuple[List[Product], int]: """Get products with filtering and pagination""" @@ -59,14 +66,20 @@ class ProductService: query = query.filter(Product.brand.ilike(f"%{brand}%")) if category: query = query.filter(Product.google_product_category.ilike(f"%{category}%")) + if availability: + query = query.filter(Product.availability == availability) if marketplace: query = query.filter(Product.marketplace.ilike(f"%{marketplace}%")) + if shop_name: + query = query.filter(Product.shop_name.ilike(f"%{shop_name}%")) if search: + # Search in title, description, marketplace, and shop_name search_term = f"%{search}%" query = query.filter( (Product.title.ilike(search_term)) | (Product.description.ilike(search_term)) | - (Product.marketplace.ilike(search_term)) + (Product.marketplace.ilike(search_term)) | + (Product.shop_name.ilike(search_term)) ) total = query.count() @@ -74,6 +87,114 @@ class ProductService: return products, total + def update_product(self, db: Session, product_id: str, product_update: ProductUpdate) -> Product: + """Update product with validation""" + product = db.query(Product).filter(Product.product_id == product_id).first() + if not product: + raise ValueError("Product not found") + + # Update fields + update_data = product_update.dict(exclude_unset=True) + + # Validate GTIN if being updated + if "gtin" in update_data and update_data["gtin"]: + normalized_gtin = self.gtin_processor.normalize(update_data["gtin"]) + if not normalized_gtin: + raise ValueError("Invalid GTIN format") + update_data["gtin"] = normalized_gtin + + # Process price if being updated + if "price" in update_data and update_data["price"]: + parsed_price, currency = self.price_processor.parse_price_currency(update_data["price"]) + if parsed_price: + update_data["price"] = parsed_price + update_data["currency"] = currency + + for key, value in update_data.items(): + setattr(product, key, value) + + product.updated_at = datetime.utcnow() + db.commit() + db.refresh(product) + + logger.info(f"Updated product {product_id}") + return product + + def delete_product(self, db: Session, product_id: str) -> bool: + """Delete product and associated stock""" + product = db.query(Product).filter(Product.product_id == product_id).first() + if not product: + raise ValueError("Product not found") + + # Delete associated stock entries if GTIN exists + if product.gtin: + db.query(Stock).filter(Stock.gtin == product.gtin).delete() + + db.delete(product) + db.commit() + + logger.info(f"Deleted product {product_id}") + return True + + def get_stock_info(self, db: Session, gtin: str) -> Optional[StockSummaryResponse]: + """Get stock information for a product by GTIN""" + stock_entries = db.query(Stock).filter(Stock.gtin == gtin).all() + if not stock_entries: + return None + + total_quantity = sum(entry.quantity for entry in stock_entries) + locations = [ + StockLocationResponse(location=entry.location, quantity=entry.quantity) + for entry in stock_entries + ] + + return StockSummaryResponse( + gtin=gtin, + total_quantity=total_quantity, + locations=locations + ) + + def generate_csv_export( + self, + db: Session, + marketplace: Optional[str] = None, + shop_name: Optional[str] = None + ) -> Generator[str, None, None]: + """Generate CSV export with streaming for memory efficiency""" + # CSV header + yield ("product_id,title,description,link,image_link,availability,price,currency,brand," + "gtin,marketplace,shop_name\n") + + batch_size = 1000 + offset = 0 + + while True: + query = db.query(Product) + + # Apply marketplace filters + if marketplace: + query = query.filter(Product.marketplace.ilike(f"%{marketplace}%")) + if shop_name: + query = query.filter(Product.shop_name.ilike(f"%{shop_name}%")) + + products = query.offset(offset).limit(batch_size).all() + if not products: + break + + for product in products: + # Create CSV row with marketplace fields + row = (f'"{product.product_id}","{product.title or ""}","{product.description or ""}",' + f'"{product.link or ""}","{product.image_link or ""}","{product.availability or ""}",' + f'"{product.price or ""}","{product.currency or ""}","{product.brand or ""}",' + f'"{product.gtin or ""}","{product.marketplace or ""}","{product.shop_name or ""}"\n') + yield row + + offset += batch_size + + def product_exists(self, db: Session, product_id: str) -> bool: + """Check if product exists by ID""" + return db.query(Product).filter(Product.product_id == product_id).first() is not None + # Create service instance product_service = ProductService() diff --git a/app/services/stock_service.py b/app/services/stock_service.py index e69de29b..8dad7930 100644 --- a/app/services/stock_service.py +++ b/app/services/stock_service.py @@ -0,0 +1,235 @@ +from sqlalchemy.orm import Session +from models.database_models import Stock, Product +from models.api_models import StockCreate, StockAdd, StockUpdate, StockLocationResponse, StockSummaryResponse +from utils.data_processing import GTINProcessor +from typing import Optional, List, Tuple +from datetime import datetime +import logging + +logger = logging.getLogger(__name__) + + +class StockService: + def __init__(self): + self.gtin_processor = GTINProcessor() + + def normalize_gtin(self, gtin_value) -> Optional[str]: + """Normalize GTIN format using the GTINProcessor""" + return self.gtin_processor.normalize(gtin_value) + + def set_stock(self, db: Session, stock_data: StockCreate) -> Stock: + """Set exact stock quantity for a GTIN at a specific location (replaces existing quantity)""" + normalized_gtin = self.normalize_gtin(stock_data.gtin) + if not normalized_gtin: + raise ValueError("Invalid GTIN format") + + location = stock_data.location.strip().upper() + + # Check if stock entry already exists for this GTIN and location + existing_stock = db.query(Stock).filter( + Stock.gtin == normalized_gtin, + Stock.location == location + ).first() + + if existing_stock: + # Update existing stock (SET to exact quantity) + old_quantity = existing_stock.quantity + existing_stock.quantity = stock_data.quantity + existing_stock.updated_at = datetime.utcnow() + db.commit() + db.refresh(existing_stock) + logger.info( + f"Updated stock for GTIN {normalized_gtin} at {location}: {old_quantity} → {stock_data.quantity}") + return existing_stock + else: + # Create new stock entry + new_stock = Stock( + gtin=normalized_gtin, + location=location, + quantity=stock_data.quantity + ) + db.add(new_stock) + db.commit() + db.refresh(new_stock) + logger.info(f"Created new stock for GTIN {normalized_gtin} at {location}: {stock_data.quantity}") + return new_stock + + def add_stock(self, db: Session, stock_data: StockAdd) -> Stock: + """Add quantity to existing stock for a GTIN at a specific location (adds to existing quantity)""" + normalized_gtin = self.normalize_gtin(stock_data.gtin) + if not normalized_gtin: + raise ValueError("Invalid GTIN format") + + location = stock_data.location.strip().upper() + + # Check if stock entry already exists for this GTIN and location + existing_stock = db.query(Stock).filter( + Stock.gtin == normalized_gtin, + Stock.location == location + ).first() + + if existing_stock: + # Add to existing stock + old_quantity = existing_stock.quantity + existing_stock.quantity += stock_data.quantity + existing_stock.updated_at = datetime.utcnow() + db.commit() + db.refresh(existing_stock) + logger.info( + f"Added stock for GTIN {normalized_gtin} at {location}: {old_quantity} + {stock_data.quantity} = {existing_stock.quantity}") + return existing_stock + else: + # Create new stock entry with the quantity + new_stock = Stock( + gtin=normalized_gtin, + location=location, + quantity=stock_data.quantity + ) + db.add(new_stock) + db.commit() + db.refresh(new_stock) + logger.info(f"Created new stock for GTIN {normalized_gtin} at {location}: {stock_data.quantity}") + return new_stock + + def remove_stock(self, db: Session, stock_data: StockAdd) -> Stock: + """Remove quantity from existing stock for a GTIN at a specific location""" + normalized_gtin = self.normalize_gtin(stock_data.gtin) + if not normalized_gtin: + raise ValueError("Invalid GTIN format") + + location = stock_data.location.strip().upper() + + # Find existing stock entry + existing_stock = db.query(Stock).filter( + Stock.gtin == normalized_gtin, + Stock.location == location + ).first() + + if not existing_stock: + raise ValueError(f"No stock found for GTIN {normalized_gtin} at location {location}") + + # Check if we have enough stock to remove + if existing_stock.quantity < stock_data.quantity: + raise ValueError( + f"Insufficient stock. Available: {existing_stock.quantity}, Requested to remove: {stock_data.quantity}") + + # Remove from existing stock + old_quantity = existing_stock.quantity + existing_stock.quantity -= stock_data.quantity + existing_stock.updated_at = datetime.utcnow() + db.commit() + db.refresh(existing_stock) + logger.info( + f"Removed stock for GTIN {normalized_gtin} at {location}: {old_quantity} - {stock_data.quantity} = {existing_stock.quantity}") + return existing_stock + + def get_stock_by_gtin(self, db: Session, gtin: str) -> StockSummaryResponse: + """Get all stock locations and total quantity for a specific GTIN""" + normalized_gtin = self.normalize_gtin(gtin) + if not normalized_gtin: + raise ValueError("Invalid GTIN format") + + # Get all stock entries for this GTIN + stock_entries = db.query(Stock).filter(Stock.gtin == normalized_gtin).all() + + if not stock_entries: + raise ValueError(f"No stock found for GTIN: {gtin}") + + # Calculate total quantity and build locations list + total_quantity = 0 + locations = [] + + for entry in stock_entries: + total_quantity += entry.quantity + locations.append(StockLocationResponse( + location=entry.location, + quantity=entry.quantity + )) + + # Try to get product title for reference + product = db.query(Product).filter(Product.gtin == normalized_gtin).first() + product_title = product.title if product else None + + return StockSummaryResponse( + gtin=normalized_gtin, + total_quantity=total_quantity, + locations=locations, + product_title=product_title + ) + + def get_total_stock(self, db: Session, gtin: str) -> dict: + """Get total quantity in stock for a specific GTIN""" + normalized_gtin = self.normalize_gtin(gtin) + if not normalized_gtin: + raise ValueError("Invalid GTIN format") + + # Calculate total stock + total_stock = db.query(Stock).filter(Stock.gtin == normalized_gtin).all() + total_quantity = sum(entry.quantity for entry in total_stock) + + # Get product info for context + product = db.query(Product).filter(Product.gtin == normalized_gtin).first() + + return { + "gtin": normalized_gtin, + "total_quantity": total_quantity, + "product_title": product.title if product else None, + "locations_count": len(total_stock) + } + + def get_all_stock( + self, + db: Session, + skip: int = 0, + limit: int = 100, + location: Optional[str] = None, + gtin: Optional[str] = None + ) -> List[Stock]: + """Get all stock entries with optional filtering""" + query = db.query(Stock) + + if location: + query = query.filter(Stock.location.ilike(f"%{location}%")) + + if gtin: + normalized_gtin = self.normalize_gtin(gtin) + if normalized_gtin: + query = query.filter(Stock.gtin == normalized_gtin) + + return query.offset(skip).limit(limit).all() + + def update_stock(self, db: Session, stock_id: int, stock_update: StockUpdate) -> Stock: + """Update stock quantity for a specific stock entry""" + stock_entry = db.query(Stock).filter(Stock.id == stock_id).first() + if not stock_entry: + raise ValueError("Stock entry not found") + + stock_entry.quantity = stock_update.quantity + stock_entry.updated_at = datetime.utcnow() + db.commit() + db.refresh(stock_entry) + + logger.info(f"Updated stock entry {stock_id} to quantity {stock_update.quantity}") + return stock_entry + + def delete_stock(self, db: Session, stock_id: int) -> bool: + """Delete a stock entry""" + stock_entry = db.query(Stock).filter(Stock.id == stock_id).first() + if not stock_entry: + raise ValueError("Stock entry not found") + + gtin = stock_entry.gtin + location = stock_entry.location + db.delete(stock_entry) + db.commit() + + logger.info(f"Deleted stock entry {stock_id} for GTIN {gtin} at {location}") + return True + + def get_stock_by_id(self, db: Session, stock_id: int) -> Optional[Stock]: + """Get a stock entry by its ID""" + return db.query(Stock).filter(Stock.id == stock_id).first() + + +# Create service instance +stock_service = StockService() diff --git a/tests/test_marketplace_service.py b/tests/test_marketplace_service.py new file mode 100644 index 00000000..e0c382dd --- /dev/null +++ b/tests/test_marketplace_service.py @@ -0,0 +1,313 @@ +# tests/test_marketplace_service.py +import pytest +from app.services.marketplace_service import MarketplaceService +from models.api_models import MarketplaceImportRequest +from models.database_models import MarketplaceImportJob, Shop, User +from datetime import datetime + + +class TestMarketplaceService: + def setup_method(self): + self.service = MarketplaceService() + + def test_validate_shop_access_success(self, db, test_shop, test_user): + """Test successful shop access validation""" + # Set the shop owner to the test user + test_shop.owner_id = test_user.id + db.commit() + + result = self.service.validate_shop_access(db, test_shop.shop_code, test_user) + + assert result.shop_code == test_shop.shop_code + assert result.owner_id == test_user.id + + def test_validate_shop_access_admin_can_access_any_shop(self, db, test_shop, admin_user): + """Test that admin users can access any shop""" + result = self.service.validate_shop_access(db, test_shop.shop_code, admin_user) + + assert result.shop_code == test_shop.shop_code + + def test_validate_shop_access_shop_not_found(self, db, test_user): + """Test shop access validation when shop doesn't exist""" + with pytest.raises(ValueError, match="Shop not found"): + self.service.validate_shop_access(db, "NONEXISTENT", test_user) + + def test_validate_shop_access_permission_denied(self, db, test_shop, test_user, other_user): + """Test shop access validation when user doesn't own the shop""" + # Set the shop owner to a different user + test_shop.owner_id = other_user.id + db.commit() + + with pytest.raises(PermissionError, match="Access denied to this shop"): + self.service.validate_shop_access(db, test_shop.shop_code, test_user) + + def test_create_import_job_success(self, db, test_shop, test_user): + """Test successful creation of import job""" + # Set the shop owner to the test user + test_shop.owner_id = test_user.id + db.commit() + + request = MarketplaceImportRequest( + url="https://example.com/products.csv", + marketplace="Amazon", + shop_code=test_shop.shop_code, + batch_size=1000 + ) + + result = self.service.create_import_job(db, request, test_user) + + assert result.marketplace == "Amazon" + assert result.shop_code == test_shop.shop_code + assert result.user_id == test_user.id + assert result.status == "pending" + assert result.source_url == "https://example.com/products.csv" + + def test_create_import_job_invalid_shop(self, db, test_user): + """Test import job creation with invalid shop""" + request = MarketplaceImportRequest( + url="https://example.com/products.csv", + marketplace="Amazon", + shop_code="INVALID_SHOP", + batch_size=1000 + ) + + with pytest.raises(ValueError, match="Shop not found"): + self.service.create_import_job(db, request, test_user) + + def test_get_import_job_by_id_success(self, db, test_import_job, test_user): + """Test getting import job by ID for job owner""" + result = self.service.get_import_job_by_id(db, test_import_job.id, test_user) + + assert result.id == test_import_job.id + assert result.user_id == test_user.id + + def test_get_import_job_by_id_admin_access(self, db, test_import_job, admin_user): + """Test that admin can access any import job""" + result = self.service.get_import_job_by_id(db, test_import_job.id, admin_user) + + assert result.id == test_import_job.id + + def test_get_import_job_by_id_not_found(self, db, test_user): + """Test getting non-existent import job""" + with pytest.raises(ValueError, match="Marketplace import job not found"): + self.service.get_import_job_by_id(db, 99999, test_user) + + def test_get_import_job_by_id_access_denied(self, db, test_import_job, other_user): + """Test access denied when user doesn't own the job""" + with pytest.raises(PermissionError, match="Access denied to this import job"): + self.service.get_import_job_by_id(db, test_import_job.id, other_user) + + def test_get_import_jobs_user_filter(self, db, test_import_job, test_user): + """Test getting import jobs filtered by user""" + jobs = self.service.get_import_jobs(db, test_user) + + assert len(jobs) == 1 + assert jobs[0].id == test_import_job.id + assert jobs[0].user_id == test_user.id + + def test_get_import_jobs_admin_sees_all(self, db, test_import_job, admin_user): + """Test that admin sees all import jobs""" + jobs = self.service.get_import_jobs(db, admin_user) + + assert len(jobs) >= 1 + assert any(job.id == test_import_job.id for job in jobs) + + def test_get_import_jobs_with_marketplace_filter(self, db, test_import_job, test_user): + """Test getting import jobs with marketplace filter""" + jobs = self.service.get_import_jobs( + db, test_user, marketplace=test_import_job.marketplace + ) + + assert len(jobs) == 1 + assert jobs[0].marketplace == test_import_job.marketplace + + def test_get_import_jobs_with_pagination(self, db, test_user): + """Test getting import jobs with pagination""" + # Create multiple import jobs + for i in range(5): + job = MarketplaceImportJob( + status="completed", + marketplace=f"Marketplace_{i}", + shop_code="TEST_SHOP", + user_id=test_user.id, + created_at=datetime.utcnow() + ) + db.add(job) + db.commit() + + jobs = self.service.get_import_jobs(db, test_user, skip=2, limit=2) + + assert len(jobs) == 2 + + def test_update_job_status_success(self, db, test_import_job): + """Test updating job status""" + result = self.service.update_job_status( + db, + test_import_job.id, + "completed", + imported_count=100, + total_processed=100 + ) + + assert result.status == "completed" + assert result.imported_count == 100 + assert result.total_processed == 100 + + def test_update_job_status_not_found(self, db): + """Test updating non-existent job status""" + with pytest.raises(ValueError, match="Marketplace import job not found"): + self.service.update_job_status(db, 99999, "completed") + + def test_get_job_stats_user(self, db, test_import_job, test_user): + """Test getting job statistics for user""" + stats = self.service.get_job_stats(db, test_user) + + assert stats["total_jobs"] >= 1 + assert "pending_jobs" in stats + assert "running_jobs" in stats + assert "completed_jobs" in stats + assert "failed_jobs" in stats + + def test_get_job_stats_admin(self, db, test_import_job, admin_user): + """Test getting job statistics for admin""" + stats = self.service.get_job_stats(db, admin_user) + + assert stats["total_jobs"] >= 1 + + def test_convert_to_response_model(self, test_import_job): + """Test converting database model to response model""" + response = self.service.convert_to_response_model(test_import_job) + + assert response.job_id == test_import_job.id + assert response.status == test_import_job.status + assert response.marketplace == test_import_job.marketplace + assert response.imported == (test_import_job.imported_count or 0) + + def test_cancel_import_job_success(self, db, test_user): + """Test cancelling a pending import job""" + # Create a pending job + job = MarketplaceImportJob( + status="pending", + marketplace="Amazon", + shop_code="TEST_SHOP", + user_id=test_user.id, + created_at=datetime.utcnow() + ) + db.add(job) + db.commit() + db.refresh(job) + + result = self.service.cancel_import_job(db, job.id, test_user) + + assert result.status == "cancelled" + assert result.completed_at is not None + + def test_cancel_import_job_invalid_status(self, db, test_import_job, test_user): + """Test cancelling a job that can't be cancelled""" + # Set job status to completed + test_import_job.status = "completed" + db.commit() + + with pytest.raises(ValueError, match="Cannot cancel job with status: completed"): + self.service.cancel_import_job(db, test_import_job.id, test_user) + + def test_delete_import_job_success(self, db, test_user): + """Test deleting a completed import job""" + # Create a completed job + job = MarketplaceImportJob( + status="completed", + marketplace="Amazon", + shop_code="TEST_SHOP", + user_id=test_user.id, + created_at=datetime.utcnow() + ) + db.add(job) + db.commit() + db.refresh(job) + job_id = job.id + + result = self.service.delete_import_job(db, job_id, test_user) + + assert result is True + + # Verify the job is actually deleted + deleted_job = db.query(MarketplaceImportJob).filter(MarketplaceImportJob.id == job_id).first() + assert deleted_job is None + + def test_delete_import_job_invalid_status(self, db, test_user): + """Test deleting a job that can't be deleted""" + # Create a pending job + job = MarketplaceImportJob( + status="pending", + marketplace="Amazon", + shop_code="TEST_SHOP", + user_id=test_user.id, + created_at=datetime.utcnow() + ) + db.add(job) + db.commit() + db.refresh(job) + + with pytest.raises(ValueError, match="Cannot delete job with status: pending"): + self.service.delete_import_job(db, job.id, test_user) + + +# Additional fixtures for marketplace tests +@pytest.fixture +def test_shop(db): + """Create a test shop""" + shop = Shop( + shop_code="TEST_SHOP", + shop_name="Test Shop", + owner_id=1 # Will be updated in tests + ) + db.add(shop) + db.commit() + db.refresh(shop) + return shop + + +@pytest.fixture +def admin_user(db): + """Create a test admin user""" + user = User( + username="admin_user", + email="admin@test.com", + role="admin", + hashed_password="hashed_password" + ) + db.add(user) + db.commit() + db.refresh(user) + return user + + +@pytest.fixture +def other_user(db): + """Create another test user""" + user = User( + username="other_user", + email="other@test.com", + role="user", + hashed_password="hashed_password" + ) + db.add(user) + db.commit() + db.refresh(user) + return user + + +@pytest.fixture +def test_import_job(db, test_user): + """Create a test import job""" + job = MarketplaceImportJob( + status="pending", + marketplace="Amazon", + shop_code="TEST_SHOP", + user_id=test_user.id, + created_at=datetime.utcnow() + ) + db.add(job) + db.commit() + db.refresh(job) + return job diff --git a/tests/test_services.py b/tests/test_product_service.py similarity index 98% rename from tests/test_services.py rename to tests/test_product_service.py index d2624587..a25eb5ae 100644 --- a/tests/test_services.py +++ b/tests/test_product_service.py @@ -1,4 +1,4 @@ -# tests/test_services.py +# tests/test_product_service.py import pytest from app.services.product_service import ProductService from models.api_models import ProductCreate diff --git a/tests/test_stock_service.py b/tests/test_stock_service.py new file mode 100644 index 00000000..b9998199 --- /dev/null +++ b/tests/test_stock_service.py @@ -0,0 +1,322 @@ +# tests/test_stock_service.py +import pytest +from app.services.stock_service import StockService +from models.api_models import StockCreate, StockAdd, StockUpdate +from models.database_models import Stock, Product + + +class TestStockService: + def setup_method(self): + self.service = StockService() + + def test_normalize_gtin_valid(self): + """Test GTIN normalization with valid GTINs""" + # Test various valid GTIN formats + assert self.service.normalize_gtin("1234567890123") == "1234567890123" + assert self.service.normalize_gtin("123456789012") == "123456789012" + assert self.service.normalize_gtin("12345678") == "12345678" + + def test_normalize_gtin_invalid(self): + """Test GTIN normalization with invalid GTINs""" + assert self.service.normalize_gtin("invalid") is None + assert self.service.normalize_gtin("123") is None + assert self.service.normalize_gtin("") is None + assert self.service.normalize_gtin(None) is None + + def test_set_stock_new_entry(self, db): + """Test setting stock for a new GTIN/location combination""" + stock_data = StockCreate( + gtin="1234567890123", + location="WAREHOUSE_A", + quantity=100 + ) + + result = self.service.set_stock(db, stock_data) + + assert result.gtin == "1234567890123" + assert result.location == "WAREHOUSE_A" + assert result.quantity == 100 + + def test_set_stock_existing_entry(self, db, test_stock): + """Test setting stock for an existing GTIN/location combination""" + stock_data = StockCreate( + gtin=test_stock.gtin, + location=test_stock.location, + quantity=200 + ) + + result = self.service.set_stock(db, stock_data) + + assert result.gtin == test_stock.gtin + assert result.location == test_stock.location + assert result.quantity == 200 # Should replace the original quantity + + def test_set_stock_invalid_gtin(self, db): + """Test setting stock with invalid GTIN""" + stock_data = StockCreate( + gtin="invalid_gtin", + location="WAREHOUSE_A", + quantity=100 + ) + + with pytest.raises(ValueError, match="Invalid GTIN format"): + self.service.set_stock(db, stock_data) + + def test_add_stock_new_entry(self, db): + """Test adding stock for a new GTIN/location combination""" + stock_data = StockAdd( + gtin="1234567890123", + location="WAREHOUSE_B", + quantity=50 + ) + + result = self.service.add_stock(db, stock_data) + + assert result.gtin == "1234567890123" + assert result.location == "WAREHOUSE_B" + assert result.quantity == 50 + + def test_add_stock_existing_entry(self, db, test_stock): + """Test adding stock to an existing GTIN/location combination""" + original_quantity = test_stock.quantity + stock_data = StockAdd( + gtin=test_stock.gtin, + location=test_stock.location, + quantity=25 + ) + + result = self.service.add_stock(db, stock_data) + + assert result.gtin == test_stock.gtin + assert result.location == test_stock.location + assert result.quantity == original_quantity + 25 + + def test_add_stock_invalid_gtin(self, db): + """Test adding stock with invalid GTIN""" + stock_data = StockAdd( + gtin="invalid_gtin", + location="WAREHOUSE_A", + quantity=50 + ) + + with pytest.raises(ValueError, match="Invalid GTIN format"): + self.service.add_stock(db, stock_data) + + def test_remove_stock_success(self, db, test_stock): + """Test removing stock successfully""" + original_quantity = test_stock.quantity + remove_quantity = 10 + + stock_data = StockAdd( + gtin=test_stock.gtin, + location=test_stock.location, + quantity=remove_quantity + ) + + result = self.service.remove_stock(db, stock_data) + + assert result.gtin == test_stock.gtin + assert result.location == test_stock.location + assert result.quantity == original_quantity - remove_quantity + + def test_remove_stock_insufficient_stock(self, db, test_stock): + """Test removing more stock than available""" + stock_data = StockAdd( + gtin=test_stock.gtin, + location=test_stock.location, + quantity=test_stock.quantity + 10 # More than available + ) + + with pytest.raises(ValueError, match="Insufficient stock"): + self.service.remove_stock(db, stock_data) + + def test_remove_stock_nonexistent_entry(self, db): + """Test removing stock from non-existent GTIN/location""" + stock_data = StockAdd( + gtin="9999999999999", + location="NONEXISTENT", + quantity=10 + ) + + with pytest.raises(ValueError, match="No stock found"): + self.service.remove_stock(db, stock_data) + + def test_remove_stock_invalid_gtin(self, db): + """Test removing stock with invalid GTIN""" + stock_data = StockAdd( + gtin="invalid_gtin", + location="WAREHOUSE_A", + quantity=10 + ) + + with pytest.raises(ValueError, match="Invalid GTIN format"): + self.service.remove_stock(db, stock_data) + + def test_get_stock_by_gtin_success(self, db, test_stock, test_product): + """Test getting stock summary by GTIN""" + result = self.service.get_stock_by_gtin(db, test_stock.gtin) + + assert result.gtin == test_stock.gtin + assert result.total_quantity == test_stock.quantity + assert len(result.locations) == 1 + assert result.locations[0].location == test_stock.location + assert result.locations[0].quantity == test_stock.quantity + assert result.product_title == test_product.title + + def test_get_stock_by_gtin_multiple_locations(self, db): + """Test getting stock summary with multiple locations""" + gtin = "1234567890123" + + # Create multiple stock entries for the same GTIN + stock1 = Stock(gtin=gtin, location="WAREHOUSE_A", quantity=50) + stock2 = Stock(gtin=gtin, location="WAREHOUSE_B", quantity=30) + + db.add(stock1) + db.add(stock2) + db.commit() + + result = self.service.get_stock_by_gtin(db, gtin) + + assert result.gtin == gtin + assert result.total_quantity == 80 + assert len(result.locations) == 2 + + def test_get_stock_by_gtin_not_found(self, db): + """Test getting stock for non-existent GTIN""" + with pytest.raises(ValueError, match="No stock found"): + self.service.get_stock_by_gtin(db, "9999999999999") + + def test_get_stock_by_gtin_invalid_gtin(self, db): + """Test getting stock with invalid GTIN""" + with pytest.raises(ValueError, match="Invalid GTIN format"): + self.service.get_stock_by_gtin(db, "invalid_gtin") + + def test_get_total_stock_success(self, db, test_stock, test_product): + """Test getting total stock for a GTIN""" + result = self.service.get_total_stock(db, test_stock.gtin) + + assert result["gtin"] == test_stock.gtin + assert result["total_quantity"] == test_stock.quantity + assert result["product_title"] == test_product.title + assert result["locations_count"] == 1 + + def test_get_total_stock_invalid_gtin(self, db): + """Test getting total stock with invalid GTIN""" + with pytest.raises(ValueError, match="Invalid GTIN format"): + self.service.get_total_stock(db, "invalid_gtin") + + def test_get_all_stock_no_filters(self, db, test_stock): + """Test getting all stock without filters""" + result = self.service.get_all_stock(db) + + assert len(result) >= 1 + assert any(stock.gtin == test_stock.gtin for stock in result) + + def test_get_all_stock_with_location_filter(self, db, test_stock): + """Test getting all stock with location filter""" + result = self.service.get_all_stock(db, location=test_stock.location) + + assert len(result) >= 1 + assert all(stock.location.upper() == test_stock.location.upper() for stock in result) + + def test_get_all_stock_with_gtin_filter(self, db, test_stock): + """Test getting all stock with GTIN filter""" + result = self.service.get_all_stock(db, gtin=test_stock.gtin) + + assert len(result) >= 1 + assert all(stock.gtin == test_stock.gtin for stock in result) + + def test_get_all_stock_with_pagination(self, db): + """Test getting all stock with pagination""" + # Create multiple stock entries + for i in range(5): + stock = Stock( + gtin=f"123456789012{i}", + location=f"WAREHOUSE_{i}", + quantity=10 + ) + db.add(stock) + db.commit() + + result = self.service.get_all_stock(db, skip=2, limit=2) + + assert len(result) == 2 + + def test_update_stock_success(self, db, test_stock): + """Test updating stock quantity""" + stock_update = StockUpdate(quantity=150) + + result = self.service.update_stock(db, test_stock.id, stock_update) + + assert result.id == test_stock.id + assert result.quantity == 150 + + def test_update_stock_not_found(self, db): + """Test updating non-existent stock entry""" + stock_update = StockUpdate(quantity=150) + + with pytest.raises(ValueError, match="Stock entry not found"): + self.service.update_stock(db, 99999, stock_update) + + def test_delete_stock_success(self, db, test_stock): + """Test deleting stock entry""" + stock_id = test_stock.id + + result = self.service.delete_stock(db, stock_id) + + assert result is True + + # Verify the stock is actually deleted + deleted_stock = db.query(Stock).filter(Stock.id == stock_id).first() + assert deleted_stock is None + + def test_delete_stock_not_found(self, db): + """Test deleting non-existent stock entry""" + with pytest.raises(ValueError, match="Stock entry not found"): + self.service.delete_stock(db, 99999) + + def test_get_stock_by_id_success(self, db, test_stock): + """Test getting stock entry by ID""" + result = self.service.get_stock_by_id(db, test_stock.id) + + assert result is not None + assert result.id == test_stock.id + assert result.gtin == test_stock.gtin + + def test_get_stock_by_id_not_found(self, db): + """Test getting non-existent stock entry by ID""" + result = self.service.get_stock_by_id(db, 99999) + + assert result is None + + +# Additional fixtures that might be needed for stock tests +@pytest.fixture +def test_stock(db): + """Create a test stock entry""" + stock = Stock( + gtin="1234567890123", + location="WAREHOUSE_MAIN", + quantity=50 + ) + db.add(stock) + db.commit() + db.refresh(stock) + return stock + + +@pytest.fixture +def test_product_with_stock(db, test_stock): + """Create a test product that corresponds to the test stock""" + product = Product( + product_id="STOCK_TEST_001", + title="Stock Test Product", + gtin=test_stock.gtin, + price="29.99", + brand="TestBrand", + marketplace="Letzshop" + ) + db.add(product) + db.commit() + db.refresh(product) + return product