Files
orion/app/modules/cms/services/media_service.py
Samir Boulahtit 2250054ba2 feat: consolidate media service, add merchant users page, fix metrics overlap
- Merge ImageService into MediaService with WebP variant generation,
  DB-backed storage stats, and module-driven media usage discovery
  via new MediaUsageProviderProtocol
- Add merchant users admin page with scoped user listing, stats
  endpoint, template, JS, and i18n strings (de/en/fr/lb)
- Fix merchant user metrics so Owners and Team Members are mutually
  exclusive (filter team_members on user_type="member" and exclude
  owner IDs) ensuring stat cards add up correctly
- Update billing and monitoring services to use media_service
- Update subscription-billing and feature-gating docs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-07 21:17:11 +01:00

523 lines
16 KiB
Python

# app/modules/cms/services/media_service.py
"""
Media service for store media library management.
This module provides:
- File upload and storage
- Thumbnail generation for images
- Media metadata management
- Media usage tracking
"""
import logging
import mimetypes
import os
import shutil
import uuid
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from sqlalchemy import func, or_
from sqlalchemy.orm import Session
from app.modules.cms.exceptions import (
MediaNotFoundException,
MediaUploadException,
MediaValidationException,
UnsupportedMediaTypeException,
MediaFileTooLargeException,
)
from app.modules.cms.models import MediaFile
logger = logging.getLogger(__name__)
# Base upload directory; all stored paths are recorded relative to this root.
UPLOAD_DIR = Path("uploads")
# Per-store files live under uploads/stores/<store_id>/<folder>/
STORE_UPLOAD_DIR = UPLOAD_DIR / "stores"

# Allowed file types and their categories (extension -> media_type)
ALLOWED_EXTENSIONS = {
    # Images
    "jpg": "image",
    "jpeg": "image",
    "png": "image",
    "gif": "image",
    "webp": "image",
    "svg": "image",
    # Videos
    "mp4": "video",
    "webm": "video",
    "mov": "video",
    # Documents
    "pdf": "document",
    "doc": "document",
    "docx": "document",
    "xls": "document",
    "xlsx": "document",
    "csv": "document",
    "txt": "document",
}

# Maximum file sizes (in bytes), keyed by media_type
MAX_FILE_SIZES = {
    "image": 10 * 1024 * 1024,  # 10 MB
    "video": 100 * 1024 * 1024,  # 100 MB
    "document": 20 * 1024 * 1024,  # 20 MB
}

# Thumbnail settings
THUMBNAIL_SIZE = (200, 200)

