# app/services/media_service.py
"""
Media service for vendor media library management.

This module provides:
- File upload and storage
- Thumbnail generation for images
- Media metadata management
- Media usage tracking
"""

import logging
import mimetypes
import os
import shutil
import uuid
from datetime import UTC, datetime
from pathlib import Path

from sqlalchemy import or_
from sqlalchemy.orm import Session

from app.exceptions.media import (
    MediaNotFoundException,
    MediaUploadException,
    MediaValidationException,
    UnsupportedMediaTypeException,
    MediaFileTooLargeException,
)
from models.database.media import MediaFile, ProductMedia

logger = logging.getLogger(__name__)

# Base upload directory
UPLOAD_DIR = Path("uploads")
VENDOR_UPLOAD_DIR = UPLOAD_DIR / "vendors"

# Allowed file types and their categories
ALLOWED_EXTENSIONS = {
    # Images
    "jpg": "image",
    "jpeg": "image",
    "png": "image",
    "gif": "image",
    "webp": "image",
    "svg": "image",
    # Videos
    "mp4": "video",
    "webm": "video",
    "mov": "video",
    # Documents
    "pdf": "document",
    "doc": "document",
    "docx": "document",
    "xls": "document",
    "xlsx": "document",
    "csv": "document",
    "txt": "document",
}

# Maximum file sizes (in bytes)
MAX_FILE_SIZES = {
    "image": 10 * 1024 * 1024,  # 10 MB
    "video": 100 * 1024 * 1024,  # 100 MB
    "document": 20 * 1024 * 1024,  # 20 MB
}

# Thumbnail settings
THUMBNAIL_SIZE = (200, 200)

# Escape character used when building LIKE/ILIKE patterns from user input
_LIKE_ESCAPE = "\\"


