Files
orion/app/services/vendor_product_service.py
Samir Boulahtit fa2a3bf89a feat: make Product fully independent from MarketplaceProduct
- Add is_digital and product_type columns to Product model
- Remove is_digital/product_type properties that derived from MarketplaceProduct
- Update Create form with translation tabs, GTIN type, sale price, VAT rate, image
- Update Edit form to allow editing is_digital (remove disabled state)
- Add Availability field to Edit form
- Fix Detail page for directly created products (no marketplace source)
- Update vendor_product_service to handle new fields in create/update
- Add VendorProductCreate/Update schema fields for translations and is_digital
- Add unit tests for is_digital column and direct product creation
- Add integration tests for create/update API with new fields
- Create product-architecture.md documenting the independent copy pattern
- Add migration y3d4e5f6g7h8 for is_digital and product_type columns

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-08 01:11:00 +01:00

465 lines
16 KiB
Python

# app/services/vendor_product_service.py
"""
Vendor product service for managing vendor-specific product catalogs.
This module provides:
- Vendor product catalog browsing
- Product search and filtering
- Product statistics
- Product removal from catalogs
"""
import logging
from sqlalchemy import func
from sqlalchemy.orm import Session, joinedload
from app.exceptions import ProductNotFoundException
from models.database.product import Product
from models.database.vendor import Vendor
logger = logging.getLogger(__name__)
class VendorProductService:
"""Service for vendor product catalog operations."""
def get_products(
self,
db: Session,
skip: int = 0,
limit: int = 50,
search: str | None = None,
vendor_id: int | None = None,
is_active: bool | None = None,
is_featured: bool | None = None,
language: str = "en",
) -> tuple[list[dict], int]:
"""
Get vendor products with search and filtering.
Returns:
Tuple of (products list as dicts, total count)
"""
query = (
db.query(Product)
.join(Vendor, Product.vendor_id == Vendor.id)
.options(
joinedload(Product.vendor),
joinedload(Product.marketplace_product),
joinedload(Product.translations),
)
)
if search:
search_term = f"%{search}%"
query = query.filter(Product.vendor_sku.ilike(search_term))
if vendor_id:
query = query.filter(Product.vendor_id == vendor_id)
if is_active is not None:
query = query.filter(Product.is_active == is_active)
if is_featured is not None:
query = query.filter(Product.is_featured == is_featured)
total = query.count()
products = (
query.order_by(Product.updated_at.desc()).offset(skip).limit(limit).all()
)
result = []
for product in products:
result.append(self._build_product_list_item(product, language))
return result, total
def get_product_stats(self, db: Session, vendor_id: int | None = None) -> dict:
"""Get vendor product statistics for admin dashboard.
Args:
db: Database session
vendor_id: Optional vendor ID to filter stats
Returns:
Dict with product counts (total, active, inactive, etc.)
"""
# Base query filter
base_filter = Product.vendor_id == vendor_id if vendor_id else True
total = db.query(func.count(Product.id)).filter(base_filter).scalar() or 0
active = (
db.query(func.count(Product.id))
.filter(base_filter)
.filter(Product.is_active == True) # noqa: E712
.scalar()
or 0
)
inactive = total - active
featured = (
db.query(func.count(Product.id))
.filter(base_filter)
.filter(Product.is_featured == True) # noqa: E712
.scalar()
or 0
)
# Digital/physical counts
digital = (
db.query(func.count(Product.id))
.filter(base_filter)
.join(Product.marketplace_product)
.filter(Product.marketplace_product.has(is_digital=True))
.scalar()
or 0
)
physical = total - digital
# Count by vendor (only when not filtered by vendor_id)
by_vendor = {}
if not vendor_id:
vendor_counts = (
db.query(
Vendor.name,
func.count(Product.id),
)
.join(Vendor, Product.vendor_id == Vendor.id)
.group_by(Vendor.name)
.all()
)
by_vendor = {name or "unknown": count for name, count in vendor_counts}
return {
"total": total,
"active": active,
"inactive": inactive,
"featured": featured,
"digital": digital,
"physical": physical,
"by_vendor": by_vendor,
}
def get_catalog_vendors(self, db: Session) -> list[dict]:
"""Get list of vendors with products in their catalogs."""
vendors = (
db.query(Vendor.id, Vendor.name, Vendor.vendor_code)
.join(Product, Vendor.id == Product.vendor_id)
.distinct()
.all()
)
return [
{"id": v.id, "name": v.name, "vendor_code": v.vendor_code} for v in vendors
]
def get_product_detail(self, db: Session, product_id: int) -> dict:
"""Get detailed vendor product information including override info."""
product = (
db.query(Product)
.options(
joinedload(Product.vendor),
joinedload(Product.marketplace_product),
joinedload(Product.translations),
)
.filter(Product.id == product_id)
.first()
)
if not product:
raise ProductNotFoundException(product_id)
mp = product.marketplace_product
source_comparison_info = product.get_source_comparison_info()
# Get marketplace product translations (for "view original source")
mp_translations = {}
if mp:
for t in mp.translations:
mp_translations[t.language] = {
"title": t.title,
"description": t.description,
"short_description": t.short_description,
}
# Get vendor translations
vendor_translations = {}
for t in product.translations:
vendor_translations[t.language] = {
"title": t.title,
"description": t.description,
}
return {
"id": product.id,
"vendor_id": product.vendor_id,
"vendor_name": product.vendor.name if product.vendor else None,
"vendor_code": product.vendor.vendor_code if product.vendor else None,
"marketplace_product_id": product.marketplace_product_id,
"vendor_sku": product.vendor_sku,
# Product identifiers
"gtin": product.gtin,
"gtin_type": product.gtin_type or "ean13",
# Product fields with source comparison info
**source_comparison_info,
# Vendor-specific fields
"is_featured": product.is_featured,
"is_active": product.is_active,
"display_order": product.display_order,
"min_quantity": product.min_quantity,
"max_quantity": product.max_quantity,
# Supplier tracking
"supplier": product.supplier,
"supplier_product_id": product.supplier_product_id,
"cost": product.cost,
"margin_percent": product.margin_percent,
# Tax/profit info
"tax_rate_percent": product.tax_rate_percent,
"net_price": product.net_price,
"vat_amount": product.vat_amount,
"profit": product.profit,
"profit_margin_percent": product.profit_margin_percent,
# Digital fulfillment
"download_url": product.download_url,
"license_type": product.license_type,
"fulfillment_email_template": product.fulfillment_email_template,
# Source info from marketplace product
"source_marketplace": mp.marketplace if mp else None,
"source_vendor": mp.vendor_name if mp else None,
"source_gtin": mp.gtin if mp else None,
"source_sku": mp.sku if mp else None,
# Translations
"marketplace_translations": mp_translations,
"vendor_translations": vendor_translations,
# Timestamps
"created_at": product.created_at.isoformat()
if product.created_at
else None,
"updated_at": product.updated_at.isoformat()
if product.updated_at
else None,
}
def create_product(self, db: Session, data: dict) -> Product:
"""Create a new vendor product.
Args:
db: Database session
data: Product data dict (includes translations dict for multiple languages)
Returns:
Created Product instance
"""
from models.database.product_translation import ProductTranslation
# Determine product_type from is_digital flag
is_digital = data.get("is_digital", False)
product_type = "digital" if is_digital else data.get("product_type", "physical")
product = Product(
vendor_id=data["vendor_id"],
vendor_sku=data.get("vendor_sku"),
brand=data.get("brand"),
gtin=data.get("gtin"),
gtin_type=data.get("gtin_type"),
currency=data.get("currency", "EUR"),
tax_rate_percent=data.get("tax_rate_percent", 17),
availability=data.get("availability"),
primary_image_url=data.get("primary_image_url"),
is_active=data.get("is_active", True),
is_featured=data.get("is_featured", False),
is_digital=is_digital,
product_type=product_type,
)
# Handle price fields via setters (convert to cents)
if data.get("price") is not None:
product.price = data["price"]
if data.get("sale_price") is not None:
product.sale_price = data["sale_price"]
db.add(product)
db.flush() # Get the product ID
# Handle translations dict (new format with multiple languages)
translations = data.get("translations")
if translations:
for lang, trans_data in translations.items():
if trans_data and (trans_data.get("title") or trans_data.get("description")):
translation = ProductTranslation(
product_id=product.id,
language=lang,
title=trans_data.get("title"),
description=trans_data.get("description"),
)
db.add(translation)
else:
# Fallback for old format with single title/description
title = data.get("title")
description = data.get("description")
if title or description:
translation = ProductTranslation(
product_id=product.id,
language="en",
title=title,
description=description,
)
db.add(translation)
db.flush()
logger.info(f"Created vendor product {product.id} for vendor {data['vendor_id']}")
return product
def update_product(self, db: Session, product_id: int, data: dict) -> Product:
"""Update a vendor product.
Args:
db: Database session
product_id: Product ID to update
data: Fields to update (may include translations dict)
Returns:
Updated Product instance
"""
from models.database.product_translation import ProductTranslation
product = (
db.query(Product)
.options(joinedload(Product.translations))
.filter(Product.id == product_id)
.first()
)
if not product:
raise ProductNotFoundException(product_id)
# Handle translations separately
if "translations" in data and data["translations"]:
existing_translations = {t.language: t for t in product.translations}
for lang, trans_data in data["translations"].items():
if lang in existing_translations:
# Update existing translation
if "title" in trans_data:
existing_translations[lang].title = trans_data["title"]
if "description" in trans_data:
existing_translations[lang].description = trans_data["description"]
else:
# Create new translation
new_trans = ProductTranslation(
product_id=product_id,
language=lang,
title=trans_data.get("title"),
description=trans_data.get("description"),
)
db.add(new_trans)
# Handle price (convert to cents)
if "price" in data and data["price"] is not None:
product.price = data["price"] # Uses property setter to convert to cents
if "sale_price" in data:
product.sale_price = data["sale_price"] # Uses property setter
if "cost" in data:
product.cost = data["cost"] # Uses property setter
# Update other allowed fields
updatable_fields = [
"vendor_sku",
"brand",
"gtin",
"gtin_type",
"currency",
"tax_rate_percent",
"availability",
"is_digital",
"is_active",
"is_featured",
"primary_image_url",
"supplier",
]
for field in updatable_fields:
if field in data:
setattr(product, field, data[field])
db.flush()
logger.info(f"Updated vendor product {product_id}")
return product
def remove_product(self, db: Session, product_id: int) -> dict:
"""Remove a product from vendor catalog."""
product = db.query(Product).filter(Product.id == product_id).first()
if not product:
raise ProductNotFoundException(product_id)
vendor_name = product.vendor.name if product.vendor else "Unknown"
db.delete(product)
db.flush()
logger.info(f"Removed product {product_id} from vendor {vendor_name} catalog")
return {"message": f"Product removed from {vendor_name}'s catalog"}
def _build_product_list_item(self, product: Product, language: str) -> dict:
"""Build a product list item dict."""
mp = product.marketplace_product
# Get title: prefer vendor translations, fallback to marketplace translations
title = None
# First try vendor's own translations
if product.translations:
for trans in product.translations:
if trans.language == language and trans.title:
title = trans.title
break
# Fallback to English if requested language not found
if not title:
for trans in product.translations:
if trans.language == "en" and trans.title:
title = trans.title
break
# Fallback to marketplace translations
if not title and mp:
title = mp.get_title(language)
return {
"id": product.id,
"vendor_id": product.vendor_id,
"vendor_name": product.vendor.name if product.vendor else None,
"vendor_code": product.vendor.vendor_code if product.vendor else None,
"marketplace_product_id": product.marketplace_product_id,
"vendor_sku": product.vendor_sku,
"title": title,
"brand": product.brand,
"price": product.price,
"currency": product.currency,
# Effective price/currency for UI (same as price/currency for now)
"effective_price": product.price,
"effective_currency": product.currency,
"is_active": product.is_active,
"is_featured": product.is_featured,
"is_digital": product.is_digital,
"image_url": product.primary_image_url,
"source_marketplace": mp.marketplace if mp else None,
"source_vendor": mp.vendor_name if mp else None,
"created_at": product.created_at.isoformat()
if product.created_at
else None,
"updated_at": product.updated_at.isoformat()
if product.updated_at
else None,
}
# Create service instance
vendor_product_service = VendorProductService()