Slice 3: Product Catalog Management

Vendor Selects and Publishes Products

Status: 📋 NOT STARTED
Timeline: Week 3 (5 days)
Prerequisites: Slice 1 & 2 complete

🎯 Slice Objectives

Enable vendors to browse imported products, select which ones to publish, customize them, and manage their published catalog.

User Stories

  • As a Vendor Owner, I can browse imported products from staging
  • As a Vendor Owner, I can select which products to publish to my catalog
  • As a Vendor Owner, I can customize product information (pricing, descriptions)
  • As a Vendor Owner, I can manage my published product catalog
  • As a Vendor Owner, I can manually add products (not from marketplace)
  • As a Vendor Owner, I can manage inventory for catalog products

Success Criteria

  • Vendor can browse all imported products in staging
  • Vendor can filter/search staging products
  • Vendor can select products for publishing
  • Vendor can customize product details before/after publishing
  • Published products appear in vendor catalog
  • Vendor can manually create products
  • Vendor can update product inventory
  • Vendor can activate/deactivate products
  • Product operations are properly isolated by vendor

📋 Backend Implementation

Database Models

Product Model (models/database/product.py)

class Product(Base, TimestampMixin):
    """
    Vendor's published product catalog
    These are customer-facing products
    """
    __tablename__ = "products"
    
    id = Column(Integer, primary_key=True, index=True)
    vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False)
    
    # Basic information
    sku = Column(String, nullable=False, index=True)
    title = Column(String, nullable=False)
    description = Column(Text)
    short_description = Column(String(500))
    
    # Pricing
    price = Column(Numeric(10, 2), nullable=False)
    compare_at_price = Column(Numeric(10, 2))  # Original price for discounts
    cost_per_item = Column(Numeric(10, 2))  # For profit tracking
    currency = Column(String(3), default="EUR")
    
    # Categorization
    category = Column(String)
    subcategory = Column(String)
    brand = Column(String)
    tags = Column(JSON, default=list)
    
    # SEO
    slug = Column(String, unique=True, index=True)
    meta_title = Column(String)
    meta_description = Column(String)
    
    # Images
    featured_image = Column(String)  # Main product image
    image_urls = Column(JSON, default=list)  # Additional images
    
    # Status
    is_active = Column(Boolean, default=True)
    is_featured = Column(Boolean, default=False)
    is_on_sale = Column(Boolean, default=False)
    
    # Inventory (simple - detailed in Inventory model)
    track_inventory = Column(Boolean, default=True)
    stock_quantity = Column(Integer, default=0)
    low_stock_threshold = Column(Integer, default=10)
    
    # Marketplace source (if imported)
    marketplace_product_id = Column(
        Integer, 
        ForeignKey("marketplace_products.id"), 
        nullable=True
    )
    external_sku = Column(String, nullable=True)  # Original marketplace SKU
    
    # Additional data
    attributes = Column(JSON, default=dict)  # Custom attributes
    weight = Column(Numeric(10, 2))  # For shipping
    dimensions = Column(JSON)  # {length, width, height}
    
    # Relationships
    vendor = relationship("Vendor", back_populates="products")
    marketplace_source = relationship(
        "MarketplaceProduct", 
        back_populates="published_product"
    )
    inventory_records = relationship("Inventory", back_populates="product")
    order_items = relationship("OrderItem", back_populates="product")
    
    # Indexes
    __table_args__ = (
        # Unique per vendor, so the database backs up the service-level SKU check
        Index('ix_product_vendor_sku', 'vendor_id', 'sku', unique=True),
        Index('ix_product_active', 'vendor_id', 'is_active'),
        Index('ix_product_featured', 'vendor_id', 'is_featured'),
    )
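The pricing columns carry two derived values that the model comments hint at: the discount implied by compare_at_price and the profit implied by cost_per_item. A minimal sketch of both calculations follows, using Decimal to match the Numeric(10, 2) columns. The helper names discount_percent and profit_per_item are hypothetical and not part of the planned codebase.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Optional

CENT = Decimal("0.01")


def discount_percent(
    price: Decimal, compare_at_price: Optional[Decimal]
) -> Optional[Decimal]:
    """Percentage saved relative to compare_at_price (hypothetical helper).

    Returns None when there is no compare-at price or no actual discount.
    """
    if compare_at_price is None or compare_at_price <= price:
        return None
    saved = (compare_at_price - price) / compare_at_price * 100
    return saved.quantize(CENT, rounding=ROUND_HALF_UP)


def profit_per_item(
    price: Decimal, cost_per_item: Optional[Decimal]
) -> Optional[Decimal]:
    """Price minus cost, or None when the cost is unknown (hypothetical helper)."""
    if cost_per_item is None:
        return None
    return (price - cost_per_item).quantize(CENT, rounding=ROUND_HALF_UP)


if __name__ == "__main__":
    print(discount_percent(Decimal("75.00"), Decimal("100.00")))
    print(profit_per_item(Decimal("19.99"), Decimal("12.50")))
```

Keeping these as computed values rather than stored columns is one option; is_on_sale could then be derived from whether discount_percent returns a value, though the model above stores it explicitly.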

Inventory Model (models/database/inventory.py)

class Inventory(Base, TimestampMixin):
    """Track product inventory by location"""
    __tablename__ = "inventory"
    
    id = Column(Integer, primary_key=True, index=True)
    vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False)
    product_id = Column(Integer, ForeignKey("products.id"), nullable=False)
    
    # Location
    location_name = Column(String, default="Default")  # Warehouse name
    
    # Quantities
    available_quantity = Column(Integer, default=0)
    reserved_quantity = Column(Integer, default=0)  # Pending orders
    
    # Relationships
    vendor = relationship("Vendor")
    product = relationship("Product", back_populates="inventory_records")
    movements = relationship("InventoryMovement", back_populates="inventory")

class InventoryMovement(Base, TimestampMixin):
    """Track inventory changes"""
    __tablename__ = "inventory_movements"
    
    id = Column(Integer, primary_key=True, index=True)
    inventory_id = Column(Integer, ForeignKey("inventory.id"), nullable=False)
    
    # Movement details
    movement_type = Column(String)  # 'received', 'sold', 'adjusted', 'returned'
    quantity_change = Column(Integer)  # Positive or negative
    
    # Context
    reference_type = Column(String, nullable=True)  # 'order', 'import', 'manual'
    reference_id = Column(Integer, nullable=True)
    notes = Column(Text)
    
    # Relationships
    inventory = relationship("Inventory", back_populates="movements")
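The two models above form a simple ledger: each InventoryMovement records a signed quantity_change, and available_quantity should equal the sum of those changes if every stock change goes through a movement. The sketch below illustrates that invariant with plain dataclasses instead of the SQLAlchemy models. StockRecord, Movement, and is_low_stock are hypothetical names, and treating stock at or below the threshold as "low" is an assumption.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Movement:
    movement_type: str    # 'received', 'sold', 'adjusted', 'returned'
    quantity_change: int  # positive or negative


@dataclass
class StockRecord:
    """In-memory stand-in for one Inventory row plus its movements."""
    available_quantity: int = 0
    movements: List[Movement] = field(default_factory=list)

    def apply(self, movement_type: str, quantity_change: int) -> None:
        """Change the stock level and record the movement together."""
        new_quantity = self.available_quantity + quantity_change
        if new_quantity < 0:
            raise ValueError("movement would make available stock negative")
        self.available_quantity = new_quantity
        self.movements.append(Movement(movement_type, quantity_change))

    def ledger_total(self) -> int:
        """Sum of all recorded changes; should match available_quantity."""
        return sum(m.quantity_change for m in self.movements)


def is_low_stock(available_quantity: int, low_stock_threshold: int = 10) -> bool:
    # Assumption: stock at or below the threshold counts as low.
    return available_quantity <= low_stock_threshold


if __name__ == "__main__":
    record = StockRecord()
    record.apply("received", 25)
    record.apply("sold", -3)
    print(record.available_quantity, record.ledger_total())
```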

Pydantic Schemas

Product Schemas (models/schema/product.py)

class ProductCreate(BaseModel):
    """Create product from scratch"""
    sku: str
    title: str
    description: Optional[str] = None
    price: float = Field(..., gt=0)
    compare_at_price: Optional[float] = None
    cost_per_item: Optional[float] = None
    category: Optional[str] = None
    brand: Optional[str] = None
    tags: List[str] = []
    image_urls: List[str] = []
    track_inventory: bool = True
    stock_quantity: int = 0
    is_active: bool = True

class ProductPublishFromMarketplace(BaseModel):
    """Publish product from marketplace staging"""
    marketplace_product_id: int
    custom_title: Optional[str] = None
    custom_description: Optional[str] = None
    custom_price: Optional[float] = Field(None, gt=0)
    custom_sku: Optional[str] = None
    stock_quantity: int = 0
    is_active: bool = True

class ProductUpdate(BaseModel):
    """Update existing product"""
    title: Optional[str] = None
    description: Optional[str] = None
    short_description: Optional[str] = None
    price: Optional[float] = Field(None, gt=0)
    compare_at_price: Optional[float] = None
    category: Optional[str] = None
    brand: Optional[str] = None
    tags: Optional[List[str]] = None
    image_urls: Optional[List[str]] = None
    is_active: Optional[bool] = None
    is_featured: Optional[bool] = None
    stock_quantity: Optional[int] = None

class ProductResponse(BaseModel):
    """Product details"""
    id: int
    vendor_id: int
    sku: str
    title: str
    description: Optional[str]
    price: float
    compare_at_price: Optional[float]
    category: Optional[str]
    brand: Optional[str]
    tags: List[str]
    featured_image: Optional[str]
    image_urls: List[str]
    is_active: bool
    is_featured: bool
    stock_quantity: int
    marketplace_product_id: Optional[int]
    created_at: datetime
    updated_at: datetime
    
    class Config:
        from_attributes = True
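The custom_* fields on ProductPublishFromMarketplace are optional overrides: when one is omitted, the imported marketplace value is used. One detail worth deciding up front is what "omitted" means. A fallback written as `custom or imported` treats every falsy value (an empty string, 0) as omitted, whereas an explicit None check only falls back when nothing was sent. The sketch below shows the None-based variant; resolve_override is a hypothetical helper, and which behavior is wanted is a product decision.

```python
from typing import Optional, TypeVar

T = TypeVar("T")


def resolve_override(custom: Optional[T], imported: T) -> T:
    """Return the vendor's custom value if one was supplied, else the imported one.

    Only None counts as "not supplied", so an empty string is kept as-is.
    """
    return imported if custom is None else custom


if __name__ == "__main__":
    # No override sent: the imported description is used.
    print(resolve_override(None, "Imported description"))
    # Vendor deliberately blanks the description: the empty string is kept.
    print(repr(resolve_override("", "Imported description")))
    # For comparison, the `or` form discards the empty string.
    print("" or "Imported description")
```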

Service Layer

Product Service (app/services/product_service.py)

class ProductService:
    """Handle product catalog operations"""
    
    async def publish_from_marketplace(
        self,
        vendor_id: int,
        publish_data: ProductPublishFromMarketplace,
        db: Session
    ) -> Product:
        """Publish marketplace product to catalog"""
        
        # Get marketplace product
        mp_product = db.query(MarketplaceProduct).filter(
            MarketplaceProduct.id == publish_data.marketplace_product_id,
            MarketplaceProduct.vendor_id == vendor_id,
            MarketplaceProduct.is_published.is_(False)
        ).first()
        
        if not mp_product:
            raise ProductNotFoundError("Marketplace product not found")
        
        # Check if SKU already exists
        sku = publish_data.custom_sku or mp_product.external_sku
        existing = db.query(Product).filter(
            Product.vendor_id == vendor_id,
            Product.sku == sku
        ).first()
        
        if existing:
            raise ProductAlreadyExistsError(f"Product with SKU {sku} already exists")
        
        # Create product
        product = Product(
            vendor_id=vendor_id,
            sku=sku,
            title=publish_data.custom_title or mp_product.title,
            description=publish_data.custom_description or mp_product.description,
            price=publish_data.custom_price or mp_product.price,
            currency=mp_product.currency,
            category=mp_product.category,
            brand=mp_product.brand,
            image_urls=mp_product.image_urls,
            featured_image=mp_product.image_urls[0] if mp_product.image_urls else None,
            marketplace_product_id=mp_product.id,
            external_sku=mp_product.external_sku,
            stock_quantity=publish_data.stock_quantity,
            is_active=publish_data.is_active,
            slug=self._generate_slug(publish_data.custom_title or mp_product.title)
        )
        
        db.add(product)
        db.flush()  # Assigns product.id; it is None until the INSERT is flushed

        # Mark marketplace product as published
        mp_product.is_published = True
        mp_product.published_product_id = product.id

        # Create initial inventory record
        inventory = Inventory(
            vendor_id=vendor_id,
            product_id=product.id,
            location_name="Default",
            available_quantity=publish_data.stock_quantity,
            reserved_quantity=0
        )
        db.add(inventory)

        # Record inventory movement
        if publish_data.stock_quantity > 0:
            db.flush()  # Assigns inventory.id for the movement's foreign key
            movement = InventoryMovement(
                inventory_id=inventory.id,
                movement_type="received",
                quantity_change=publish_data.stock_quantity,
                reference_type="import",
                notes="Initial stock from marketplace import"
            )
            db.add(movement)
        
        db.commit()
        db.refresh(product)
        
        return product
    
    async def create_product(
        self,
        vendor_id: int,
        product_data: ProductCreate,
        db: Session
    ) -> Product:
        """Create product manually"""
        
        # Check SKU uniqueness
        existing = db.query(Product).filter(
            Product.vendor_id == vendor_id,
            Product.sku == product_data.sku
        ).first()
        
        if existing:
            raise ProductAlreadyExistsError(f"SKU {product_data.sku} already exists")
        
        product = Product(
            vendor_id=vendor_id,
            **product_data.model_dump(),  # Pydantic v2 name for .dict()
            slug=self._generate_slug(product_data.title),
            featured_image=product_data.image_urls[0] if product_data.image_urls else None
        )
        
        db.add(product)
        db.flush()  # Assigns product.id before the inventory row references it

        # Create inventory
        if product_data.track_inventory:
            inventory = Inventory(
                vendor_id=vendor_id,
                product_id=product.id,
                available_quantity=product_data.stock_quantity
            )
            db.add(inventory)
        
        db.commit()
        db.refresh(product)
        
        return product
    
    def get_products(
        self,
        vendor_id: int,
        db: Session,
        is_active: Optional[bool] = None,
        category: Optional[str] = None,
        search: Optional[str] = None,
        skip: int = 0,
        limit: int = 100
    ) -> List[Product]:
        """Get vendor's product catalog"""
        
        query = db.query(Product).filter(Product.vendor_id == vendor_id)
        
        if is_active is not None:
            query = query.filter(Product.is_active == is_active)
        
        if category:
            query = query.filter(Product.category == category)
        
        if search:
            query = query.filter(
                or_(
                    Product.title.ilike(f"%{search}%"),
                    Product.sku.ilike(f"%{search}%"),
                    Product.brand.ilike(f"%{search}%")
                )
            )
        
        return query.order_by(
            Product.created_at.desc()
        ).offset(skip).limit(limit).all()
    
    async def update_product(
        self,
        vendor_id: int,
        product_id: int,
        update_data: ProductUpdate,
        db: Session
    ) -> Product:
        """Update product details"""
        
        product = db.query(Product).filter(
            Product.id == product_id,
            Product.vendor_id == vendor_id
        ).first()
        
        if not product:
            raise ProductNotFoundError()
        
        # Update fields
        update_dict = update_data.model_dump(exclude_unset=True)  # Pydantic v2 name for .dict()
        for field, value in update_dict.items():
            setattr(product, field, value)
        
        # Update stock if changed
        if 'stock_quantity' in update_dict:
            self._update_inventory(product, update_dict['stock_quantity'], db)
        
        db.commit()
        db.refresh(product)
        
        return product
    
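    def _generate_slug(self, title: str) -> str:
        """
        Generate URL-friendly slug from the title.

        Note: the result is not guaranteed to be unique, while Product.slug
        has a unique constraint across all vendors, so two products with the
        same title will collide unless the caller handles it.
        """
        import re
        slug = title.lower()
        slug = re.sub(r'[^a-z0-9]+', '-', slug)
        slug = slug.strip('-')
        return slug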
    
    def _update_inventory(
        self,
        product: Product,
        new_quantity: int,
        db: Session
    ):
        """Update product inventory"""
        inventory = db.query(Inventory).filter(
            Inventory.product_id == product.id
        ).first()
        
        if inventory:
            quantity_change = new_quantity - inventory.available_quantity
            inventory.available_quantity = new_quantity
            
            # Record movement
            movement = InventoryMovement(
                inventory_id=inventory.id,
                movement_type="adjusted",
                quantity_change=quantity_change,
                reference_type="manual",
                notes="Manual adjustment"
            )
            db.add(movement)
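Because Product.slug is declared unique while _generate_slug derives the slug from the title alone, two products with the same title would produce the same slug. One possible way to handle this is to append a numeric suffix until the slug is free. The sketch below is an assumption about how that could look, not the planned implementation: make_unique_slug and the `taken` set are hypothetical, and in the real service the set lookup would be a query against existing slugs.

```python
import re
from typing import Set


def generate_slug(title: str) -> str:
    """Same rule as _generate_slug above: lowercase, non-alphanumerics to '-'."""
    slug = re.sub(r"[^a-z0-9]+", "-", title.lower())
    return slug.strip("-")


def make_unique_slug(title: str, taken: Set[str]) -> str:
    """Append -2, -3, ... until the slug is not in `taken` (hypothetical helper)."""
    # Titles with no ASCII letters or digits slugify to "", so use a fallback.
    base = generate_slug(title) or "product"
    candidate = base
    suffix = 2
    while candidate in taken:
        candidate = f"{base}-{suffix}"
        suffix += 1
    return candidate


if __name__ == "__main__":
    existing = {"blue-widget", "blue-widget-2"}
    print(generate_slug("Blue Widget (Large)!"))
    print(make_unique_slug("Blue Widget", existing))
```

Note that the regular expression drops non-ASCII characters, so a title such as "Café" slugifies to "caf". Transliterating before slugifying would be a separate decision.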

API Endpoints

Product Endpoints (app/api/v1/vendor/products.py)

@router.get("", response_model=List[ProductResponse])
async def get_products(
    is_active: Optional[bool] = None,
    category: Optional[str] = None,
    search: Optional[str] = None,
    skip: int = 0,
    limit: int = 100,
    current_user: User = Depends(get_current_vendor_user),
    vendor: Vendor = Depends(get_current_vendor),
    db: Session = Depends(get_db)
):
    """Get vendor's product catalog"""
    service = ProductService()
    products = service.get_products(
        vendor.id, db, is_active, category, search, skip, limit
    )
    return products

@router.post("", response_model=ProductResponse)
async def create_product(
    product_data: ProductCreate,
    current_user: User = Depends(get_current_vendor_user),
    vendor: Vendor = Depends(get_current_vendor),
    db: Session = Depends(get_db)
):
    """Create product manually"""
    service = ProductService()
    product = await service.create_product(vendor.id, product_data, db)
    return product

@router.post("/from-marketplace", response_model=ProductResponse)
async def publish_from_marketplace(
    publish_data: ProductPublishFromMarketplace,
    current_user: User = Depends(get_current_vendor_user),
    vendor: Vendor = Depends(get_current_vendor),
    db: Session = Depends(get_db)
):
    """Publish marketplace product to catalog"""
    service = ProductService()
    product = await service.publish_from_marketplace(
        vendor.id, publish_data, db
    )
    return product

@router.get("/{product_id}", response_model=ProductResponse)
async def get_product(
    product_id: int,
    current_user: User = Depends(get_current_vendor_user),
    vendor: Vendor = Depends(get_current_vendor),
    db: Session = Depends(get_db)
):
    """Get product details"""
    product = db.query(Product).filter(
        Product.id == product_id,
        Product.vendor_id == vendor.id
    ).first()
    
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")
    
    return product

@router.put("/{product_id}", response_model=ProductResponse)
async def update_product(
    product_id: int,
    update_data: ProductUpdate,
    current_user: User = Depends(get_current_vendor_user),
    vendor: Vendor = Depends(get_current_vendor),
    db: Session = Depends(get_db)
):
    """Update product"""
    service = ProductService()
    product = await service.update_product(vendor.id, product_id, update_data, db)
    return product

@router.put("/{product_id}/toggle-active")
async def toggle_product_active(
    product_id: int,
    current_user: User = Depends(get_current_vendor_user),
    vendor: Vendor = Depends(get_current_vendor),
    db: Session = Depends(get_db)
):
    """Activate/deactivate product"""
    product = db.query(Product).filter(
        Product.id == product_id,
        Product.vendor_id == vendor.id
    ).first()
    
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")
    
    product.is_active = not product.is_active
    db.commit()
    
    return {"is_active": product.is_active}

@router.delete("/{product_id}")
async def delete_product(
    product_id: int,
    current_user: User = Depends(get_current_vendor_user),
    vendor: Vendor = Depends(get_current_vendor),
    db: Session = Depends(get_db)
):
    """Remove product from catalog"""
    product = db.query(Product).filter(
        Product.id == product_id,
        Product.vendor_id == vendor.id
    ).first()
    
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")
    
    # Mark as inactive instead of deleting
    product.is_active = False
    db.commit()
    
    return {"success": True}
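Every endpoint above looks a product up by both its id and the current vendor's id, which is what keeps one vendor from reading or changing another vendor's products: a cross-vendor request finds no row and ends in a 404. The sketch below illustrates that lookup pattern with the standard-library sqlite3 module rather than the project's SQLAlchemy session. The table layout is reduced to four columns and the function names are hypothetical.

```python
import sqlite3
from typing import Optional, Tuple

Row = Tuple[int, int, str, str]


def build_demo_db() -> sqlite3.Connection:
    """Create an in-memory table with one product for each of two vendors."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE products ("
        " id INTEGER PRIMARY KEY,"
        " vendor_id INTEGER NOT NULL,"
        " sku TEXT NOT NULL,"
        " title TEXT NOT NULL)"
    )
    conn.executemany(
        "INSERT INTO products (id, vendor_id, sku, title) VALUES (?, ?, ?, ?)",
        [(1, 10, "SKU-1", "Blue Widget"), (2, 20, "SKU-1", "Red Widget")],
    )
    return conn


def get_product_for_vendor(
    conn: sqlite3.Connection, vendor_id: int, product_id: int
) -> Optional[Row]:
    """Return the product only if it belongs to the given vendor, else None."""
    return conn.execute(
        "SELECT id, vendor_id, sku, title FROM products"
        " WHERE id = ? AND vendor_id = ?",
        (product_id, vendor_id),
    ).fetchone()


if __name__ == "__main__":
    conn = build_demo_db()
    print(get_product_for_vendor(conn, 10, 1))  # owner sees the product
    print(get_product_for_vendor(conn, 20, 1))  # other vendor gets nothing
```

Returning "not found" rather than "forbidden" for another vendor's product also avoids revealing which product ids exist.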

🎨 Frontend Implementation

Templates

Browse Marketplace Products (templates/vendor/marketplace/browse.html)

Uses Alpine.js for reactive filtering, selection, and bulk publishing.

Product Catalog (templates/vendor/products/list.html)

Product management interface with search, filters, and quick actions.

Product Edit (templates/vendor/products/edit.html)

Detailed product editing with image management and inventory tracking.

Testing Checklist

Backend Tests

  • Product publishing from marketplace works
  • Manual product creation works
  • Product updates work correctly
  • Inventory tracking is accurate
  • SKU uniqueness is enforced
  • Vendor isolation maintained
  • Product search/filtering works
  • Slug generation works correctly
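One edge case that may be worth covering under "Product search/filtering works": the search term is interpolated into a LIKE pattern, where % and _ act as wildcards, so a search for "50%" would match more than the literal text. A minimal sketch of escaping the term first is shown below. escape_like is a hypothetical helper, and it assumes the query also declares the escape character (SQLAlchemy's ilike accepts an escape argument for this).

```python
def escape_like(term: str, escape_char: str = "\\") -> str:
    """Escape LIKE wildcards so the term is matched literally.

    The escape character itself is doubled first, so it is not mistaken
    for an escape sequence introduced by the later replacements.
    """
    return (
        term.replace(escape_char, escape_char * 2)
        .replace("%", escape_char + "%")
        .replace("_", escape_char + "_")
    )


if __name__ == "__main__":
    # The pattern the service would build for a "contains" search.
    print("%" + escape_like("50%_off") + "%")
```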

Frontend Tests

  • Browse marketplace products
  • Select multiple products for publishing
  • Publish single product with customization
  • View product catalog
  • Edit product details
  • Toggle product active status
  • Delete/deactivate products
  • Search and filter products

➡️ Next Steps

After completing Slice 3, move to Slice 4: Customer Shopping Experience to build the public-facing shop.


Slice 3 Status: 📋 Not Started
Dependencies: Slices 1 & 2 must be complete
Estimated Duration: 5 days