# Slice 3: Product Catalog Management ## Vendor Selects and Publishes Products **Status**: 📋 NOT STARTED **Timeline**: Week 3 (5 days) **Prerequisites**: Slice 1 & 2 complete ## 🎯 Slice Objectives Enable vendors to browse imported products, select which to publish, customize them, and manage their product catalog. ### User Stories - As a Vendor Owner, I can browse imported products from staging - As a Vendor Owner, I can select which products to publish to my catalog - As a Vendor Owner, I can customize product information (pricing, descriptions) - As a Vendor Owner, I can manage my published product catalog - As a Vendor Owner, I can manually add products (not from marketplace) - As a Vendor Owner, I can manage inventory for catalog products ### Success Criteria - [ ] Vendor can browse all imported products in staging - [ ] Vendor can filter/search staging products - [ ] Vendor can select products for publishing - [ ] Vendor can customize product details before/after publishing - [ ] Published products appear in vendor catalog - [ ] Vendor can manually create products - [ ] Vendor can update product inventory - [ ] Vendor can activate/deactivate products - [ ] Product operations are properly isolated by vendor ## 📋 Backend Implementation ### Database Models #### Product Model (`models/database/product.py`) ```python class Product(Base, TimestampMixin): """ Vendor's published product catalog These are customer-facing products """ __tablename__ = "products" id = Column(Integer, primary_key=True, index=True) vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False) # Basic information sku = Column(String, nullable=False, index=True) title = Column(String, nullable=False) description = Column(Text) short_description = Column(String(500)) # Pricing price = Column(Numeric(10, 2), nullable=False) compare_at_price = Column(Numeric(10, 2)) # Original price for discounts cost_per_item = Column(Numeric(10, 2)) # For profit tracking currency = Column(String(3), default="EUR") # Categorization category = Column(String) subcategory = Column(String) brand = Column(String) tags = Column(JSON, default=list) # SEO slug = Column(String, unique=True, index=True) meta_title = Column(String) meta_description = Column(String) # Images featured_image = Column(String) # Main product image image_urls = Column(JSON, default=list) # Additional images # Status is_active = Column(Boolean, default=True) is_featured = Column(Boolean, default=False) is_on_sale = Column(Boolean, default=False) # Inventory (simple - detailed in Inventory model) track_inventory = Column(Boolean, default=True) stock_quantity = Column(Integer, default=0) low_stock_threshold = Column(Integer, default=10) # Marketplace source (if imported) marketplace_product_id = Column( Integer, ForeignKey("marketplace_products.id"), nullable=True ) external_sku = Column(String, nullable=True) # Original marketplace SKU # Additional data attributes = Column(JSON, default=dict) # Custom attributes weight = Column(Numeric(10, 2)) # For shipping dimensions = Column(JSON) # {length, width, height} # Relationships vendor = relationship("Vendor", back_populates="products") marketplace_source = relationship( "MarketplaceProduct", back_populates="published_product" ) inventory_records = relationship("Inventory", back_populates="product") order_items = relationship("OrderItem", back_populates="product") # Indexes __table_args__ = ( Index('ix_product_vendor_sku', 'vendor_id', 'sku'), Index('ix_product_active', 'vendor_id', 'is_active'), Index('ix_product_featured', 'vendor_id', 'is_featured'), ) ``` #### Inventory Model (`models/database/inventory.py`) ```python class Inventory(Base, TimestampMixin): """Track product inventory by location""" __tablename__ = "inventory" id = Column(Integer, primary_key=True, index=True) vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False) product_id = Column(Integer, ForeignKey("products.id"), nullable=False) # Location location_name = Column(String, default="Default") # Warehouse name # Quantities available_quantity = Column(Integer, default=0) reserved_quantity = Column(Integer, default=0) # Pending orders # Relationships vendor = relationship("Vendor") product = relationship("Product", back_populates="inventory_records") movements = relationship("InventoryMovement", back_populates="inventory") class InventoryMovement(Base, TimestampMixin): """Track inventory changes""" __tablename__ = "inventory_movements" id = Column(Integer, primary_key=True, index=True) inventory_id = Column(Integer, ForeignKey("inventory.id"), nullable=False) # Movement details movement_type = Column(String) # 'received', 'sold', 'adjusted', 'returned' quantity_change = Column(Integer) # Positive or negative # Context reference_type = Column(String, nullable=True) # 'order', 'import', 'manual' reference_id = Column(Integer, nullable=True) notes = Column(Text) # Relationships inventory = relationship("Inventory", back_populates="movements") ``` ### Pydantic Schemas #### Product Schemas (`models/schema/product.py`) ```python class ProductCreate(BaseModel): """Create product from scratch""" sku: str title: str description: Optional[str] = None price: float = Field(..., gt=0) compare_at_price: Optional[float] = None cost_per_item: Optional[float] = None category: Optional[str] = None brand: Optional[str] = None tags: List[str] = [] image_urls: List[str] = [] track_inventory: bool = True stock_quantity: int = 0 is_active: bool = True class ProductPublishFromMarketplace(BaseModel): """Publish product from marketplace staging""" marketplace_product_id: int custom_title: Optional[str] = None custom_description: Optional[str] = None custom_price: Optional[float] = None custom_sku: Optional[str] = None stock_quantity: int = 0 is_active: bool = True class ProductUpdate(BaseModel): """Update existing product""" title: Optional[str] = None description: Optional[str] = None short_description: Optional[str] = None price: Optional[float] = None compare_at_price: Optional[float] = None category: Optional[str] = None brand: Optional[str] = None tags: Optional[List[str]] = None image_urls: Optional[List[str]] = None is_active: Optional[bool] = None is_featured: Optional[bool] = None stock_quantity: Optional[int] = None class ProductResponse(BaseModel): """Product details""" id: int vendor_id: int sku: str title: str description: Optional[str] price: float compare_at_price: Optional[float] category: Optional[str] brand: Optional[str] tags: List[str] featured_image: Optional[str] image_urls: List[str] is_active: bool is_featured: bool stock_quantity: int marketplace_product_id: Optional[int] created_at: datetime updated_at: datetime class Config: from_attributes = True ``` ### Service Layer #### Product Service (`app/services/product_service.py`) ```python class ProductService: """Handle product catalog operations""" async def publish_from_marketplace( self, vendor_id: int, publish_data: ProductPublishFromMarketplace, db: Session ) -> Product: """Publish marketplace product to catalog""" # Get marketplace product mp_product = db.query(MarketplaceProduct).filter( MarketplaceProduct.id == publish_data.marketplace_product_id, MarketplaceProduct.vendor_id == vendor_id, MarketplaceProduct.is_published == False ).first() if not mp_product: raise ProductNotFoundError("Marketplace product not found") # Check if SKU already exists sku = publish_data.custom_sku or mp_product.external_sku existing = db.query(Product).filter( Product.vendor_id == vendor_id, Product.sku == sku ).first() if existing: raise ProductAlreadyExistsError(f"Product with SKU {sku} already exists") # Create product product = Product( vendor_id=vendor_id, sku=sku, title=publish_data.custom_title or mp_product.title, description=publish_data.custom_description or mp_product.description, price=publish_data.custom_price or mp_product.price, currency=mp_product.currency, category=mp_product.category, brand=mp_product.brand, image_urls=mp_product.image_urls, featured_image=mp_product.image_urls[0] if mp_product.image_urls else None, marketplace_product_id=mp_product.id, external_sku=mp_product.external_sku, stock_quantity=publish_data.stock_quantity, is_active=publish_data.is_active, slug=self._generate_slug(publish_data.custom_title or mp_product.title) ) db.add(product) # Mark marketplace product as published mp_product.is_published = True mp_product.published_product_id = product.id # Create initial inventory record inventory = Inventory( vendor_id=vendor_id, product_id=product.id, location_name="Default", available_quantity=publish_data.stock_quantity, reserved_quantity=0 ) db.add(inventory) # Record inventory movement if publish_data.stock_quantity > 0: movement = InventoryMovement( inventory_id=inventory.id, movement_type="received", quantity_change=publish_data.stock_quantity, reference_type="import", notes="Initial stock from marketplace import" ) db.add(movement) db.commit() db.refresh(product) return product async def create_product( self, vendor_id: int, product_data: ProductCreate, db: Session ) -> Product: """Create product manually""" # Check SKU uniqueness existing = db.query(Product).filter( Product.vendor_id == vendor_id, Product.sku == product_data.sku ).first() if existing: raise ProductAlreadyExistsError(f"SKU {product_data.sku} already exists") product = Product( vendor_id=vendor_id, **product_data.dict(), slug=self._generate_slug(product_data.title), featured_image=product_data.image_urls[0] if product_data.image_urls else None ) db.add(product) # Create inventory if product_data.track_inventory: inventory = Inventory( vendor_id=vendor_id, product_id=product.id, available_quantity=product_data.stock_quantity ) db.add(inventory) db.commit() db.refresh(product) return product def get_products( self, vendor_id: int, db: Session, is_active: Optional[bool] = None, category: Optional[str] = None, search: Optional[str] = None, skip: int = 0, limit: int = 100 ) -> List[Product]: """Get vendor's product catalog""" query = db.query(Product).filter(Product.vendor_id == vendor_id) if is_active is not None: query = query.filter(Product.is_active == is_active) if category: query = query.filter(Product.category == category) if search: query = query.filter( or_( Product.title.ilike(f"%{search}%"), Product.sku.ilike(f"%{search}%"), Product.brand.ilike(f"%{search}%") ) ) return query.order_by( Product.created_at.desc() ).offset(skip).limit(limit).all() async def update_product( self, vendor_id: int, product_id: int, update_data: ProductUpdate, db: Session ) -> Product: """Update product details""" product = db.query(Product).filter( Product.id == product_id, Product.vendor_id == vendor_id ).first() if not product: raise ProductNotFoundError() # Update fields update_dict = update_data.dict(exclude_unset=True) for field, value in update_dict.items(): setattr(product, field, value) # Update stock if changed if 'stock_quantity' in update_dict: self._update_inventory(product, update_dict['stock_quantity'], db) db.commit() db.refresh(product) return product def _generate_slug(self, title: str) -> str: """Generate URL-friendly slug""" import re slug = title.lower() slug = re.sub(r'[^a-z0-9]+', '-', slug) slug = slug.strip('-') return slug def _update_inventory( self, product: Product, new_quantity: int, db: Session ): """Update product inventory""" inventory = db.query(Inventory).filter( Inventory.product_id == product.id ).first() if inventory: quantity_change = new_quantity - inventory.available_quantity inventory.available_quantity = new_quantity # Record movement movement = InventoryMovement( inventory_id=inventory.id, movement_type="adjusted", quantity_change=quantity_change, reference_type="manual", notes="Manual adjustment" ) db.add(movement) ``` ### API Endpoints #### Product Endpoints (`app/api/v1/vendor/products.py`) ```python @router.get("", response_model=List[ProductResponse]) async def get_products( is_active: Optional[bool] = None, category: Optional[str] = None, search: Optional[str] = None, skip: int = 0, limit: int = 100, current_user: User = Depends(get_current_vendor_user), vendor: Vendor = Depends(get_current_vendor), db: Session = Depends(get_db) ): """Get vendor's product catalog""" service = ProductService() products = service.get_products( vendor.id, db, is_active, category, search, skip, limit ) return products @router.post("", response_model=ProductResponse) async def create_product( product_data: ProductCreate, current_user: User = Depends(get_current_vendor_user), vendor: Vendor = Depends(get_current_vendor), db: Session = Depends(get_db) ): """Create product manually""" service = ProductService() product = await service.create_product(vendor.id, product_data, db) return product @router.post("/from-marketplace", response_model=ProductResponse) async def publish_from_marketplace( publish_data: ProductPublishFromMarketplace, current_user: User = Depends(get_current_vendor_user), vendor: Vendor = Depends(get_current_vendor), db: Session = Depends(get_db) ): """Publish marketplace product to catalog""" service = ProductService() product = await service.publish_from_marketplace( vendor.id, publish_data, db ) return product @router.get("/{product_id}", response_model=ProductResponse) async def get_product( product_id: int, current_user: User = Depends(get_current_vendor_user), vendor: Vendor = Depends(get_current_vendor), db: Session = Depends(get_db) ): """Get product details""" product = db.query(Product).filter( Product.id == product_id, Product.vendor_id == vendor.id ).first() if not product: raise HTTPException(status_code=404, detail="Product not found") return product @router.put("/{product_id}", response_model=ProductResponse) async def update_product( product_id: int, update_data: ProductUpdate, current_user: User = Depends(get_current_vendor_user), vendor: Vendor = Depends(get_current_vendor), db: Session = Depends(get_db) ): """Update product""" service = ProductService() product = await service.update_product(vendor.id, product_id, update_data, db) return product @router.put("/{product_id}/toggle-active") async def toggle_product_active( product_id: int, current_user: User = Depends(get_current_vendor_user), vendor: Vendor = Depends(get_current_vendor), db: Session = Depends(get_db) ): """Activate/deactivate product""" product = db.query(Product).filter( Product.id == product_id, Product.vendor_id == vendor.id ).first() if not product: raise HTTPException(status_code=404, detail="Product not found") product.is_active = not product.is_active db.commit() return {"is_active": product.is_active} @router.delete("/{product_id}") async def delete_product( product_id: int, current_user: User = Depends(get_current_vendor_user), vendor: Vendor = Depends(get_current_vendor), db: Session = Depends(get_db) ): """Remove product from catalog""" product = db.query(Product).filter( Product.id == product_id, Product.vendor_id == vendor.id ).first() if not product: raise HTTPException(status_code=404, detail="Product not found") # Mark as inactive instead of deleting product.is_active = False db.commit() return {"success": True} ``` ## 🎨 Frontend Implementation ### Templates #### Browse Marketplace Products (`templates/vendor/marketplace/browse.html`) Uses Alpine.js for reactive filtering, selection, and bulk publishing. #### Product Catalog (`templates/vendor/products/list.html`) Product management interface with search, filters, and quick actions. #### Product Edit (`templates/vendor/products/edit.html`) Detailed product editing with image management and inventory tracking. ## ✅ Testing Checklist ### Backend Tests - [ ] Product publishing from marketplace works - [ ] Manual product creation works - [ ] Product updates work correctly - [ ] Inventory tracking is accurate - [ ] SKU uniqueness is enforced - [ ] Vendor isolation maintained - [ ] Product search/filtering works - [ ] Slug generation works correctly ### Frontend Tests - [ ] Browse marketplace products - [ ] Select multiple products for publishing - [ ] Publish single product with customization - [ ] View product catalog - [ ] Edit product details - [ ] Toggle product active status - [ ] Delete/deactivate products - [ ] Search and filter products ## ➡️ Next Steps After completing Slice 3, move to **Slice 4: Customer Shopping Experience** to build the public-facing shop. --- **Slice 3 Status**: 📋 Not Started **Dependencies**: Slices 1 & 2 must be complete **Estimated Duration**: 5 days