# app/modules/cms/services/media_service.py """ Media service for store media library management. This module provides: - File upload and storage - Thumbnail generation for images - Media metadata management - Media usage tracking """ import logging import mimetypes import os import shutil import uuid from datetime import UTC, datetime from pathlib import Path from typing import Any from sqlalchemy import func, or_ from sqlalchemy.orm import Session from app.modules.cms.exceptions import ( MediaNotFoundException, MediaUploadException, MediaValidationException, UnsupportedMediaTypeException, MediaFileTooLargeException, ) from app.modules.cms.models import MediaFile logger = logging.getLogger(__name__) # Base upload directory UPLOAD_DIR = Path("uploads") STORE_UPLOAD_DIR = UPLOAD_DIR / "stores" # Allowed file types and their categories ALLOWED_EXTENSIONS = { # Images "jpg": "image", "jpeg": "image", "png": "image", "gif": "image", "webp": "image", "svg": "image", # Videos "mp4": "video", "webm": "video", "mov": "video", # Documents "pdf": "document", "doc": "document", "docx": "document", "xls": "document", "xlsx": "document", "csv": "document", "txt": "document", } # Maximum file sizes (in bytes) MAX_FILE_SIZES = { "image": 10 * 1024 * 1024, # 10 MB "video": 100 * 1024 * 1024, # 100 MB "document": 20 * 1024 * 1024, # 20 MB } # Thumbnail settings THUMBNAIL_SIZE = (200, 200) # Image variant settings (ported from ImageService) IMAGE_VARIANT_QUALITY = 85 IMAGE_MAX_DIMENSION = 2000 IMAGE_VARIANT_SIZES = { "800": 800, "200": 200, } class MediaService: """Service for store media library operations.""" def _get_store_upload_path(self, store_id: int, folder: str = "general") -> Path: """Get the upload directory path for a store.""" return STORE_UPLOAD_DIR / str(store_id) / folder def _ensure_upload_dir(self, path: Path) -> None: """Ensure upload directory exists.""" path.mkdir(parents=True, exist_ok=True) def _get_file_extension(self, filename: str) -> str: """Extract file extension from filename.""" return filename.rsplit(".", 1)[-1].lower() if "." in filename else "" def _get_media_type(self, extension: str) -> str | None: """Get media type from file extension.""" return ALLOWED_EXTENSIONS.get(extension) def _generate_unique_filename(self, original_filename: str) -> str: """Generate a unique filename using UUID.""" ext = self._get_file_extension(original_filename) return f"{uuid.uuid4().hex}.{ext}" if ext else uuid.uuid4().hex def _validate_file( self, filename: str, file_size: int ) -> tuple[str, str]: """ Validate uploaded file. Returns: Tuple of (extension, media_type) Raises: MediaValidationException: If file is invalid UnsupportedMediaTypeException: If file type is not supported MediaFileTooLargeException: If file exceeds size limit """ ext = self._get_file_extension(filename) if not ext: raise MediaValidationException("File must have an extension", field="file") media_type = self._get_media_type(ext) if not media_type: raise UnsupportedMediaTypeException( ext, allowed_types=list(ALLOWED_EXTENSIONS.keys()) ) max_size = MAX_FILE_SIZES.get(media_type, 10 * 1024 * 1024) if file_size > max_size: raise MediaFileTooLargeException(file_size, max_size, media_type) return ext, media_type def _get_image_dimensions(self, file_path: Path) -> tuple[int, int] | None: """Get image dimensions if PIL is available.""" try: from PIL import Image with Image.open(file_path) as img: return img.size except ImportError: logger.debug("PIL not available, skipping image dimension detection") return None except Exception as e: logger.warning(f"Could not get image dimensions: {e}") return None def _resize_image(self, img: "Image.Image", max_dimension: int) -> "Image.Image": """Resize image while maintaining aspect ratio. Args: img: PIL Image max_dimension: Maximum width or height Returns: Resized PIL Image (or original if already smaller) """ width, height = img.size if width <= max_dimension and height <= max_dimension: return img if width > height: new_width = max_dimension new_height = int(height * (max_dimension / width)) else: new_height = max_dimension new_width = int(width * (max_dimension / height)) from PIL import Image return img.resize((new_width, new_height), Image.Resampling.LANCZOS) def _generate_image_variants( self, source_path: Path, filename_stem: str ) -> dict[str, str]: """Generate WebP image variants at multiple sizes. Args: source_path: Path to the original image file filename_stem: UUID stem for naming variants Returns: Dict of size_name -> relative path (from UPLOAD_DIR) """ try: from PIL import Image variants: dict[str, str] = {} parent_dir = source_path.parent with Image.open(source_path) as img: # Convert to RGB if needed (for PNG with transparency) if img.mode in ("RGBA", "P"): img = img.convert("RGB") # Resize original if larger than max dimension img = self._resize_image(img, IMAGE_MAX_DIMENSION) for size_name, max_dim in IMAGE_VARIANT_SIZES.items(): variant_img = self._resize_image(img.copy(), max_dim) variant_filename = f"{filename_stem}_{size_name}.webp" variant_path = parent_dir / variant_filename variant_img.save( variant_path, "WEBP", quality=IMAGE_VARIANT_QUALITY ) variants[size_name] = str( variant_path.relative_to(UPLOAD_DIR) ) logger.debug( f"Generated {size_name}px variant: {variant_path}" ) return variants except ImportError: logger.debug("PIL not available, skipping variant generation") return {} except Exception as e: logger.warning(f"Could not generate image variants: {e}") return {} async def upload_file( self, db: Session, store_id: int, file_content: bytes, filename: str, folder: str = "general", ) -> MediaFile: """ Upload a file to the media library. Args: db: Database session store_id: Store ID file_content: File content as bytes filename: Original filename folder: Folder to store in (products, general, etc.) Returns: Created MediaFile record """ # Validate file file_size = len(file_content) ext, media_type = self._validate_file(filename, file_size) # Generate unique filename unique_filename = self._generate_unique_filename(filename) # Get upload path upload_path = self._get_store_upload_path(store_id, folder) self._ensure_upload_dir(upload_path) # Save file file_path = upload_path / unique_filename file_path.write_bytes(file_content) # Get relative path for storage relative_path = str(file_path.relative_to(UPLOAD_DIR)) # Get MIME type mime_type, _ = mimetypes.guess_type(filename) # Get image dimensions and generate variants width, height = None, None thumbnail_path = None extra_metadata: dict[str, Any] = {} if media_type == "image": dimensions = self._get_image_dimensions(file_path) if dimensions: width, height = dimensions # Generate WebP variants (800px, 200px) stem = unique_filename.rsplit(".", 1)[0] if "." in unique_filename else unique_filename variants = self._generate_image_variants(file_path, stem) if variants: extra_metadata["variants"] = variants # Use the 200px variant as thumbnail if "200" in variants: thumbnail_path = variants["200"] # Create database record media_file = MediaFile( store_id=store_id, filename=unique_filename, original_filename=filename, file_path=relative_path, media_type=media_type, mime_type=mime_type, file_size=file_size, width=width, height=height, thumbnail_path=thumbnail_path, folder=folder, extra_metadata=extra_metadata if extra_metadata else None, ) db.add(media_file) db.flush() db.refresh(media_file) logger.info( f"Uploaded media file {media_file.id} for store {store_id}: {filename}" ) return media_file def get_media( self, db: Session, store_id: int, media_id: int ) -> MediaFile: """ Get a media file by ID. Raises: MediaNotFoundException: If media not found or doesn't belong to store """ media = ( db.query(MediaFile) .filter( MediaFile.id == media_id, MediaFile.store_id == store_id, ) .first() ) if not media: raise MediaNotFoundException(media_id) return media def get_media_library( self, db: Session, store_id: int, skip: int = 0, limit: int = 100, media_type: str | None = None, folder: str | None = None, search: str | None = None, ) -> tuple[list[MediaFile], int]: """ Get store media library with filtering. Args: db: Database session store_id: Store ID skip: Pagination offset limit: Pagination limit media_type: Filter by media type folder: Filter by folder search: Search in filename Returns: Tuple of (media_files, total_count) """ query = db.query(MediaFile).filter(MediaFile.store_id == store_id) if media_type: query = query.filter(MediaFile.media_type == media_type) if folder: query = query.filter(MediaFile.folder == folder) if search: search_pattern = f"%{search}%" query = query.filter( or_( MediaFile.filename.ilike(search_pattern), MediaFile.original_filename.ilike(search_pattern), MediaFile.alt_text.ilike(search_pattern), ) ) # Order by newest first query = query.order_by(MediaFile.created_at.desc()) total = query.count() media_files = query.offset(skip).limit(limit).all() return media_files, total def update_media_metadata( self, db: Session, store_id: int, media_id: int, filename: str | None = None, alt_text: str | None = None, description: str | None = None, folder: str | None = None, metadata: dict | None = None, ) -> MediaFile: """ Update media file metadata. Args: db: Database session store_id: Store ID media_id: Media file ID filename: New display filename alt_text: Alt text for images description: File description folder: Move to different folder metadata: Additional metadata Returns: Updated MediaFile """ media = self.get_media(db, store_id, media_id) if filename is not None: media.original_filename = filename if alt_text is not None: media.alt_text = alt_text if description is not None: media.description = description if folder is not None and folder != media.folder: # Move file to new folder old_path = UPLOAD_DIR / media.file_path new_dir = self._get_store_upload_path(store_id, folder) self._ensure_upload_dir(new_dir) new_path = new_dir / media.filename if old_path.exists(): shutil.move(str(old_path), str(new_path)) media.file_path = str(new_path.relative_to(UPLOAD_DIR)) media.folder = folder if metadata is not None: media.extra_metadata = metadata media.updated_at = datetime.now(UTC) db.flush() logger.info(f"Updated media metadata for {media_id}") return media def delete_media( self, db: Session, store_id: int, media_id: int ) -> bool: """ Delete a media file. Args: db: Database session store_id: Store ID media_id: Media file ID Returns: True if deleted successfully """ media = self.get_media(db, store_id, media_id) # Delete physical files file_path = UPLOAD_DIR / media.file_path if file_path.exists(): file_path.unlink() if media.thumbnail_path: thumb_path = UPLOAD_DIR / media.thumbnail_path if thumb_path.exists(): thumb_path.unlink() # Delete variant files if media.extra_metadata and "variants" in media.extra_metadata: for variant_path in media.extra_metadata["variants"].values(): vpath = UPLOAD_DIR / variant_path if vpath.exists(): vpath.unlink() # Delete database record db.delete(media) logger.info(f"Deleted media file {media_id} for store {store_id}") return True def get_storage_stats(self, db: Session) -> dict: """Get storage statistics from MediaFile records. Returns: Dict with storage metrics for health monitoring. """ total_files = db.query(func.count(MediaFile.id)).scalar() or 0 total_size = db.query(func.sum(MediaFile.file_size)).scalar() or 0 # Count distinct folders as proxy for directory count directory_count = ( db.query(func.count(func.distinct(MediaFile.folder))).scalar() or 0 ) # Image-specific stats image_count = ( db.query(func.count(MediaFile.id)) .filter(MediaFile.media_type == "image") .scalar() or 0 ) return { "total_files": total_files, "total_size_bytes": total_size, "total_size_mb": round(total_size / (1024 * 1024), 2) if total_size else 0, "total_size_gb": round(total_size / (1024 * 1024 * 1024), 3) if total_size else 0, "directory_count": directory_count, "max_files_per_dir": 0, # Not applicable for DB-backed tracking "avg_files_per_dir": round(total_files / directory_count, 1) if directory_count else 0, "products_estimated": image_count, } # Create service instance media_service = MediaService()