class MediaService:
    """Service for vendor media library operations."""

    def _get_vendor_upload_path(self, vendor_id: int, folder: str = "general") -> Path:
        """
        Get the upload directory path for a vendor.

        Args:
            vendor_id: Vendor ID
            folder: Sub-folder below the vendor's upload directory

        Returns:
            Path of the form ``VENDOR_UPLOAD_DIR / <vendor_id> / <folder>``

        Raises:
            MediaValidationException: If ``folder`` resolves to a location
                outside the vendor's own upload directory
        """
        vendor_root = VENDOR_UPLOAD_DIR / str(vendor_id)
        target = vendor_root / folder

        # Reject "..", absolute paths, etc. so one vendor cannot read or write
        # files outside of its own directory tree.
        if not target.resolve().is_relative_to(vendor_root.resolve()):
            raise MediaValidationException("Invalid folder", field="folder")

        return target

    def _ensure_upload_dir(self, path: Path) -> None:
        """Ensure upload directory exists (creates missing parents)."""
        path.mkdir(parents=True, exist_ok=True)

    def _get_file_extension(self, filename: str) -> str:
        """Extract the lower-cased extension from filename ("" if there is none)."""
        return filename.rsplit(".", 1)[-1].lower() if "." in filename else ""

    def _get_media_type(self, extension: str) -> str | None:
        """Get media type from file extension, or None if it is not allowed."""
        return ALLOWED_EXTENSIONS.get(extension)

    def _generate_unique_filename(self, original_filename: str) -> str:
        """Generate a unique filename using UUID, keeping the original extension."""
        ext = self._get_file_extension(original_filename)
        return f"{uuid.uuid4().hex}.{ext}" if ext else uuid.uuid4().hex

    def _validate_file(
        self, filename: str, file_size: int
    ) -> tuple[str, str]:
        """
        Validate uploaded file.

        Returns:
            Tuple of (extension, media_type)

        Raises:
            MediaValidationException: If file is invalid
            UnsupportedMediaTypeException: If file type is not supported
            MediaFileTooLargeException: If file exceeds size limit
        """
        ext = self._get_file_extension(filename)

        if not ext:
            raise MediaValidationException("File must have an extension", field="file")

        media_type = self._get_media_type(ext)
        if not media_type:
            raise UnsupportedMediaTypeException(
                ext, allowed_types=list(ALLOWED_EXTENSIONS.keys())
            )

        max_size = MAX_FILE_SIZES.get(media_type, 10 * 1024 * 1024)
        if file_size > max_size:
            raise MediaFileTooLargeException(file_size, max_size, media_type)

        return ext, media_type

    def _get_image_dimensions(self, file_path: Path) -> tuple[int, int] | None:
        """
        Get image dimensions if PIL is available.

        Returns:
            ``(width, height)``, or None when PIL is missing or the file
            cannot be read as an image.
        """
        try:
            from PIL import Image

            with Image.open(file_path) as img:
                return img.size
        except ImportError:
            logger.debug("PIL not available, skipping image dimension detection")
            return None
        except Exception as e:
            # Best-effort: a failure here must not abort the upload.
            logger.warning("Could not get image dimensions: %s", e)
            return None

    def _generate_thumbnail(
        self, source_path: Path, vendor_id: int
    ) -> str | None:
        """
        Generate a JPEG thumbnail for an image file.

        The thumbnail is written to the vendor's ``thumbnails`` folder as
        ``thumb_<source stem>.jpg`` and is at most ``THUMBNAIL_SIZE`` pixels.

        Returns:
            Thumbnail path relative to ``UPLOAD_DIR``, or None when PIL is
            missing or the thumbnail could not be created.
        """
        try:
            from PIL import Image

            # Create thumbnails directory
            thumb_dir = self._get_vendor_upload_path(vendor_id, "thumbnails")
            self._ensure_upload_dir(thumb_dir)

            # The thumbnail is always encoded as JPEG, so give it a matching
            # extension instead of inheriting the source's (e.g. ".png").
            thumb_path = thumb_dir / f"thumb_{source_path.stem}.jpg"

            # Create thumbnail
            with Image.open(source_path) as img:
                img.thumbnail(THUMBNAIL_SIZE)

                # Convert every mode other than RGB / greyscale (RGBA, LA, P,
                # ...) to RGB before encoding as JPEG.
                thumb = img if img.mode in ("RGB", "L") else img.convert("RGB")
                thumb.save(thumb_path, "JPEG", quality=85)

            # Return relative path
            return str(thumb_path.relative_to(UPLOAD_DIR))

        except ImportError:
            logger.debug("PIL not available, skipping thumbnail generation")
            return None
        except Exception as e:
            # Best-effort: a failure here must not abort the upload.
            logger.warning("Could not generate thumbnail: %s", e)
            return None

    async def upload_file(
        self,
        db: Session,
        vendor_id: int,
        file_content: bytes,
        filename: str,
        folder: str = "general",
    ) -> MediaFile:
        """
        Upload a file to the media library.

        Args:
            db: Database session
            vendor_id: Vendor ID
            file_content: File content as bytes
            filename: Original filename
            folder: Folder to store in (products, general, etc.)

        Returns:
            Created MediaFile record

        Raises:
            MediaValidationException: If the file or folder is invalid
            UnsupportedMediaTypeException: If file type is not supported
            MediaFileTooLargeException: If file exceeds size limit
        """
        # Validate file
        file_size = len(file_content)
        ext, media_type = self._validate_file(filename, file_size)

        # Generate unique filename
        unique_filename = self._generate_unique_filename(filename)

        # Get upload path (also validates `folder`)
        upload_path = self._get_vendor_upload_path(vendor_id, folder)
        self._ensure_upload_dir(upload_path)

        # Save file
        file_path = upload_path / unique_filename
        file_path.write_bytes(file_content)

        thumbnail_path = None
        try:
            # Get relative path for storage
            relative_path = str(file_path.relative_to(UPLOAD_DIR))

            # Get MIME type
            mime_type, _ = mimetypes.guess_type(filename)

            # Get image dimensions and generate thumbnail. SVG is skipped
            # because the raster helpers cannot open it and would only log
            # warnings on every upload.
            width, height = None, None
            if media_type == "image" and ext != "svg":
                dimensions = self._get_image_dimensions(file_path)
                if dimensions:
                    width, height = dimensions
                thumbnail_path = self._generate_thumbnail(file_path, vendor_id)

            # Create database record
            media_file = MediaFile(
                vendor_id=vendor_id,
                filename=unique_filename,
                original_filename=filename,
                file_path=relative_path,
                media_type=media_type,
                mime_type=mime_type,
                file_size=file_size,
                width=width,
                height=height,
                thumbnail_path=thumbnail_path,
                folder=folder,
            )

            db.add(media_file)
            db.flush()
            db.refresh(media_file)
        except Exception:
            # Do not leave orphaned files behind when the record could not be
            # created; the original error is re-raised unchanged.
            file_path.unlink(missing_ok=True)
            if thumbnail_path:
                (UPLOAD_DIR / thumbnail_path).unlink(missing_ok=True)
            raise

        logger.info(
            "Uploaded media file %s for vendor %s: %s",
            media_file.id,
            vendor_id,
            filename,
        )

        return media_file

    def get_media(
        self, db: Session, vendor_id: int, media_id: int
    ) -> MediaFile:
        """
        Get a media file by ID.

        Raises:
            MediaNotFoundException: If media not found or doesn't belong to vendor
        """
        media = (
            db.query(MediaFile)
            .filter(
                MediaFile.id == media_id,
                MediaFile.vendor_id == vendor_id,
            )
            .first()
        )

        if not media:
            raise MediaNotFoundException(media_id)

        return media

    def get_media_library(
        self,
        db: Session,
        vendor_id: int,
        skip: int = 0,
        limit: int = 100,
        media_type: str | None = None,
        folder: str | None = None,
        search: str | None = None,
    ) -> tuple[list[MediaFile], int]:
        """
        Get vendor media library with filtering.

        Args:
            db: Database session
            vendor_id: Vendor ID
            skip: Pagination offset
            limit: Pagination limit
            media_type: Filter by media type
            folder: Filter by folder
            search: Search in filename, original filename and alt text
                (case-insensitive substring match; ``%`` and ``_`` are
                matched literally)

        Returns:
            Tuple of (media_files, total_count)
        """
        query = db.query(MediaFile).filter(MediaFile.vendor_id == vendor_id)

        if media_type:
            query = query.filter(MediaFile.media_type == media_type)

        if folder:
            query = query.filter(MediaFile.folder == folder)

        if search:
            # Escape LIKE wildcards so user input is matched literally.
            escaped = (
                search.replace(_LIKE_ESCAPE, _LIKE_ESCAPE * 2)
                .replace("%", f"{_LIKE_ESCAPE}%")
                .replace("_", f"{_LIKE_ESCAPE}_")
            )
            search_pattern = f"%{escaped}%"
            query = query.filter(
                or_(
                    MediaFile.filename.ilike(search_pattern, escape=_LIKE_ESCAPE),
                    MediaFile.original_filename.ilike(
                        search_pattern, escape=_LIKE_ESCAPE
                    ),
                    MediaFile.alt_text.ilike(search_pattern, escape=_LIKE_ESCAPE),
                )
            )

        # Count before ordering: the total does not depend on sort order.
        total = query.count()

        # Order by newest first
        media_files = (
            query.order_by(MediaFile.created_at.desc())
            .offset(skip)
            .limit(limit)
            .all()
        )

        return media_files, total

    def update_media_metadata(
        self,
        db: Session,
        vendor_id: int,
        media_id: int,
        filename: str | None = None,
        alt_text: str | None = None,
        description: str | None = None,
        folder: str | None = None,
        metadata: dict | None = None,
    ) -> MediaFile:
        """
        Update media file metadata.

        Only arguments that are not None are applied.

        Args:
            db: Database session
            vendor_id: Vendor ID
            media_id: Media file ID
            filename: New display filename
            alt_text: Alt text for images
            description: File description
            folder: Move to different folder
            metadata: Additional metadata

        Returns:
            Updated MediaFile

        Raises:
            MediaNotFoundException: If media not found or doesn't belong to vendor
            MediaValidationException: If ``folder`` is invalid
        """
        media = self.get_media(db, vendor_id, media_id)

        # Validate the target folder before touching the record so an invalid
        # folder does not leave a half-updated object in the session.
        moving = folder is not None and folder != media.folder
        new_dir = self._get_vendor_upload_path(vendor_id, folder) if moving else None

        if filename is not None:
            media.original_filename = filename

        if alt_text is not None:
            media.alt_text = alt_text

        if description is not None:
            media.description = description

        if new_dir is not None:
            # Move file to new folder
            old_path = UPLOAD_DIR / media.file_path
            self._ensure_upload_dir(new_dir)
            new_path = new_dir / media.filename

            if old_path.exists():
                shutil.move(str(old_path), str(new_path))

            media.file_path = str(new_path.relative_to(UPLOAD_DIR))
            media.folder = folder

        if metadata is not None:
            media.extra_metadata = metadata

        media.updated_at = datetime.now(UTC)
        db.flush()

        logger.info("Updated media metadata for %s", media_id)

        return media

    def delete_media(
        self, db: Session, vendor_id: int, media_id: int
    ) -> bool:
        """
        Delete a media file.

        Args:
            db: Database session
            vendor_id: Vendor ID
            media_id: Media file ID

        Returns:
            True if deleted successfully

        Raises:
            MediaNotFoundException: If media not found or doesn't belong to vendor
        """
        media = self.get_media(db, vendor_id, media_id)

        # Remember the paths before the record is deleted.
        stored_paths = [UPLOAD_DIR / media.file_path]
        if media.thumbnail_path:
            stored_paths.append(UPLOAD_DIR / media.thumbnail_path)

        # Delete the database record first: if the delete is rejected by the
        # database the files are still on disk.
        db.delete(media)
        db.flush()

        # Delete physical files (already-missing files are ignored)
        for stored_path in stored_paths:
            stored_path.unlink(missing_ok=True)

        logger.info("Deleted media file %s for vendor %s", media_id, vendor_id)

        return True

    def get_media_usage(
        self, db: Session, vendor_id: int, media_id: int
    ) -> dict:
        """
        Get where a media file is being used.

        Returns:
            Dict with keys ``media_id``, ``products`` (one entry per product
            association that has a product), ``other_usage`` (always empty
            here) and ``total_usage_count``

        Raises:
            MediaNotFoundException: If media not found or doesn't belong to vendor
        """
        media = self.get_media(db, vendor_id, media_id)

        # Get product associations; associations without a product are skipped
        product_usage = [
            {
                "product_id": assoc.product.id,
                "product_name": assoc.product.get_title()
                or f"Product {assoc.product.id}",
                "usage_type": assoc.usage_type,
            }
            for assoc in media.product_associations
            if assoc.product
        ]

        return {
            "media_id": media_id,
            "products": product_usage,
            "other_usage": [],
            "total_usage_count": len(product_usage),
        }

    def attach_to_product(
        self,
        db: Session,
        vendor_id: int,
        media_id: int,
        product_id: int,
        usage_type: str = "gallery",
        display_order: int = 0,
    ) -> ProductMedia:
        """
        Attach a media file to a product.

        If the media is already attached to the product with the same usage
        type, only its display order is updated and the existing association
        is returned.

        Args:
            db: Database session
            vendor_id: Vendor ID
            media_id: Media file ID
            product_id: Product ID
            usage_type: How the media is used (main_image, gallery, etc.)
            display_order: Order for galleries

        Returns:
            Created (or already existing) ProductMedia association

        Raises:
            MediaNotFoundException: If media not found or doesn't belong to vendor
        """
        # Verify media belongs to vendor
        # NOTE(review): product ownership is not checked in this method --
        # confirm the caller verifies that product_id belongs to vendor_id.
        media = self.get_media(db, vendor_id, media_id)

        # Check if already attached with same usage type
        existing = (
            db.query(ProductMedia)
            .filter(
                ProductMedia.product_id == product_id,
                ProductMedia.media_id == media_id,
                ProductMedia.usage_type == usage_type,
            )
            .first()
        )

        if existing:
            existing.display_order = display_order
            db.flush()
            return existing

        # Create association
        product_media = ProductMedia(
            product_id=product_id,
            media_id=media_id,
            usage_type=usage_type,
            display_order=display_order,
        )

        db.add(product_media)

        # Update usage count
        media.usage_count = (media.usage_count or 0) + 1

        db.flush()

        return product_media

    def detach_from_product(
        self,
        db: Session,
        vendor_id: int,
        media_id: int,
        product_id: int,
        usage_type: str | None = None,
    ) -> bool:
        """
        Detach a media file from a product.

        Args:
            db: Database session
            vendor_id: Vendor ID
            media_id: Media file ID
            product_id: Product ID
            usage_type: Specific usage type to remove (None = all)

        Returns:
            True if at least one association was removed

        Raises:
            MediaNotFoundException: If media not found or doesn't belong to vendor
        """
        # Verify media belongs to vendor
        media = self.get_media(db, vendor_id, media_id)

        query = db.query(ProductMedia).filter(
            ProductMedia.product_id == product_id,
            ProductMedia.media_id == media_id,
        )

        if usage_type:
            query = query.filter(ProductMedia.usage_type == usage_type)

        deleted_count = query.delete()

        # Update usage count (never below zero)
        if deleted_count > 0:
            media.usage_count = max(0, (media.usage_count or 0) - deleted_count)

        db.flush()

        return deleted_count > 0


# Create service instance
media_service = MediaService()