# Image variant settings (ported from ImageService)
IMAGE_VARIANT_QUALITY = 85  # WebP quality for generated variants
IMAGE_MAX_DIMENSION = 2000  # originals are downscaled to fit this box
# Variant size name -> max dimension in pixels; the "200" variant doubles
# as the thumbnail (see upload_file).
IMAGE_VARIANT_SIZES = {
    "800": 800,
    "200": 200,
}
class MediaService:
    """Service for store media library operations.

    Responsibilities:
    - Upload validation (extension, media category, per-type size limits)
    - Physical file storage under ``uploads/stores/<store_id>/<folder>/``
    - WebP variant generation for images (ported from ImageService)
    - Media metadata management and deletion (including variant cleanup)
    - DB-backed storage statistics for health monitoring

    All database methods operate on a caller-supplied SQLAlchemy ``Session``
    and only ``flush()`` — transaction commit is the caller's responsibility.
    """

    # ------------------------------------------------------------------
    # Path / filename helpers
    # ------------------------------------------------------------------

    def _get_store_upload_path(self, store_id: int, folder: str = "general") -> Path:
        """Get the upload directory path for a store."""
        return STORE_UPLOAD_DIR / str(store_id) / folder

    def _ensure_upload_dir(self, path: Path) -> None:
        """Ensure upload directory exists (creates parents as needed)."""
        path.mkdir(parents=True, exist_ok=True)

    def _get_file_extension(self, filename: str) -> str:
        """Extract the lowercased file extension ("" when there is none)."""
        return filename.rsplit(".", 1)[-1].lower() if "." in filename else ""

    def _get_media_type(self, extension: str) -> str | None:
        """Map an extension to its media category, or None if unsupported."""
        return ALLOWED_EXTENSIONS.get(extension)

    def _generate_unique_filename(self, original_filename: str) -> str:
        """Generate a collision-free filename using UUID, keeping the extension."""
        ext = self._get_file_extension(original_filename)
        return f"{uuid.uuid4().hex}.{ext}" if ext else uuid.uuid4().hex

    # ------------------------------------------------------------------
    # Validation
    # ------------------------------------------------------------------

    def _validate_file(self, filename: str, file_size: int) -> tuple[str, str]:
        """
        Validate an uploaded file.

        Returns:
            Tuple of (extension, media_type)

        Raises:
            MediaValidationException: If the file has no extension
            UnsupportedMediaTypeException: If the file type is not supported
            MediaFileTooLargeException: If the file exceeds the per-type limit
        """
        ext = self._get_file_extension(filename)
        if not ext:
            raise MediaValidationException("File must have an extension", field="file")
        media_type = self._get_media_type(ext)
        if not media_type:
            raise UnsupportedMediaTypeException(
                ext, allowed_types=list(ALLOWED_EXTENSIONS.keys())
            )
        # Fall back to 10 MB if the media type has no configured limit.
        max_size = MAX_FILE_SIZES.get(media_type, 10 * 1024 * 1024)
        if file_size > max_size:
            raise MediaFileTooLargeException(file_size, max_size, media_type)
        return ext, media_type

    # ------------------------------------------------------------------
    # Image processing (PIL is optional — failures degrade gracefully)
    # ------------------------------------------------------------------

    def _get_image_dimensions(self, file_path: Path) -> tuple[int, int] | None:
        """Get image (width, height) if PIL is available; None on any failure."""
        try:
            from PIL import Image

            with Image.open(file_path) as img:
                return img.size
        except ImportError:
            logger.debug("PIL not available, skipping image dimension detection")
            return None
        except Exception as e:
            logger.warning(f"Could not get image dimensions: {e}")
            return None

    def _resize_image(self, img: "Image.Image", max_dimension: int) -> "Image.Image":
        """Resize image while maintaining aspect ratio.

        Args:
            img: PIL Image
            max_dimension: Maximum width or height

        Returns:
            Resized PIL Image (or the original image object if it already
            fits — callers that need an independent copy must copy first).
        """
        width, height = img.size
        if width <= max_dimension and height <= max_dimension:
            return img
        # Scale the longer side down to max_dimension, preserving aspect ratio.
        if width > height:
            new_width = max_dimension
            new_height = int(height * (max_dimension / width))
        else:
            new_height = max_dimension
            new_width = int(width * (max_dimension / height))
        from PIL import Image

        return img.resize((new_width, new_height), Image.Resampling.LANCZOS)

    def _generate_image_variants(
        self, source_path: Path, filename_stem: str
    ) -> dict[str, str]:
        """Generate WebP image variants at multiple sizes.

        Args:
            source_path: Path to the original image file
            filename_stem: UUID stem for naming variants

        Returns:
            Dict of size_name -> relative path (from UPLOAD_DIR); empty dict
            when PIL is unavailable or variant generation fails.
        """
        try:
            from PIL import Image

            variants: dict[str, str] = {}
            parent_dir = source_path.parent
            with Image.open(source_path) as img:
                # Convert to RGB if needed (for PNG with transparency)
                if img.mode in ("RGBA", "P"):
                    img = img.convert("RGB")
                # Resize original if larger than max dimension
                img = self._resize_image(img, IMAGE_MAX_DIMENSION)
                for size_name, max_dim in IMAGE_VARIANT_SIZES.items():
                    # Copy first: _resize_image may return the same object.
                    variant_img = self._resize_image(img.copy(), max_dim)
                    variant_filename = f"{filename_stem}_{size_name}.webp"
                    variant_path = parent_dir / variant_filename
                    variant_img.save(
                        variant_path, "WEBP", quality=IMAGE_VARIANT_QUALITY
                    )
                    variants[size_name] = str(
                        variant_path.relative_to(UPLOAD_DIR)
                    )
                    logger.debug(
                        f"Generated {size_name}px variant: {variant_path}"
                    )
            return variants
        except ImportError:
            logger.debug("PIL not available, skipping variant generation")
            return {}
        except Exception as e:
            logger.warning(f"Could not generate image variants: {e}")
            return {}

    # ------------------------------------------------------------------
    # CRUD operations
    # ------------------------------------------------------------------

    async def upload_file(
        self,
        db: Session,
        store_id: int,
        file_content: bytes,
        filename: str,
        folder: str = "general",
    ) -> MediaFile:
        """
        Upload a file to the media library.

        Args:
            db: Database session
            store_id: Store ID
            file_content: File content as bytes
            filename: Original filename
            folder: Folder to store in (products, general, etc.)

        Returns:
            Created MediaFile record

        Raises:
            MediaValidationException, UnsupportedMediaTypeException,
            MediaFileTooLargeException: From validation (see _validate_file).
        """
        # Validate file
        file_size = len(file_content)
        ext, media_type = self._validate_file(filename, file_size)

        # Generate unique filename
        unique_filename = self._generate_unique_filename(filename)

        # Get upload path
        upload_path = self._get_store_upload_path(store_id, folder)
        self._ensure_upload_dir(upload_path)

        # Save file
        file_path = upload_path / unique_filename
        file_path.write_bytes(file_content)

        # Get relative path for storage
        relative_path = str(file_path.relative_to(UPLOAD_DIR))

        # Get MIME type
        mime_type, _ = mimetypes.guess_type(filename)

        # Get image dimensions and generate variants
        width, height = None, None
        thumbnail_path = None
        extra_metadata: dict[str, Any] = {}
        if media_type == "image":
            dimensions = self._get_image_dimensions(file_path)
            if dimensions:
                width, height = dimensions
            # Generate WebP variants (800px, 200px)
            stem = unique_filename.rsplit(".", 1)[0] if "." in unique_filename else unique_filename
            variants = self._generate_image_variants(file_path, stem)
            if variants:
                extra_metadata["variants"] = variants
                # Use the 200px variant as thumbnail
                if "200" in variants:
                    thumbnail_path = variants["200"]

        # Create database record
        media_file = MediaFile(
            store_id=store_id,
            filename=unique_filename,
            original_filename=filename,
            file_path=relative_path,
            media_type=media_type,
            mime_type=mime_type,
            file_size=file_size,
            width=width,
            height=height,
            thumbnail_path=thumbnail_path,
            folder=folder,
            extra_metadata=extra_metadata if extra_metadata else None,
        )
        db.add(media_file)
        db.flush()
        db.refresh(media_file)

        # Fix: log the original filename (message was previously garbled).
        logger.info(
            f"Uploaded media file {media_file.id} for store {store_id}: {filename}"
        )
        return media_file

    def get_media(
        self, db: Session, store_id: int, media_id: int
    ) -> MediaFile:
        """
        Get a media file by ID, scoped to the given store.

        Raises:
            MediaNotFoundException: If media not found or doesn't belong to store
        """
        media = (
            db.query(MediaFile)
            .filter(
                MediaFile.id == media_id,
                MediaFile.store_id == store_id,
            )
            .first()
        )
        if not media:
            raise MediaNotFoundException(media_id)
        return media

    def get_media_library(
        self,
        db: Session,
        store_id: int,
        skip: int = 0,
        limit: int = 100,
        media_type: str | None = None,
        folder: str | None = None,
        search: str | None = None,
    ) -> tuple[list[MediaFile], int]:
        """
        Get store media library with filtering.

        Args:
            db: Database session
            store_id: Store ID
            skip: Pagination offset
            limit: Pagination limit
            media_type: Filter by media type
            folder: Filter by folder
            search: Case-insensitive substring match on filename,
                original filename, or alt text

        Returns:
            Tuple of (media_files, total_count); total_count ignores pagination.
        """
        query = db.query(MediaFile).filter(MediaFile.store_id == store_id)
        if media_type:
            query = query.filter(MediaFile.media_type == media_type)
        if folder:
            query = query.filter(MediaFile.folder == folder)
        if search:
            search_pattern = f"%{search}%"
            query = query.filter(
                or_(
                    MediaFile.filename.ilike(search_pattern),
                    MediaFile.original_filename.ilike(search_pattern),
                    MediaFile.alt_text.ilike(search_pattern),
                )
            )
        # Order by newest first
        query = query.order_by(MediaFile.created_at.desc())
        total = query.count()
        media_files = query.offset(skip).limit(limit).all()
        return media_files, total

    def update_media_metadata(
        self,
        db: Session,
        store_id: int,
        media_id: int,
        filename: str | None = None,
        alt_text: str | None = None,
        description: str | None = None,
        folder: str | None = None,
        metadata: dict | None = None,
    ) -> MediaFile:
        """
        Update media file metadata. Only non-None arguments are applied.

        Args:
            db: Database session
            store_id: Store ID
            media_id: Media file ID
            filename: New display filename (updates original_filename only;
                the stored UUID filename is unchanged)
            alt_text: Alt text for images
            description: File description
            folder: Move to different folder (physically moves the file)
            metadata: Replaces extra_metadata wholesale (not merged)

        Returns:
            Updated MediaFile

        Raises:
            MediaNotFoundException: If media not found for this store
        """
        media = self.get_media(db, store_id, media_id)
        if filename is not None:
            media.original_filename = filename
        if alt_text is not None:
            media.alt_text = alt_text
        if description is not None:
            media.description = description
        if folder is not None and folder != media.folder:
            # Move file to new folder
            old_path = UPLOAD_DIR / media.file_path
            new_dir = self._get_store_upload_path(store_id, folder)
            self._ensure_upload_dir(new_dir)
            new_path = new_dir / media.filename
            if old_path.exists():
                shutil.move(str(old_path), str(new_path))
            media.file_path = str(new_path.relative_to(UPLOAD_DIR))
            media.folder = folder
        if metadata is not None:
            media.extra_metadata = metadata
        media.updated_at = datetime.now(UTC)
        db.flush()
        logger.info(f"Updated media metadata for {media_id}")
        return media

    def delete_media(
        self, db: Session, store_id: int, media_id: int
    ) -> bool:
        """
        Delete a media file, its thumbnail, and all generated variants.

        Args:
            db: Database session
            store_id: Store ID
            media_id: Media file ID

        Returns:
            True if deleted successfully

        Raises:
            MediaNotFoundException: If media not found for this store
        """
        media = self.get_media(db, store_id, media_id)

        # Delete physical files
        file_path = UPLOAD_DIR / media.file_path
        if file_path.exists():
            file_path.unlink()
        # Thumbnail may coincide with the 200px variant; exists() guards
        # against double-unlink.
        if media.thumbnail_path:
            thumb_path = UPLOAD_DIR / media.thumbnail_path
            if thumb_path.exists():
                thumb_path.unlink()
        # Delete variant files
        if media.extra_metadata and "variants" in media.extra_metadata:
            for variant_path in media.extra_metadata["variants"].values():
                vpath = UPLOAD_DIR / variant_path
                if vpath.exists():
                    vpath.unlink()

        # Delete database record
        db.delete(media)
        logger.info(f"Deleted media file {media_id} for store {store_id}")
        return True

    # ------------------------------------------------------------------
    # Statistics
    # ------------------------------------------------------------------

    def get_storage_stats(self, db: Session) -> dict:
        """Get storage statistics from MediaFile records (all stores).

        Returns:
            Dict with storage metrics for health monitoring.
        """
        total_files = db.query(func.count(MediaFile.id)).scalar() or 0
        total_size = db.query(func.sum(MediaFile.file_size)).scalar() or 0
        # Count distinct folders as proxy for directory count
        directory_count = (
            db.query(func.count(func.distinct(MediaFile.folder))).scalar() or 0
        )
        # Image-specific stats
        image_count = (
            db.query(func.count(MediaFile.id))
            .filter(MediaFile.media_type == "image")
            .scalar()
            or 0
        )
        return {
            "total_files": total_files,
            "total_size_bytes": total_size,
            "total_size_mb": round(total_size / (1024 * 1024), 2) if total_size else 0,
            "total_size_gb": round(total_size / (1024 * 1024 * 1024), 3) if total_size else 0,
            "directory_count": directory_count,
            "max_files_per_dir": 0,  # Not applicable for DB-backed tracking
            "avg_files_per_dir": round(total_files / directory_count, 1) if directory_count else 0,
            "products_estimated": image_count,
        }
# Create service instance
# Module-level singleton; the service is stateless, so sharing one
# instance across callers (billing, monitoring, CMS routes) is safe.
media_service = MediaService()