# app/modules/cms/services/media_service.py """ Media service for store media library management. This module provides: - File upload and storage - Thumbnail generation for images - Media metadata management - Media usage tracking """ import logging import mimetypes import shutil import uuid from datetime import UTC, datetime from pathlib import Path from typing import Any from sqlalchemy import func, or_ from sqlalchemy.orm import Session from app.modules.cms.exceptions import ( MediaFileTooLargeException, MediaNotFoundException, MediaValidationException, UnsupportedMediaTypeException, ) from app.modules.cms.models import MediaFile logger = logging.getLogger(__name__) # Base upload directory UPLOAD_DIR = Path("uploads") STORE_UPLOAD_DIR = UPLOAD_DIR / "stores" # Allowed file types and their categories ALLOWED_EXTENSIONS = { # Images "jpg": "image", "jpeg": "image", "png": "image", "gif": "image", "webp": "image", "svg": "image", # Videos "mp4": "video", "webm": "video", "mov": "video", # Documents "pdf": "document", "doc": "document", "docx": "document", "xls": "document", "xlsx": "document", "csv": "document", "txt": "document", } # Maximum file sizes (in bytes) MAX_FILE_SIZES = { "image": 10 * 1024 * 1024, # 10 MB "video": 100 * 1024 * 1024, # 100 MB "document": 20 * 1024 * 1024, # 20 MB } # Thumbnail settings THUMBNAIL_SIZE = (200, 200) # Image variant settings (ported from ImageService) IMAGE_VARIANT_QUALITY = 85 IMAGE_MAX_DIMENSION = 2000 IMAGE_VARIANT_SIZES = { "800": 800, "200": 200, } class MediaService: """Service for store media library operations.""" def _get_store_upload_path(self, store_id: int, folder: str = "general") -> Path: """Get the upload directory path for a store.""" return STORE_UPLOAD_DIR / str(store_id) / folder def _ensure_upload_dir(self, path: Path) -> None: """Ensure upload directory exists.""" path.mkdir(parents=True, exist_ok=True) def _get_file_extension(self, filename: str) -> str: """Extract file extension from filename.""" return filename.rsplit(".", 1)[-1].lower() if "." in filename else "" def _get_media_type(self, extension: str) -> str | None: """Get media type from file extension.""" return ALLOWED_EXTENSIONS.get(extension) def _generate_unique_filename(self, original_filename: str) -> str: """Generate a unique filename using UUID.""" ext = self._get_file_extension(original_filename) return f"{uuid.uuid4().hex}.{ext}" if ext else uuid.uuid4().hex def _validate_file( self, filename: str, file_size: int ) -> tuple[str, str]: """ Validate uploaded file. Returns: Tuple of (extension, media_type) Raises: MediaValidationException: If file is invalid UnsupportedMediaTypeException: If file type is not supported MediaFileTooLargeException: If file exceeds size limit """ ext = self._get_file_extension(filename) if not ext: raise MediaValidationException("File must have an extension", field="file") media_type = self._get_media_type(ext) if not media_type: raise UnsupportedMediaTypeException( ext, allowed_types=list(ALLOWED_EXTENSIONS.keys()) ) max_size = MAX_FILE_SIZES.get(media_type, 10 * 1024 * 1024) if file_size > max_size: raise MediaFileTooLargeException(file_size, max_size, media_type) return ext, media_type def _get_image_dimensions(self, file_path: Path) -> tuple[int, int] | None: """Get image dimensions if PIL is available.""" try: from PIL import Image with Image.open(file_path) as img: return img.size except ImportError: logger.debug("PIL not available, skipping image dimension detection") return None except OSError as e: logger.warning(f"Could not get image dimensions: {e}") return None def _resize_image(self, img: "Image.Image", max_dimension: int) -> "Image.Image": """Resize image while maintaining aspect ratio. Args: img: PIL Image max_dimension: Maximum width or height Returns: Resized PIL Image (or original if already smaller) """ width, height = img.size if width <= max_dimension and height <= max_dimension: return img if width > height: new_width = max_dimension new_height = int(height * (max_dimension / width)) else: new_height = max_dimension new_width = int(width * (max_dimension / height)) from PIL import Image return img.resize((new_width, new_height), Image.Resampling.LANCZOS) def _generate_image_variants( self, source_path: Path, filename_stem: str ) -> dict[str, str]: """Generate WebP image variants at multiple sizes. Args: source_path: Path to the original image file filename_stem: UUID stem for naming variants Returns: Dict of size_name -> relative path (from UPLOAD_DIR) """ try: from PIL import Image variants: dict[str, str] = {} parent_dir = source_path.parent with Image.open(source_path) as img: # Convert to RGB if needed (for PNG with transparency) if img.mode in ("RGBA", "P"): img = img.convert("RGB") # Resize original if larger than max dimension img = self._resize_image(img, IMAGE_MAX_DIMENSION) for size_name, max_dim in IMAGE_VARIANT_SIZES.items(): variant_img = self._resize_image(img.copy(), max_dim) variant_filename = f"{filename_stem}_{size_name}.webp" variant_path = parent_dir / variant_filename variant_img.save( variant_path, "WEBP", quality=IMAGE_VARIANT_QUALITY ) variants[size_name] = str( variant_path.relative_to(UPLOAD_DIR) ) logger.debug( f"Generated {size_name}px variant: {variant_path}" ) return variants except ImportError: logger.debug("PIL not available, skipping variant generation") return {} except OSError as e: logger.warning(f"Could not generate image variants: {e}") return {} async def upload_file( self, db: Session, store_id: int, file_content: bytes, filename: str, folder: str = "general", ) -> MediaFile: """ Upload a file to the media library. Args: db: Database session store_id: Store ID file_content: File content as bytes filename: Original filename folder: Folder to store in (products, general, etc.) Returns: Created MediaFile record """ # Validate file file_size = len(file_content) ext, media_type = self._validate_file(filename, file_size) # Generate unique filename unique_filename = self._generate_unique_filename(filename) # Get upload path upload_path = self._get_store_upload_path(store_id, folder) self._ensure_upload_dir(upload_path) # Save file file_path = upload_path / unique_filename file_path.write_bytes(file_content) # Get relative path for storage relative_path = str(file_path.relative_to(UPLOAD_DIR)) # Get MIME type mime_type, _ = mimetypes.guess_type(filename) # Get image dimensions and generate variants width, height = None, None thumbnail_path = None extra_metadata: dict[str, Any] = {} if media_type == "image": dimensions = self._get_image_dimensions(file_path) if dimensions: width, height = dimensions # Generate WebP variants (800px, 200px) stem = unique_filename.rsplit(".", 1)[0] if "." in unique_filename else unique_filename variants = self._generate_image_variants(file_path, stem) if variants: extra_metadata["variants"] = variants # Use the 200px variant as thumbnail if "200" in variants: thumbnail_path = variants["200"] # Create database record media_file = MediaFile( store_id=store_id, filename=unique_filename, original_filename=filename, file_path=relative_path, media_type=media_type, mime_type=mime_type, file_size=file_size, width=width, height=height, thumbnail_path=thumbnail_path, folder=folder, extra_metadata=extra_metadata if extra_metadata else None, ) db.add(media_file) db.flush() db.refresh(media_file) logger.info( f"Uploaded media file {media_file.id} for store {store_id}: {filename}" ) return media_file def get_media( self, db: Session, store_id: int, media_id: int ) -> MediaFile: """ Get a media file by ID. Raises: MediaNotFoundException: If media not found or doesn't belong to store """ media = ( db.query(MediaFile) .filter( MediaFile.id == media_id, MediaFile.store_id == store_id, ) .first() ) if not media: raise MediaNotFoundException(media_id) return media def get_media_library( self, db: Session, store_id: int, skip: int = 0, limit: int = 100, media_type: str | None = None, folder: str | None = None, search: str | None = None, ) -> tuple[list[MediaFile], int]: """ Get store media library with filtering. Args: db: Database session store_id: Store ID skip: Pagination offset limit: Pagination limit media_type: Filter by media type folder: Filter by folder search: Search in filename Returns: Tuple of (media_files, total_count) """ query = db.query(MediaFile).filter(MediaFile.store_id == store_id) if media_type: query = query.filter(MediaFile.media_type == media_type) if folder: query = query.filter(MediaFile.folder == folder) if search: search_pattern = f"%{search}%" query = query.filter( or_( MediaFile.filename.ilike(search_pattern), MediaFile.original_filename.ilike(search_pattern), MediaFile.alt_text.ilike(search_pattern), ) ) # Order by newest first query = query.order_by(MediaFile.created_at.desc()) total = query.count() media_files = query.offset(skip).limit(limit).all() return media_files, total def update_media_metadata( self, db: Session, store_id: int, media_id: int, filename: str | None = None, alt_text: str | None = None, description: str | None = None, folder: str | None = None, metadata: dict | None = None, ) -> MediaFile: """ Update media file metadata. Args: db: Database session store_id: Store ID media_id: Media file ID filename: New display filename alt_text: Alt text for images description: File description folder: Move to different folder metadata: Additional metadata Returns: Updated MediaFile """ media = self.get_media(db, store_id, media_id) if filename is not None: media.original_filename = filename if alt_text is not None: media.alt_text = alt_text if description is not None: media.description = description if folder is not None and folder != media.folder: # Move file to new folder old_path = UPLOAD_DIR / media.file_path new_dir = self._get_store_upload_path(store_id, folder) self._ensure_upload_dir(new_dir) new_path = new_dir / media.filename if old_path.exists(): shutil.move(str(old_path), str(new_path)) media.file_path = str(new_path.relative_to(UPLOAD_DIR)) media.folder = folder if metadata is not None: media.extra_metadata = metadata media.updated_at = datetime.now(UTC) db.flush() logger.info(f"Updated media metadata for {media_id}") return media def delete_media( self, db: Session, store_id: int, media_id: int ) -> bool: """ Delete a media file. Args: db: Database session store_id: Store ID media_id: Media file ID Returns: True if deleted successfully """ media = self.get_media(db, store_id, media_id) # Delete physical files file_path = UPLOAD_DIR / media.file_path if file_path.exists(): file_path.unlink() if media.thumbnail_path: thumb_path = UPLOAD_DIR / media.thumbnail_path if thumb_path.exists(): thumb_path.unlink() # Delete variant files if media.extra_metadata and "variants" in media.extra_metadata: for variant_path in media.extra_metadata["variants"].values(): vpath = UPLOAD_DIR / variant_path if vpath.exists(): vpath.unlink() # Delete database record db.delete(media) logger.info(f"Deleted media file {media_id} for store {store_id}") return True def get_storage_stats(self, db: Session) -> dict: """Get storage statistics from MediaFile records. Returns: Dict with storage metrics for health monitoring. """ total_files = db.query(func.count(MediaFile.id)).scalar() or 0 total_size = db.query(func.sum(MediaFile.file_size)).scalar() or 0 # Count distinct folders as proxy for directory count directory_count = ( db.query(func.count(func.distinct(MediaFile.folder))).scalar() or 0 ) # Image-specific stats image_count = ( db.query(func.count(MediaFile.id)) .filter(MediaFile.media_type == "image") .scalar() or 0 ) return { "total_files": total_files, "total_size_bytes": total_size, "total_size_mb": round(total_size / (1024 * 1024), 2) if total_size else 0, "total_size_gb": round(total_size / (1024 * 1024 * 1024), 3) if total_size else 0, "directory_count": directory_count, "max_files_per_dir": 0, # Not applicable for DB-backed tracking "avg_files_per_dir": round(total_files / directory_count, 1) if directory_count else 0, "products_estimated": image_count, } # Create service instance media_service = MediaService()