feat: implement product search, media library, and vendor customers
- Add full-text product search in ProductService.search_products() searching titles, descriptions, SKUs, brands, and GTINs - Implement complete vendor media library with file uploads, thumbnails, folders, and product associations - Implement vendor customers API with listing, details, orders, statistics, and status management - Add shop search results UI with pagination and add-to-cart - Add vendor media library UI with drag-drop upload and grid view - Add database migration for media_files and product_media tables - Update TODO file with current launch status (~95% complete) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -261,6 +261,126 @@ class CustomerService:
|
||||
.first()
|
||||
)
|
||||
|
||||
def get_vendor_customers(
|
||||
self,
|
||||
db: Session,
|
||||
vendor_id: int,
|
||||
skip: int = 0,
|
||||
limit: int = 100,
|
||||
search: str | None = None,
|
||||
is_active: bool | None = None,
|
||||
) -> tuple[list[Customer], int]:
|
||||
"""
|
||||
Get all customers for a vendor with filtering and pagination.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
vendor_id: Vendor ID
|
||||
skip: Pagination offset
|
||||
limit: Pagination limit
|
||||
search: Search in name/email
|
||||
is_active: Filter by active status
|
||||
|
||||
Returns:
|
||||
Tuple of (customers, total_count)
|
||||
"""
|
||||
from sqlalchemy import or_
|
||||
|
||||
query = db.query(Customer).filter(Customer.vendor_id == vendor_id)
|
||||
|
||||
if search:
|
||||
search_pattern = f"%{search}%"
|
||||
query = query.filter(
|
||||
or_(
|
||||
Customer.email.ilike(search_pattern),
|
||||
Customer.first_name.ilike(search_pattern),
|
||||
Customer.last_name.ilike(search_pattern),
|
||||
Customer.customer_number.ilike(search_pattern),
|
||||
)
|
||||
)
|
||||
|
||||
if is_active is not None:
|
||||
query = query.filter(Customer.is_active == is_active)
|
||||
|
||||
# Order by most recent first
|
||||
query = query.order_by(Customer.created_at.desc())
|
||||
|
||||
total = query.count()
|
||||
customers = query.offset(skip).limit(limit).all()
|
||||
|
||||
return customers, total
|
||||
|
||||
def get_customer_statistics(
|
||||
self, db: Session, vendor_id: int, customer_id: int
|
||||
) -> dict:
|
||||
"""
|
||||
Get detailed statistics for a customer.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
vendor_id: Vendor ID
|
||||
customer_id: Customer ID
|
||||
|
||||
Returns:
|
||||
Dict with customer statistics
|
||||
"""
|
||||
from sqlalchemy import func
|
||||
|
||||
from models.database.order import Order
|
||||
|
||||
customer = self.get_customer(db, vendor_id, customer_id)
|
||||
|
||||
# Get order statistics
|
||||
order_stats = (
|
||||
db.query(
|
||||
func.count(Order.id).label("total_orders"),
|
||||
func.sum(Order.total_cents).label("total_spent_cents"),
|
||||
func.avg(Order.total_cents).label("avg_order_cents"),
|
||||
func.max(Order.created_at).label("last_order_date"),
|
||||
)
|
||||
.filter(Order.customer_id == customer_id)
|
||||
.first()
|
||||
)
|
||||
|
||||
total_orders = order_stats.total_orders or 0
|
||||
total_spent_cents = order_stats.total_spent_cents or 0
|
||||
avg_order_cents = order_stats.avg_order_cents or 0
|
||||
|
||||
return {
|
||||
"customer_id": customer_id,
|
||||
"total_orders": total_orders,
|
||||
"total_spent": total_spent_cents / 100, # Convert to euros
|
||||
"average_order_value": avg_order_cents / 100 if avg_order_cents else 0.0,
|
||||
"last_order_date": order_stats.last_order_date,
|
||||
"member_since": customer.created_at,
|
||||
"is_active": customer.is_active,
|
||||
}
|
||||
|
||||
def toggle_customer_status(
|
||||
self, db: Session, vendor_id: int, customer_id: int
|
||||
) -> Customer:
|
||||
"""
|
||||
Toggle customer active status.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
vendor_id: Vendor ID
|
||||
customer_id: Customer ID
|
||||
|
||||
Returns:
|
||||
Customer: Updated customer
|
||||
"""
|
||||
customer = self.get_customer(db, vendor_id, customer_id)
|
||||
customer.is_active = not customer.is_active
|
||||
|
||||
db.flush()
|
||||
db.refresh(customer)
|
||||
|
||||
action = "activated" if customer.is_active else "deactivated"
|
||||
logger.info(f"Customer {action}: {customer.email} (ID: {customer.id})")
|
||||
|
||||
return customer
|
||||
|
||||
def update_customer(
|
||||
self,
|
||||
db: Session,
|
||||
|
||||
@@ -1 +1,555 @@
|
||||
# File and media management services
|
||||
# app/services/media_service.py
|
||||
"""
|
||||
Media service for vendor media library management.
|
||||
|
||||
This module provides:
|
||||
- File upload and storage
|
||||
- Thumbnail generation for images
|
||||
- Media metadata management
|
||||
- Media usage tracking
|
||||
"""
|
||||
|
||||
import logging
|
||||
import mimetypes
|
||||
import os
|
||||
import shutil
|
||||
import uuid
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
|
||||
from sqlalchemy import or_
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.exceptions import ValidationException
|
||||
from models.database.media import MediaFile, ProductMedia
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Base upload directory
|
||||
UPLOAD_DIR = Path("uploads")
|
||||
VENDOR_UPLOAD_DIR = UPLOAD_DIR / "vendors"
|
||||
|
||||
# Allowed file types and their categories
|
||||
ALLOWED_EXTENSIONS = {
|
||||
# Images
|
||||
"jpg": "image",
|
||||
"jpeg": "image",
|
||||
"png": "image",
|
||||
"gif": "image",
|
||||
"webp": "image",
|
||||
"svg": "image",
|
||||
# Videos
|
||||
"mp4": "video",
|
||||
"webm": "video",
|
||||
"mov": "video",
|
||||
# Documents
|
||||
"pdf": "document",
|
||||
"doc": "document",
|
||||
"docx": "document",
|
||||
"xls": "document",
|
||||
"xlsx": "document",
|
||||
"csv": "document",
|
||||
"txt": "document",
|
||||
}
|
||||
|
||||
# Maximum file sizes (in bytes)
|
||||
MAX_FILE_SIZES = {
|
||||
"image": 10 * 1024 * 1024, # 10 MB
|
||||
"video": 100 * 1024 * 1024, # 100 MB
|
||||
"document": 20 * 1024 * 1024, # 20 MB
|
||||
}
|
||||
|
||||
# Thumbnail settings
|
||||
THUMBNAIL_SIZE = (200, 200)
|
||||
|
||||
|
||||
class MediaNotFoundException(Exception):
|
||||
"""Raised when media file is not found."""
|
||||
|
||||
def __init__(self, media_id: int):
|
||||
self.media_id = media_id
|
||||
super().__init__(f"Media file {media_id} not found")
|
||||
|
||||
|
||||
class MediaService:
|
||||
"""Service for vendor media library operations."""
|
||||
|
||||
def _get_vendor_upload_path(self, vendor_id: int, folder: str = "general") -> Path:
|
||||
"""Get the upload directory path for a vendor."""
|
||||
return VENDOR_UPLOAD_DIR / str(vendor_id) / folder
|
||||
|
||||
def _ensure_upload_dir(self, path: Path) -> None:
|
||||
"""Ensure upload directory exists."""
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def _get_file_extension(self, filename: str) -> str:
|
||||
"""Extract file extension from filename."""
|
||||
return filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
|
||||
|
||||
def _get_media_type(self, extension: str) -> str | None:
|
||||
"""Get media type from file extension."""
|
||||
return ALLOWED_EXTENSIONS.get(extension)
|
||||
|
||||
def _generate_unique_filename(self, original_filename: str) -> str:
|
||||
"""Generate a unique filename using UUID."""
|
||||
ext = self._get_file_extension(original_filename)
|
||||
return f"{uuid.uuid4().hex}.{ext}" if ext else uuid.uuid4().hex
|
||||
|
||||
def _validate_file(
|
||||
self, filename: str, file_size: int
|
||||
) -> tuple[str, str]:
|
||||
"""
|
||||
Validate uploaded file.
|
||||
|
||||
Returns:
|
||||
Tuple of (extension, media_type)
|
||||
|
||||
Raises:
|
||||
ValidationException: If file is invalid
|
||||
"""
|
||||
ext = self._get_file_extension(filename)
|
||||
|
||||
if not ext:
|
||||
raise ValidationException("File must have an extension")
|
||||
|
||||
media_type = self._get_media_type(ext)
|
||||
if not media_type:
|
||||
allowed = ", ".join(sorted(ALLOWED_EXTENSIONS.keys()))
|
||||
raise ValidationException(
|
||||
f"File type '{ext}' not allowed. Allowed types: {allowed}"
|
||||
)
|
||||
|
||||
max_size = MAX_FILE_SIZES.get(media_type, 10 * 1024 * 1024)
|
||||
if file_size > max_size:
|
||||
max_mb = max_size / (1024 * 1024)
|
||||
raise ValidationException(
|
||||
f"File too large. Maximum size for {media_type} is {max_mb:.0f} MB"
|
||||
)
|
||||
|
||||
return ext, media_type
|
||||
|
||||
def _get_image_dimensions(self, file_path: Path) -> tuple[int, int] | None:
|
||||
"""Get image dimensions if PIL is available."""
|
||||
try:
|
||||
from PIL import Image
|
||||
|
||||
with Image.open(file_path) as img:
|
||||
return img.size
|
||||
except ImportError:
|
||||
logger.debug("PIL not available, skipping image dimension detection")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not get image dimensions: {e}")
|
||||
return None
|
||||
|
||||
def _generate_thumbnail(
|
||||
self, source_path: Path, vendor_id: int
|
||||
) -> str | None:
|
||||
"""Generate thumbnail for image file."""
|
||||
try:
|
||||
from PIL import Image
|
||||
|
||||
# Create thumbnails directory
|
||||
thumb_dir = self._get_vendor_upload_path(vendor_id, "thumbnails")
|
||||
self._ensure_upload_dir(thumb_dir)
|
||||
|
||||
# Generate thumbnail filename
|
||||
thumb_filename = f"thumb_{source_path.name}"
|
||||
thumb_path = thumb_dir / thumb_filename
|
||||
|
||||
# Create thumbnail
|
||||
with Image.open(source_path) as img:
|
||||
img.thumbnail(THUMBNAIL_SIZE)
|
||||
# Convert to RGB if needed (for PNG with transparency)
|
||||
if img.mode in ("RGBA", "P"):
|
||||
img = img.convert("RGB")
|
||||
img.save(thumb_path, "JPEG", quality=85)
|
||||
|
||||
# Return relative path
|
||||
return str(thumb_path.relative_to(UPLOAD_DIR))
|
||||
|
||||
except ImportError:
|
||||
logger.debug("PIL not available, skipping thumbnail generation")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not generate thumbnail: {e}")
|
||||
return None
|
||||
|
||||
async def upload_file(
|
||||
self,
|
||||
db: Session,
|
||||
vendor_id: int,
|
||||
file_content: bytes,
|
||||
filename: str,
|
||||
folder: str = "general",
|
||||
) -> MediaFile:
|
||||
"""
|
||||
Upload a file to the media library.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
vendor_id: Vendor ID
|
||||
file_content: File content as bytes
|
||||
filename: Original filename
|
||||
folder: Folder to store in (products, general, etc.)
|
||||
|
||||
Returns:
|
||||
Created MediaFile record
|
||||
"""
|
||||
# Validate file
|
||||
file_size = len(file_content)
|
||||
ext, media_type = self._validate_file(filename, file_size)
|
||||
|
||||
# Generate unique filename
|
||||
unique_filename = self._generate_unique_filename(filename)
|
||||
|
||||
# Get upload path
|
||||
upload_path = self._get_vendor_upload_path(vendor_id, folder)
|
||||
self._ensure_upload_dir(upload_path)
|
||||
|
||||
# Save file
|
||||
file_path = upload_path / unique_filename
|
||||
file_path.write_bytes(file_content)
|
||||
|
||||
# Get relative path for storage
|
||||
relative_path = str(file_path.relative_to(UPLOAD_DIR))
|
||||
|
||||
# Get MIME type
|
||||
mime_type, _ = mimetypes.guess_type(filename)
|
||||
|
||||
# Get image dimensions and generate thumbnail
|
||||
width, height = None, None
|
||||
thumbnail_path = None
|
||||
|
||||
if media_type == "image":
|
||||
dimensions = self._get_image_dimensions(file_path)
|
||||
if dimensions:
|
||||
width, height = dimensions
|
||||
thumbnail_path = self._generate_thumbnail(file_path, vendor_id)
|
||||
|
||||
# Create database record
|
||||
media_file = MediaFile(
|
||||
vendor_id=vendor_id,
|
||||
filename=unique_filename,
|
||||
original_filename=filename,
|
||||
file_path=relative_path,
|
||||
media_type=media_type,
|
||||
mime_type=mime_type,
|
||||
file_size=file_size,
|
||||
width=width,
|
||||
height=height,
|
||||
thumbnail_path=thumbnail_path,
|
||||
folder=folder,
|
||||
)
|
||||
|
||||
db.add(media_file)
|
||||
db.flush()
|
||||
db.refresh(media_file)
|
||||
|
||||
logger.info(
|
||||
f"Uploaded media file {media_file.id} for vendor {vendor_id}: {filename}"
|
||||
)
|
||||
|
||||
return media_file
|
||||
|
||||
def get_media(
|
||||
self, db: Session, vendor_id: int, media_id: int
|
||||
) -> MediaFile:
|
||||
"""
|
||||
Get a media file by ID.
|
||||
|
||||
Raises:
|
||||
MediaNotFoundException: If media not found or doesn't belong to vendor
|
||||
"""
|
||||
media = (
|
||||
db.query(MediaFile)
|
||||
.filter(
|
||||
MediaFile.id == media_id,
|
||||
MediaFile.vendor_id == vendor_id,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if not media:
|
||||
raise MediaNotFoundException(media_id)
|
||||
|
||||
return media
|
||||
|
||||
def get_media_library(
|
||||
self,
|
||||
db: Session,
|
||||
vendor_id: int,
|
||||
skip: int = 0,
|
||||
limit: int = 100,
|
||||
media_type: str | None = None,
|
||||
folder: str | None = None,
|
||||
search: str | None = None,
|
||||
) -> tuple[list[MediaFile], int]:
|
||||
"""
|
||||
Get vendor media library with filtering.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
vendor_id: Vendor ID
|
||||
skip: Pagination offset
|
||||
limit: Pagination limit
|
||||
media_type: Filter by media type
|
||||
folder: Filter by folder
|
||||
search: Search in filename
|
||||
|
||||
Returns:
|
||||
Tuple of (media_files, total_count)
|
||||
"""
|
||||
query = db.query(MediaFile).filter(MediaFile.vendor_id == vendor_id)
|
||||
|
||||
if media_type:
|
||||
query = query.filter(MediaFile.media_type == media_type)
|
||||
|
||||
if folder:
|
||||
query = query.filter(MediaFile.folder == folder)
|
||||
|
||||
if search:
|
||||
search_pattern = f"%{search}%"
|
||||
query = query.filter(
|
||||
or_(
|
||||
MediaFile.filename.ilike(search_pattern),
|
||||
MediaFile.original_filename.ilike(search_pattern),
|
||||
MediaFile.alt_text.ilike(search_pattern),
|
||||
)
|
||||
)
|
||||
|
||||
# Order by newest first
|
||||
query = query.order_by(MediaFile.created_at.desc())
|
||||
|
||||
total = query.count()
|
||||
media_files = query.offset(skip).limit(limit).all()
|
||||
|
||||
return media_files, total
|
||||
|
||||
def update_media_metadata(
|
||||
self,
|
||||
db: Session,
|
||||
vendor_id: int,
|
||||
media_id: int,
|
||||
filename: str | None = None,
|
||||
alt_text: str | None = None,
|
||||
description: str | None = None,
|
||||
folder: str | None = None,
|
||||
metadata: dict | None = None,
|
||||
) -> MediaFile:
|
||||
"""
|
||||
Update media file metadata.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
vendor_id: Vendor ID
|
||||
media_id: Media file ID
|
||||
filename: New display filename
|
||||
alt_text: Alt text for images
|
||||
description: File description
|
||||
folder: Move to different folder
|
||||
metadata: Additional metadata
|
||||
|
||||
Returns:
|
||||
Updated MediaFile
|
||||
"""
|
||||
media = self.get_media(db, vendor_id, media_id)
|
||||
|
||||
if filename is not None:
|
||||
media.original_filename = filename
|
||||
|
||||
if alt_text is not None:
|
||||
media.alt_text = alt_text
|
||||
|
||||
if description is not None:
|
||||
media.description = description
|
||||
|
||||
if folder is not None and folder != media.folder:
|
||||
# Move file to new folder
|
||||
old_path = UPLOAD_DIR / media.file_path
|
||||
new_dir = self._get_vendor_upload_path(vendor_id, folder)
|
||||
self._ensure_upload_dir(new_dir)
|
||||
new_path = new_dir / media.filename
|
||||
|
||||
if old_path.exists():
|
||||
shutil.move(str(old_path), str(new_path))
|
||||
media.file_path = str(new_path.relative_to(UPLOAD_DIR))
|
||||
|
||||
media.folder = folder
|
||||
|
||||
if metadata is not None:
|
||||
media.extra_metadata = metadata
|
||||
|
||||
media.updated_at = datetime.now(UTC)
|
||||
db.flush()
|
||||
|
||||
logger.info(f"Updated media metadata for {media_id}")
|
||||
|
||||
return media
|
||||
|
||||
def delete_media(
|
||||
self, db: Session, vendor_id: int, media_id: int
|
||||
) -> bool:
|
||||
"""
|
||||
Delete a media file.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
vendor_id: Vendor ID
|
||||
media_id: Media file ID
|
||||
|
||||
Returns:
|
||||
True if deleted successfully
|
||||
"""
|
||||
media = self.get_media(db, vendor_id, media_id)
|
||||
|
||||
# Delete physical files
|
||||
file_path = UPLOAD_DIR / media.file_path
|
||||
if file_path.exists():
|
||||
file_path.unlink()
|
||||
|
||||
if media.thumbnail_path:
|
||||
thumb_path = UPLOAD_DIR / media.thumbnail_path
|
||||
if thumb_path.exists():
|
||||
thumb_path.unlink()
|
||||
|
||||
# Delete database record
|
||||
db.delete(media)
|
||||
|
||||
logger.info(f"Deleted media file {media_id} for vendor {vendor_id}")
|
||||
|
||||
return True
|
||||
|
||||
def get_media_usage(
|
||||
self, db: Session, vendor_id: int, media_id: int
|
||||
) -> dict:
|
||||
"""
|
||||
Get where a media file is being used.
|
||||
|
||||
Returns:
|
||||
Dict with products and other usage information
|
||||
"""
|
||||
media = self.get_media(db, vendor_id, media_id)
|
||||
|
||||
# Get product associations
|
||||
product_usage = []
|
||||
for assoc in media.product_associations:
|
||||
product = assoc.product
|
||||
if product:
|
||||
product_usage.append({
|
||||
"product_id": product.id,
|
||||
"product_name": product.get_title() or f"Product {product.id}",
|
||||
"usage_type": assoc.usage_type,
|
||||
})
|
||||
|
||||
return {
|
||||
"media_id": media_id,
|
||||
"products": product_usage,
|
||||
"other_usage": [],
|
||||
"total_usage_count": len(product_usage),
|
||||
}
|
||||
|
||||
def attach_to_product(
|
||||
self,
|
||||
db: Session,
|
||||
vendor_id: int,
|
||||
media_id: int,
|
||||
product_id: int,
|
||||
usage_type: str = "gallery",
|
||||
display_order: int = 0,
|
||||
) -> ProductMedia:
|
||||
"""
|
||||
Attach a media file to a product.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
vendor_id: Vendor ID
|
||||
media_id: Media file ID
|
||||
product_id: Product ID
|
||||
usage_type: How the media is used (main_image, gallery, etc.)
|
||||
display_order: Order for galleries
|
||||
|
||||
Returns:
|
||||
Created ProductMedia association
|
||||
"""
|
||||
# Verify media belongs to vendor
|
||||
media = self.get_media(db, vendor_id, media_id)
|
||||
|
||||
# Check if already attached with same usage type
|
||||
existing = (
|
||||
db.query(ProductMedia)
|
||||
.filter(
|
||||
ProductMedia.product_id == product_id,
|
||||
ProductMedia.media_id == media_id,
|
||||
ProductMedia.usage_type == usage_type,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if existing:
|
||||
existing.display_order = display_order
|
||||
db.flush()
|
||||
return existing
|
||||
|
||||
# Create association
|
||||
product_media = ProductMedia(
|
||||
product_id=product_id,
|
||||
media_id=media_id,
|
||||
usage_type=usage_type,
|
||||
display_order=display_order,
|
||||
)
|
||||
|
||||
db.add(product_media)
|
||||
|
||||
# Update usage count
|
||||
media.usage_count = (media.usage_count or 0) + 1
|
||||
|
||||
db.flush()
|
||||
|
||||
return product_media
|
||||
|
||||
def detach_from_product(
|
||||
self,
|
||||
db: Session,
|
||||
vendor_id: int,
|
||||
media_id: int,
|
||||
product_id: int,
|
||||
usage_type: str | None = None,
|
||||
) -> bool:
|
||||
"""
|
||||
Detach a media file from a product.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
vendor_id: Vendor ID
|
||||
media_id: Media file ID
|
||||
product_id: Product ID
|
||||
usage_type: Specific usage type to remove (None = all)
|
||||
|
||||
Returns:
|
||||
True if detached
|
||||
"""
|
||||
# Verify media belongs to vendor
|
||||
media = self.get_media(db, vendor_id, media_id)
|
||||
|
||||
query = db.query(ProductMedia).filter(
|
||||
ProductMedia.product_id == product_id,
|
||||
ProductMedia.media_id == media_id,
|
||||
)
|
||||
|
||||
if usage_type:
|
||||
query = query.filter(ProductMedia.usage_type == usage_type)
|
||||
|
||||
deleted_count = query.delete()
|
||||
|
||||
# Update usage count
|
||||
if deleted_count > 0:
|
||||
media.usage_count = max(0, (media.usage_count or 0) - deleted_count)
|
||||
|
||||
db.flush()
|
||||
|
||||
return deleted_count > 0
|
||||
|
||||
|
||||
# Create service instance
|
||||
media_service = MediaService()
|
||||
|
||||
@@ -242,6 +242,89 @@ class ProductService:
|
||||
logger.error(f"Error getting vendor products: {str(e)}")
|
||||
raise ValidationException("Failed to retrieve products")
|
||||
|
||||
def search_products(
|
||||
self,
|
||||
db: Session,
|
||||
vendor_id: int,
|
||||
query: str,
|
||||
skip: int = 0,
|
||||
limit: int = 50,
|
||||
language: str = "en",
|
||||
) -> tuple[list[Product], int]:
|
||||
"""
|
||||
Search products in vendor catalog.
|
||||
|
||||
Searches across:
|
||||
- Product title and description (from translations)
|
||||
- Product SKU, brand, and GTIN
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
vendor_id: Vendor ID
|
||||
query: Search query string
|
||||
skip: Pagination offset
|
||||
limit: Pagination limit
|
||||
language: Language for translation search (default: 'en')
|
||||
|
||||
Returns:
|
||||
Tuple of (products, total_count)
|
||||
"""
|
||||
from sqlalchemy import or_
|
||||
from sqlalchemy.orm import joinedload
|
||||
|
||||
from models.database.product_translation import ProductTranslation
|
||||
|
||||
try:
|
||||
# Prepare search pattern for LIKE queries
|
||||
search_pattern = f"%{query}%"
|
||||
|
||||
# Build base query with translation join
|
||||
base_query = (
|
||||
db.query(Product)
|
||||
.outerjoin(
|
||||
ProductTranslation,
|
||||
(Product.id == ProductTranslation.product_id)
|
||||
& (ProductTranslation.language == language),
|
||||
)
|
||||
.filter(
|
||||
Product.vendor_id == vendor_id,
|
||||
Product.is_active == True,
|
||||
)
|
||||
.filter(
|
||||
or_(
|
||||
# Search in translations
|
||||
ProductTranslation.title.ilike(search_pattern),
|
||||
ProductTranslation.description.ilike(search_pattern),
|
||||
ProductTranslation.short_description.ilike(search_pattern),
|
||||
# Search in product fields
|
||||
Product.vendor_sku.ilike(search_pattern),
|
||||
Product.brand.ilike(search_pattern),
|
||||
Product.gtin.ilike(search_pattern),
|
||||
)
|
||||
)
|
||||
.distinct()
|
||||
)
|
||||
|
||||
# Get total count
|
||||
total = base_query.count()
|
||||
|
||||
# Get paginated results with eager loading for performance
|
||||
products = (
|
||||
base_query.options(joinedload(Product.translations))
|
||||
.offset(skip)
|
||||
.limit(limit)
|
||||
.all()
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
f"Search '{query}' for vendor {vendor_id}: {total} results"
|
||||
)
|
||||
return products, total
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error searching products: {str(e)}")
|
||||
raise ValidationException("Failed to search products")
|
||||
|
||||
|
||||
# Create service instance
|
||||
product_service = ProductService()
|
||||
|
||||
Reference in New Issue
Block a user