# app/services/image_service.py
"""
Image upload and management service.

Provides:
- Image upload with automatic optimization
- WebP conversion
- Multiple size variant generation
- Sharded directory structure for performance
"""

import hashlib
import logging
import os
from datetime import datetime, timezone
from io import BytesIO
from pathlib import Path

from PIL import Image

from app.exceptions import ValidationException

logger = logging.getLogger(__name__)

# Maximum upload size (10MB)
MAX_UPLOAD_SIZE = 10 * 1024 * 1024


class ImageService:
    """Service for image upload and management."""

    # Supported image formats
    ALLOWED_EXTENSIONS = {"jpg", "jpeg", "png", "gif", "webp"}
    ALLOWED_CONTENT_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"}

    # Size variants to generate (name -> max dimension in px)
    SIZES = {
        "original": None,  # No max dimension, just optimize
        "800": 800,        # Medium size for product cards
        "200": 200,        # Thumbnail for grids
    }

    # Quality settings
    QUALITY = 85
    MAX_DIMENSION = 2000  # Max dimension for "original" variant

    def __init__(self, upload_dir: str = "static/uploads"):
        """Initialize image service.

        Args:
            upload_dir: Base directory for uploads (relative to project root)
        """
        self.upload_dir = Path(upload_dir)
        self.products_dir = self.upload_dir / "products"

        # Ensure directories exist (side effect at construction time)
        self.products_dir.mkdir(parents=True, exist_ok=True)

    def upload_product_image(
        self,
        file_content: bytes,
        filename: str,
        vendor_id: int,
        product_id: int | None = None,
        content_type: str | None = None,
    ) -> dict:
        """Upload and process a product image.

        Validates size/type, then writes one WebP file per SIZES variant into
        a sharded directory derived from a short content hash.

        Args:
            file_content: Raw file bytes
            filename: Original filename
            vendor_id: Vendor ID for path generation
            product_id: Optional product ID
            content_type: MIME type of the uploaded file

        Returns:
            Dict with image info and URLs

        Raises:
            ValidationException: If file is too large or invalid type
            ValueError: If image decoding/processing fails
        """
        # Validate file size
        if len(file_content) > MAX_UPLOAD_SIZE:
            raise ValidationException(
                f"File too large. Maximum size: {MAX_UPLOAD_SIZE // (1024 * 1024)}MB"
            )

        # Validate content type
        if not content_type or not content_type.startswith("image/"):
            raise ValidationException("Invalid file type. Only images are allowed.")

        # Validate file extension
        ext = self._get_extension(filename)
        if ext not in self.ALLOWED_EXTENSIONS:
            raise ValidationException(
                f"Invalid file type: {ext}. Allowed: {', '.join(self.ALLOWED_EXTENSIONS)}"
            )

        # Generate unique hash for this image
        image_hash = self._generate_hash(vendor_id, product_id, filename)

        # Determine sharded directory path
        shard_path = self._get_shard_path(image_hash)
        full_dir = self.products_dir / shard_path
        full_dir.mkdir(parents=True, exist_ok=True)

        # Load and process image
        try:
            img = Image.open(BytesIO(file_content))

            # Convert to RGB if necessary (for PNG with alpha / palette images);
            # WebP output below is saved without alpha.
            if img.mode in ("RGBA", "P"):
                img = img.convert("RGB")

            # Get original dimensions
            original_width, original_height = img.size

            # Process and save variants
            urls = {}
            total_size = 0

            for size_name, max_dim in self.SIZES.items():
                # Work on a copy so each variant resizes from the full image
                processed_img = self._resize_image(img.copy(), max_dim)
                file_path = full_dir / f"{image_hash}_{size_name}.webp"

                # Save as WebP
                processed_img.save(file_path, "WEBP", quality=self.QUALITY)

                # Track size
                file_size = file_path.stat().st_size
                total_size += file_size

                # Generate URL path (relative to static)
                url_path = (
                    f"/static/uploads/products/{shard_path}/{image_hash}_{size_name}.webp"
                )
                urls[size_name] = url_path

                logger.debug(f"Saved {size_name}: {file_path} ({file_size} bytes)")

            logger.info(
                f"Uploaded image {image_hash} for vendor {vendor_id}: "
                f"{len(urls)} variants, {total_size} bytes total"
            )

            return {
                "id": image_hash,
                "urls": urls,
                "size_bytes": total_size,
                "dimensions": {
                    "width": original_width,
                    "height": original_height,
                },
                "path": str(shard_path),
            }

        except Exception as e:
            logger.error(f"Failed to process image: {e}")
            # Chain the cause so the original traceback is not lost
            raise ValueError(f"Failed to process image: {e}") from e

    def delete_product_image(self, image_hash: str) -> bool:
        """Delete all variants of a product image.

        Args:
            image_hash: The image hash/ID

        Returns:
            True if deleted, False if not found
        """
        shard_path = self._get_shard_path(image_hash)
        full_dir = self.products_dir / shard_path

        if not full_dir.exists():
            return False

        deleted = False
        for size_name in self.SIZES:
            file_path = full_dir / f"{image_hash}_{size_name}.webp"
            if file_path.exists():
                file_path.unlink()
                deleted = True
                logger.debug(f"Deleted: {file_path}")

        # Clean up empty directories
        self._cleanup_empty_dirs(full_dir)

        if deleted:
            logger.info(f"Deleted image {image_hash}")

        return deleted

    def get_storage_stats(self) -> dict:
        """Get storage statistics.

        Walks the products directory counting .webp files only.

        Returns:
            Dict with storage metrics
        """
        total_files = 0
        total_size = 0
        max_files_per_dir = 0
        dir_count = 0

        for root, dirs, files in os.walk(self.products_dir):
            webp_files = [f for f in files if f.endswith(".webp")]
            file_count = len(webp_files)

            total_files += file_count
            if file_count > 0:
                dir_count += 1
                max_files_per_dir = max(max_files_per_dir, file_count)

            for f in webp_files:
                file_path = Path(root) / f
                total_size += file_path.stat().st_size

        # Calculate average files per directory
        avg_files_per_dir = total_files / dir_count if dir_count > 0 else 0

        return {
            "total_files": total_files,
            "total_size_bytes": total_size,
            "total_size_mb": round(total_size / (1024 * 1024), 2),
            "total_size_gb": round(total_size / (1024 * 1024 * 1024), 3),
            "directory_count": dir_count,
            "max_files_per_dir": max_files_per_dir,
            "avg_files_per_dir": round(avg_files_per_dir, 1),
            "products_estimated": total_files // 3,  # 3 variants per image
        }

    def _generate_hash(
        self, vendor_id: int, product_id: int | None, filename: str
    ) -> str:
        """Generate unique hash for image.

        Args:
            vendor_id: Vendor ID
            product_id: Product ID (optional)
            filename: Original filename

        Returns:
            8-character hex hash
        """
        # timezone-aware replacement for the deprecated datetime.utcnow()
        timestamp = datetime.now(timezone.utc).isoformat()
        # FIX: include the filename in the hash input — it was previously an
        # unused parameter (the hash string contained a placeholder instead),
        # so same-microsecond uploads from one vendor could collide.
        content = f"{vendor_id}:{product_id}:{timestamp}:{filename}"
        # md5 is used only as a short non-cryptographic path/ID hash here
        return hashlib.md5(content.encode()).hexdigest()[:8]  # noqa: SEC-041

    def _get_shard_path(self, image_hash: str) -> str:
        """Get sharded directory path from hash.

        Uses first 4 characters to create 2-level directory structure.
        This creates 256 possible directories at each level.

        Args:
            image_hash: 8-character hash

        Returns:
            Path like "0a/1b"
        """
        return f"{image_hash[:2]}/{image_hash[2:4]}"

    def _get_extension(self, filename: str) -> str:
        """Get lowercase file extension ("" if the name has no dot)."""
        return filename.rsplit(".", 1)[-1].lower() if "." in filename else ""

    def _resize_image(self, img: Image.Image, max_dimension: int | None) -> Image.Image:
        """Resize image while maintaining aspect ratio.

        Args:
            img: PIL Image
            max_dimension: Maximum width or height (None = use MAX_DIMENSION)

        Returns:
            Resized PIL Image (the original object if already small enough)
        """
        if max_dimension is None:
            max_dimension = self.MAX_DIMENSION

        width, height = img.size

        # Only resize if larger than max
        if width <= max_dimension and height <= max_dimension:
            return img

        # Calculate new dimensions maintaining aspect ratio
        if width > height:
            new_width = max_dimension
            new_height = int(height * (max_dimension / width))
        else:
            new_height = max_dimension
            new_width = int(width * (max_dimension / height))

        return img.resize((new_width, new_height), Image.Resampling.LANCZOS)

    def _cleanup_empty_dirs(self, dir_path: Path):
        """Remove empty directories up the tree, stopping at products_dir."""
        try:
            # Try to remove the directory and its parents if empty
            while dir_path != self.products_dir:
                if dir_path.exists() and not any(dir_path.iterdir()):
                    dir_path.rmdir()
                    dir_path = dir_path.parent
                else:
                    break
        except OSError:
            pass  # Directory not empty or other error


# Create service instance (module import side effect: creates upload dirs)
image_service = ImageService()