feat: consolidate media service, add merchant users page, fix metrics overlap
- Merge ImageService into MediaService with WebP variant generation, DB-backed storage stats, and module-driven media usage discovery via new MediaUsageProviderProtocol - Add merchant users admin page with scoped user listing, stats endpoint, template, JS, and i18n strings (de/en/fr/lb) - Fix merchant user metrics so Owners and Team Members are mutually exclusive (filter team_members on user_type="member" and exclude owner IDs) ensuring stat cards add up correctly - Update billing and monitoring services to use media_service - Update subscription-billing and feature-gating docs Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -18,7 +18,7 @@ from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import or_
|
||||
from sqlalchemy import func, or_
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.modules.cms.exceptions import (
|
||||
@@ -69,6 +69,14 @@ MAX_FILE_SIZES = {
|
||||
# Thumbnail settings
|
||||
THUMBNAIL_SIZE = (200, 200)
|
||||
|
||||
# Image variant settings (ported from ImageService)
|
||||
IMAGE_VARIANT_QUALITY = 85
|
||||
IMAGE_MAX_DIMENSION = 2000
|
||||
IMAGE_VARIANT_SIZES = {
|
||||
"800": 800,
|
||||
"200": 200,
|
||||
}
|
||||
|
||||
|
||||
class MediaService:
|
||||
"""Service for store media library operations."""
|
||||
@@ -139,38 +147,80 @@ class MediaService:
|
||||
logger.warning(f"Could not get image dimensions: {e}")
|
||||
return None
|
||||
|
||||
def _generate_thumbnail(
|
||||
self, source_path: Path, store_id: int
|
||||
) -> str | None:
|
||||
"""Generate thumbnail for image file."""
|
||||
def _resize_image(self, img: "Image.Image", max_dimension: int) -> "Image.Image":
|
||||
"""Resize image while maintaining aspect ratio.
|
||||
|
||||
Args:
|
||||
img: PIL Image
|
||||
max_dimension: Maximum width or height
|
||||
|
||||
Returns:
|
||||
Resized PIL Image (or original if already smaller)
|
||||
"""
|
||||
width, height = img.size
|
||||
if width <= max_dimension and height <= max_dimension:
|
||||
return img
|
||||
|
||||
if width > height:
|
||||
new_width = max_dimension
|
||||
new_height = int(height * (max_dimension / width))
|
||||
else:
|
||||
new_height = max_dimension
|
||||
new_width = int(width * (max_dimension / height))
|
||||
|
||||
from PIL import Image
|
||||
return img.resize((new_width, new_height), Image.Resampling.LANCZOS)
|
||||
|
||||
def _generate_image_variants(
|
||||
self, source_path: Path, filename_stem: str
|
||||
) -> dict[str, str]:
|
||||
"""Generate WebP image variants at multiple sizes.
|
||||
|
||||
Args:
|
||||
source_path: Path to the original image file
|
||||
filename_stem: UUID stem for naming variants
|
||||
|
||||
Returns:
|
||||
Dict of size_name -> relative path (from UPLOAD_DIR)
|
||||
"""
|
||||
try:
|
||||
from PIL import Image
|
||||
|
||||
# Create thumbnails directory
|
||||
thumb_dir = self._get_store_upload_path(store_id, "thumbnails")
|
||||
self._ensure_upload_dir(thumb_dir)
|
||||
variants: dict[str, str] = {}
|
||||
parent_dir = source_path.parent
|
||||
|
||||
# Generate thumbnail filename
|
||||
thumb_filename = f"thumb_{source_path.name}"
|
||||
thumb_path = thumb_dir / thumb_filename
|
||||
|
||||
# Create thumbnail
|
||||
with Image.open(source_path) as img:
|
||||
img.thumbnail(THUMBNAIL_SIZE)
|
||||
# Convert to RGB if needed (for PNG with transparency)
|
||||
if img.mode in ("RGBA", "P"):
|
||||
img = img.convert("RGB")
|
||||
img.save(thumb_path, "JPEG", quality=85)
|
||||
|
||||
# Return relative path
|
||||
return str(thumb_path.relative_to(UPLOAD_DIR))
|
||||
# Resize original if larger than max dimension
|
||||
img = self._resize_image(img, IMAGE_MAX_DIMENSION)
|
||||
|
||||
for size_name, max_dim in IMAGE_VARIANT_SIZES.items():
|
||||
variant_img = self._resize_image(img.copy(), max_dim)
|
||||
variant_filename = f"{filename_stem}_{size_name}.webp"
|
||||
variant_path = parent_dir / variant_filename
|
||||
|
||||
variant_img.save(
|
||||
variant_path, "WEBP", quality=IMAGE_VARIANT_QUALITY
|
||||
)
|
||||
|
||||
variants[size_name] = str(
|
||||
variant_path.relative_to(UPLOAD_DIR)
|
||||
)
|
||||
logger.debug(
|
||||
f"Generated {size_name}px variant: {variant_path}"
|
||||
)
|
||||
|
||||
return variants
|
||||
|
||||
except ImportError:
|
||||
logger.debug("PIL not available, skipping thumbnail generation")
|
||||
return None
|
||||
logger.debug("PIL not available, skipping variant generation")
|
||||
return {}
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not generate thumbnail: {e}")
|
||||
return None
|
||||
logger.warning(f"Could not generate image variants: {e}")
|
||||
return {}
|
||||
|
||||
async def upload_file(
|
||||
self,
|
||||
@@ -214,15 +264,24 @@ class MediaService:
|
||||
# Get MIME type
|
||||
mime_type, _ = mimetypes.guess_type(filename)
|
||||
|
||||
# Get image dimensions and generate thumbnail
|
||||
# Get image dimensions and generate variants
|
||||
width, height = None, None
|
||||
thumbnail_path = None
|
||||
extra_metadata: dict[str, Any] = {}
|
||||
|
||||
if media_type == "image":
|
||||
dimensions = self._get_image_dimensions(file_path)
|
||||
if dimensions:
|
||||
width, height = dimensions
|
||||
thumbnail_path = self._generate_thumbnail(file_path, store_id)
|
||||
|
||||
# Generate WebP variants (800px, 200px)
|
||||
stem = unique_filename.rsplit(".", 1)[0] if "." in unique_filename else unique_filename
|
||||
variants = self._generate_image_variants(file_path, stem)
|
||||
if variants:
|
||||
extra_metadata["variants"] = variants
|
||||
# Use the 200px variant as thumbnail
|
||||
if "200" in variants:
|
||||
thumbnail_path = variants["200"]
|
||||
|
||||
# Create database record
|
||||
media_file = MediaFile(
|
||||
@@ -237,6 +296,7 @@ class MediaService:
|
||||
height=height,
|
||||
thumbnail_path=thumbnail_path,
|
||||
folder=folder,
|
||||
extra_metadata=extra_metadata if extra_metadata else None,
|
||||
)
|
||||
|
||||
db.add(media_file)
|
||||
@@ -410,6 +470,13 @@ class MediaService:
|
||||
if thumb_path.exists():
|
||||
thumb_path.unlink()
|
||||
|
||||
# Delete variant files
|
||||
if media.extra_metadata and "variants" in media.extra_metadata:
|
||||
for variant_path in media.extra_metadata["variants"].values():
|
||||
vpath = UPLOAD_DIR / variant_path
|
||||
if vpath.exists():
|
||||
vpath.unlink()
|
||||
|
||||
# Delete database record
|
||||
db.delete(media)
|
||||
|
||||
@@ -417,9 +484,38 @@ class MediaService:
|
||||
|
||||
return True
|
||||
|
||||
# Note: Product-specific methods (attach_to_product, detach_from_product,
|
||||
# get_media_usage) have been moved to catalog.services.product_media_service
|
||||
# CMS media_service is now consumer-agnostic.
|
||||
def get_storage_stats(self, db: Session) -> dict:
|
||||
"""Get storage statistics from MediaFile records.
|
||||
|
||||
Returns:
|
||||
Dict with storage metrics for health monitoring.
|
||||
"""
|
||||
total_files = db.query(func.count(MediaFile.id)).scalar() or 0
|
||||
total_size = db.query(func.sum(MediaFile.file_size)).scalar() or 0
|
||||
|
||||
# Count distinct folders as proxy for directory count
|
||||
directory_count = (
|
||||
db.query(func.count(func.distinct(MediaFile.folder))).scalar() or 0
|
||||
)
|
||||
|
||||
# Image-specific stats
|
||||
image_count = (
|
||||
db.query(func.count(MediaFile.id))
|
||||
.filter(MediaFile.media_type == "image")
|
||||
.scalar()
|
||||
or 0
|
||||
)
|
||||
|
||||
return {
|
||||
"total_files": total_files,
|
||||
"total_size_bytes": total_size,
|
||||
"total_size_mb": round(total_size / (1024 * 1024), 2) if total_size else 0,
|
||||
"total_size_gb": round(total_size / (1024 * 1024 * 1024), 3) if total_size else 0,
|
||||
"directory_count": directory_count,
|
||||
"max_files_per_dir": 0, # Not applicable for DB-backed tracking
|
||||
"avg_files_per_dir": round(total_files / directory_count, 1) if directory_count else 0,
|
||||
"products_estimated": image_count,
|
||||
}
|
||||
|
||||
|
||||
# Create service instance
|
||||
|
||||
Reference in New Issue
Block a user