- Replace 153 broad `except Exception` with specific types (SQLAlchemyError, TemplateError, OSError, SMTPException, ClientError, etc.) across 37 services - Break catalog↔inventory circular dependency (IMPORT-004) - Create 19 skeleton test files for MOD-024 coverage - Exclude aggregator services from MOD-024 (false positives) - Update test mocks to match narrowed exception types Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
521 lines
16 KiB
Python
521 lines
16 KiB
Python
# app/modules/cms/services/media_service.py
|
|
"""
|
|
Media service for store media library management.
|
|
|
|
This module provides:
|
|
- File upload and storage
|
|
- Thumbnail generation for images
|
|
- Media metadata management
|
|
- Media usage tracking
|
|
"""
|
|
|
|
import logging
|
|
import mimetypes
|
|
import shutil
|
|
import uuid
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from sqlalchemy import func, or_
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.modules.cms.exceptions import (
|
|
MediaFileTooLargeException,
|
|
MediaNotFoundException,
|
|
MediaValidationException,
|
|
UnsupportedMediaTypeException,
|
|
)
|
|
from app.modules.cms.models import MediaFile
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Base upload directory
|
|
UPLOAD_DIR = Path("uploads")
|
|
STORE_UPLOAD_DIR = UPLOAD_DIR / "stores"
|
|
|
|
# Allowed file types and their categories
|
|
ALLOWED_EXTENSIONS = {
|
|
# Images
|
|
"jpg": "image",
|
|
"jpeg": "image",
|
|
"png": "image",
|
|
"gif": "image",
|
|
"webp": "image",
|
|
"svg": "image",
|
|
# Videos
|
|
"mp4": "video",
|
|
"webm": "video",
|
|
"mov": "video",
|
|
# Documents
|
|
"pdf": "document",
|
|
"doc": "document",
|
|
"docx": "document",
|
|
"xls": "document",
|
|
"xlsx": "document",
|
|
"csv": "document",
|
|
"txt": "document",
|
|
}
|
|
|
|
# Maximum file sizes (in bytes)
|
|
MAX_FILE_SIZES = {
|
|
"image": 10 * 1024 * 1024, # 10 MB
|
|
"video": 100 * 1024 * 1024, # 100 MB
|
|
"document": 20 * 1024 * 1024, # 20 MB
|
|
}
|
|
|
|
# Thumbnail settings
|
|
THUMBNAIL_SIZE = (200, 200)
|
|
|
|
# Image variant settings (ported from ImageService)
|
|
IMAGE_VARIANT_QUALITY = 85
|
|
IMAGE_MAX_DIMENSION = 2000
|
|
IMAGE_VARIANT_SIZES = {
|
|
"800": 800,
|
|
"200": 200,
|
|
}
|
|
|
|
|
|
class MediaService:
|
|
"""Service for store media library operations."""
|
|
|
|
def _get_store_upload_path(self, store_id: int, folder: str = "general") -> Path:
|
|
"""Get the upload directory path for a store."""
|
|
return STORE_UPLOAD_DIR / str(store_id) / folder
|
|
|
|
def _ensure_upload_dir(self, path: Path) -> None:
|
|
"""Ensure upload directory exists."""
|
|
path.mkdir(parents=True, exist_ok=True)
|
|
|
|
def _get_file_extension(self, filename: str) -> str:
|
|
"""Extract file extension from filename."""
|
|
return filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
|
|
|
|
def _get_media_type(self, extension: str) -> str | None:
|
|
"""Get media type from file extension."""
|
|
return ALLOWED_EXTENSIONS.get(extension)
|
|
|
|
def _generate_unique_filename(self, original_filename: str) -> str:
|
|
"""Generate a unique filename using UUID."""
|
|
ext = self._get_file_extension(original_filename)
|
|
return f"{uuid.uuid4().hex}.{ext}" if ext else uuid.uuid4().hex
|
|
|
|
def _validate_file(
|
|
self, filename: str, file_size: int
|
|
) -> tuple[str, str]:
|
|
"""
|
|
Validate uploaded file.
|
|
|
|
Returns:
|
|
Tuple of (extension, media_type)
|
|
|
|
Raises:
|
|
MediaValidationException: If file is invalid
|
|
UnsupportedMediaTypeException: If file type is not supported
|
|
MediaFileTooLargeException: If file exceeds size limit
|
|
"""
|
|
ext = self._get_file_extension(filename)
|
|
|
|
if not ext:
|
|
raise MediaValidationException("File must have an extension", field="file")
|
|
|
|
media_type = self._get_media_type(ext)
|
|
if not media_type:
|
|
raise UnsupportedMediaTypeException(
|
|
ext, allowed_types=list(ALLOWED_EXTENSIONS.keys())
|
|
)
|
|
|
|
max_size = MAX_FILE_SIZES.get(media_type, 10 * 1024 * 1024)
|
|
if file_size > max_size:
|
|
raise MediaFileTooLargeException(file_size, max_size, media_type)
|
|
|
|
return ext, media_type
|
|
|
|
def _get_image_dimensions(self, file_path: Path) -> tuple[int, int] | None:
|
|
"""Get image dimensions if PIL is available."""
|
|
try:
|
|
from PIL import Image
|
|
|
|
with Image.open(file_path) as img:
|
|
return img.size
|
|
except ImportError:
|
|
logger.debug("PIL not available, skipping image dimension detection")
|
|
return None
|
|
except OSError as e:
|
|
logger.warning(f"Could not get image dimensions: {e}")
|
|
return None
|
|
|
|
def _resize_image(self, img: "Image.Image", max_dimension: int) -> "Image.Image":
|
|
"""Resize image while maintaining aspect ratio.
|
|
|
|
Args:
|
|
img: PIL Image
|
|
max_dimension: Maximum width or height
|
|
|
|
Returns:
|
|
Resized PIL Image (or original if already smaller)
|
|
"""
|
|
width, height = img.size
|
|
if width <= max_dimension and height <= max_dimension:
|
|
return img
|
|
|
|
if width > height:
|
|
new_width = max_dimension
|
|
new_height = int(height * (max_dimension / width))
|
|
else:
|
|
new_height = max_dimension
|
|
new_width = int(width * (max_dimension / height))
|
|
|
|
from PIL import Image
|
|
return img.resize((new_width, new_height), Image.Resampling.LANCZOS)
|
|
|
|
def _generate_image_variants(
|
|
self, source_path: Path, filename_stem: str
|
|
) -> dict[str, str]:
|
|
"""Generate WebP image variants at multiple sizes.
|
|
|
|
Args:
|
|
source_path: Path to the original image file
|
|
filename_stem: UUID stem for naming variants
|
|
|
|
Returns:
|
|
Dict of size_name -> relative path (from UPLOAD_DIR)
|
|
"""
|
|
try:
|
|
from PIL import Image
|
|
|
|
variants: dict[str, str] = {}
|
|
parent_dir = source_path.parent
|
|
|
|
with Image.open(source_path) as img:
|
|
# Convert to RGB if needed (for PNG with transparency)
|
|
if img.mode in ("RGBA", "P"):
|
|
img = img.convert("RGB")
|
|
|
|
# Resize original if larger than max dimension
|
|
img = self._resize_image(img, IMAGE_MAX_DIMENSION)
|
|
|
|
for size_name, max_dim in IMAGE_VARIANT_SIZES.items():
|
|
variant_img = self._resize_image(img.copy(), max_dim)
|
|
variant_filename = f"{filename_stem}_{size_name}.webp"
|
|
variant_path = parent_dir / variant_filename
|
|
|
|
variant_img.save(
|
|
variant_path, "WEBP", quality=IMAGE_VARIANT_QUALITY
|
|
)
|
|
|
|
variants[size_name] = str(
|
|
variant_path.relative_to(UPLOAD_DIR)
|
|
)
|
|
logger.debug(
|
|
f"Generated {size_name}px variant: {variant_path}"
|
|
)
|
|
|
|
return variants
|
|
|
|
except ImportError:
|
|
logger.debug("PIL not available, skipping variant generation")
|
|
return {}
|
|
except OSError as e:
|
|
logger.warning(f"Could not generate image variants: {e}")
|
|
return {}
|
|
|
|
async def upload_file(
|
|
self,
|
|
db: Session,
|
|
store_id: int,
|
|
file_content: bytes,
|
|
filename: str,
|
|
folder: str = "general",
|
|
) -> MediaFile:
|
|
"""
|
|
Upload a file to the media library.
|
|
|
|
Args:
|
|
db: Database session
|
|
store_id: Store ID
|
|
file_content: File content as bytes
|
|
filename: Original filename
|
|
folder: Folder to store in (products, general, etc.)
|
|
|
|
Returns:
|
|
Created MediaFile record
|
|
"""
|
|
# Validate file
|
|
file_size = len(file_content)
|
|
ext, media_type = self._validate_file(filename, file_size)
|
|
|
|
# Generate unique filename
|
|
unique_filename = self._generate_unique_filename(filename)
|
|
|
|
# Get upload path
|
|
upload_path = self._get_store_upload_path(store_id, folder)
|
|
self._ensure_upload_dir(upload_path)
|
|
|
|
# Save file
|
|
file_path = upload_path / unique_filename
|
|
file_path.write_bytes(file_content)
|
|
|
|
# Get relative path for storage
|
|
relative_path = str(file_path.relative_to(UPLOAD_DIR))
|
|
|
|
# Get MIME type
|
|
mime_type, _ = mimetypes.guess_type(filename)
|
|
|
|
# Get image dimensions and generate variants
|
|
width, height = None, None
|
|
thumbnail_path = None
|
|
extra_metadata: dict[str, Any] = {}
|
|
|
|
if media_type == "image":
|
|
dimensions = self._get_image_dimensions(file_path)
|
|
if dimensions:
|
|
width, height = dimensions
|
|
|
|
# Generate WebP variants (800px, 200px)
|
|
stem = unique_filename.rsplit(".", 1)[0] if "." in unique_filename else unique_filename
|
|
variants = self._generate_image_variants(file_path, stem)
|
|
if variants:
|
|
extra_metadata["variants"] = variants
|
|
# Use the 200px variant as thumbnail
|
|
if "200" in variants:
|
|
thumbnail_path = variants["200"]
|
|
|
|
# Create database record
|
|
media_file = MediaFile(
|
|
store_id=store_id,
|
|
filename=unique_filename,
|
|
original_filename=filename,
|
|
file_path=relative_path,
|
|
media_type=media_type,
|
|
mime_type=mime_type,
|
|
file_size=file_size,
|
|
width=width,
|
|
height=height,
|
|
thumbnail_path=thumbnail_path,
|
|
folder=folder,
|
|
extra_metadata=extra_metadata if extra_metadata else None,
|
|
)
|
|
|
|
db.add(media_file)
|
|
db.flush()
|
|
db.refresh(media_file)
|
|
|
|
logger.info(
|
|
f"Uploaded media file {media_file.id} for store {store_id}: {filename}"
|
|
)
|
|
|
|
return media_file
|
|
|
|
def get_media(
|
|
self, db: Session, store_id: int, media_id: int
|
|
) -> MediaFile:
|
|
"""
|
|
Get a media file by ID.
|
|
|
|
Raises:
|
|
MediaNotFoundException: If media not found or doesn't belong to store
|
|
"""
|
|
media = (
|
|
db.query(MediaFile)
|
|
.filter(
|
|
MediaFile.id == media_id,
|
|
MediaFile.store_id == store_id,
|
|
)
|
|
.first()
|
|
)
|
|
|
|
if not media:
|
|
raise MediaNotFoundException(media_id)
|
|
|
|
return media
|
|
|
|
def get_media_library(
|
|
self,
|
|
db: Session,
|
|
store_id: int,
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
media_type: str | None = None,
|
|
folder: str | None = None,
|
|
search: str | None = None,
|
|
) -> tuple[list[MediaFile], int]:
|
|
"""
|
|
Get store media library with filtering.
|
|
|
|
Args:
|
|
db: Database session
|
|
store_id: Store ID
|
|
skip: Pagination offset
|
|
limit: Pagination limit
|
|
media_type: Filter by media type
|
|
folder: Filter by folder
|
|
search: Search in filename
|
|
|
|
Returns:
|
|
Tuple of (media_files, total_count)
|
|
"""
|
|
query = db.query(MediaFile).filter(MediaFile.store_id == store_id)
|
|
|
|
if media_type:
|
|
query = query.filter(MediaFile.media_type == media_type)
|
|
|
|
if folder:
|
|
query = query.filter(MediaFile.folder == folder)
|
|
|
|
if search:
|
|
search_pattern = f"%{search}%"
|
|
query = query.filter(
|
|
or_(
|
|
MediaFile.filename.ilike(search_pattern),
|
|
MediaFile.original_filename.ilike(search_pattern),
|
|
MediaFile.alt_text.ilike(search_pattern),
|
|
)
|
|
)
|
|
|
|
# Order by newest first
|
|
query = query.order_by(MediaFile.created_at.desc())
|
|
|
|
total = query.count()
|
|
media_files = query.offset(skip).limit(limit).all()
|
|
|
|
return media_files, total
|
|
|
|
def update_media_metadata(
|
|
self,
|
|
db: Session,
|
|
store_id: int,
|
|
media_id: int,
|
|
filename: str | None = None,
|
|
alt_text: str | None = None,
|
|
description: str | None = None,
|
|
folder: str | None = None,
|
|
metadata: dict | None = None,
|
|
) -> MediaFile:
|
|
"""
|
|
Update media file metadata.
|
|
|
|
Args:
|
|
db: Database session
|
|
store_id: Store ID
|
|
media_id: Media file ID
|
|
filename: New display filename
|
|
alt_text: Alt text for images
|
|
description: File description
|
|
folder: Move to different folder
|
|
metadata: Additional metadata
|
|
|
|
Returns:
|
|
Updated MediaFile
|
|
"""
|
|
media = self.get_media(db, store_id, media_id)
|
|
|
|
if filename is not None:
|
|
media.original_filename = filename
|
|
|
|
if alt_text is not None:
|
|
media.alt_text = alt_text
|
|
|
|
if description is not None:
|
|
media.description = description
|
|
|
|
if folder is not None and folder != media.folder:
|
|
# Move file to new folder
|
|
old_path = UPLOAD_DIR / media.file_path
|
|
new_dir = self._get_store_upload_path(store_id, folder)
|
|
self._ensure_upload_dir(new_dir)
|
|
new_path = new_dir / media.filename
|
|
|
|
if old_path.exists():
|
|
shutil.move(str(old_path), str(new_path))
|
|
media.file_path = str(new_path.relative_to(UPLOAD_DIR))
|
|
|
|
media.folder = folder
|
|
|
|
if metadata is not None:
|
|
media.extra_metadata = metadata
|
|
|
|
media.updated_at = datetime.now(UTC)
|
|
db.flush()
|
|
|
|
logger.info(f"Updated media metadata for {media_id}")
|
|
|
|
return media
|
|
|
|
def delete_media(
|
|
self, db: Session, store_id: int, media_id: int
|
|
) -> bool:
|
|
"""
|
|
Delete a media file.
|
|
|
|
Args:
|
|
db: Database session
|
|
store_id: Store ID
|
|
media_id: Media file ID
|
|
|
|
Returns:
|
|
True if deleted successfully
|
|
"""
|
|
media = self.get_media(db, store_id, media_id)
|
|
|
|
# Delete physical files
|
|
file_path = UPLOAD_DIR / media.file_path
|
|
if file_path.exists():
|
|
file_path.unlink()
|
|
|
|
if media.thumbnail_path:
|
|
thumb_path = UPLOAD_DIR / media.thumbnail_path
|
|
if thumb_path.exists():
|
|
thumb_path.unlink()
|
|
|
|
# Delete variant files
|
|
if media.extra_metadata and "variants" in media.extra_metadata:
|
|
for variant_path in media.extra_metadata["variants"].values():
|
|
vpath = UPLOAD_DIR / variant_path
|
|
if vpath.exists():
|
|
vpath.unlink()
|
|
|
|
# Delete database record
|
|
db.delete(media)
|
|
|
|
logger.info(f"Deleted media file {media_id} for store {store_id}")
|
|
|
|
return True
|
|
|
|
def get_storage_stats(self, db: Session) -> dict:
|
|
"""Get storage statistics from MediaFile records.
|
|
|
|
Returns:
|
|
Dict with storage metrics for health monitoring.
|
|
"""
|
|
total_files = db.query(func.count(MediaFile.id)).scalar() or 0
|
|
total_size = db.query(func.sum(MediaFile.file_size)).scalar() or 0
|
|
|
|
# Count distinct folders as proxy for directory count
|
|
directory_count = (
|
|
db.query(func.count(func.distinct(MediaFile.folder))).scalar() or 0
|
|
)
|
|
|
|
# Image-specific stats
|
|
image_count = (
|
|
db.query(func.count(MediaFile.id))
|
|
.filter(MediaFile.media_type == "image")
|
|
.scalar()
|
|
or 0
|
|
)
|
|
|
|
return {
|
|
"total_files": total_files,
|
|
"total_size_bytes": total_size,
|
|
"total_size_mb": round(total_size / (1024 * 1024), 2) if total_size else 0,
|
|
"total_size_gb": round(total_size / (1024 * 1024 * 1024), 3) if total_size else 0,
|
|
"directory_count": directory_count,
|
|
"max_files_per_dir": 0, # Not applicable for DB-backed tracking
|
|
"avg_files_per_dir": round(total_files / directory_count, 1) if directory_count else 0,
|
|
"products_estimated": image_count,
|
|
}
|
|
|
|
|
|
# Create service instance
|
|
media_service = MediaService()
|