Files
orion/app/services/image_service.py
Samir Boulahtit e21abd4c32 fix: suppress false positive security warnings with noqa comments
- Add SEC-034 noqa comments to HTTP/HTTPS validation code
- Add SEC-041 noqa to MD5 hash used for cache keys (not crypto)
- Add {# sanitized #} comments to templates using |safe filter
- Fix validator regex to detect sanitized comments after Jinja closing tags
- Add vendor/** to ignore list for third-party libraries

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-25 22:21:14 +01:00

308 lines
9.6 KiB
Python

# app/services/image_service.py
"""
Image upload and management service.
Provides:
- Image upload with automatic optimization
- WebP conversion
- Multiple size variant generation
- Sharded directory structure for performance
"""
import hashlib
import logging
import os
from datetime import datetime, timezone
from io import BytesIO
from pathlib import Path

from PIL import Image

from app.exceptions import ValidationException
# Module-wide logger, named after this module
logger = logging.getLogger(name=__name__)
# Upload size ceiling in bytes (10MB)
MAX_UPLOAD_SIZE = 10 * 1024**2
class ImageService:
"""Service for image upload and management."""
# Supported image formats
ALLOWED_EXTENSIONS = {"jpg", "jpeg", "png", "gif", "webp"}
ALLOWED_CONTENT_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"}
# Size variants to generate
SIZES = {
"original": None, # No max dimension, just optimize
"800": 800, # Medium size for product cards
"200": 200, # Thumbnail for grids
}
# Quality settings
QUALITY = 85
MAX_DIMENSION = 2000 # Max dimension for original
def __init__(self, upload_dir: str = "static/uploads"):
"""Initialize image service.
Args:
upload_dir: Base directory for uploads (relative to project root)
"""
self.upload_dir = Path(upload_dir)
self.products_dir = self.upload_dir / "products"
# Ensure directories exist
self.products_dir.mkdir(parents=True, exist_ok=True)
def upload_product_image(
self,
file_content: bytes,
filename: str,
vendor_id: int,
product_id: int | None = None,
content_type: str | None = None,
) -> dict:
"""Upload and process a product image.
Args:
file_content: Raw file bytes
filename: Original filename
vendor_id: Vendor ID for path generation
product_id: Optional product ID
content_type: MIME type of the uploaded file
Returns:
Dict with image info and URLs
Raises:
ValidationException: If file is too large or invalid type
"""
# Validate file size
if len(file_content) > MAX_UPLOAD_SIZE:
raise ValidationException(
f"File too large. Maximum size: {MAX_UPLOAD_SIZE // (1024*1024)}MB"
)
# Validate content type
if not content_type or not content_type.startswith("image/"):
raise ValidationException("Invalid file type. Only images are allowed.")
# Validate file extension
ext = self._get_extension(filename)
if ext not in self.ALLOWED_EXTENSIONS:
raise ValidationException(
f"Invalid file type: {ext}. Allowed: {', '.join(self.ALLOWED_EXTENSIONS)}"
)
# Generate unique hash for this image
image_hash = self._generate_hash(vendor_id, product_id, filename)
# Determine sharded directory path
shard_path = self._get_shard_path(image_hash)
full_dir = self.products_dir / shard_path
full_dir.mkdir(parents=True, exist_ok=True)
# Load and process image
try:
img = Image.open(BytesIO(file_content))
# Convert to RGB if necessary (for PNG with alpha)
if img.mode in ("RGBA", "P"):
img = img.convert("RGB")
# Get original dimensions
original_width, original_height = img.size
# Process and save variants
urls = {}
total_size = 0
for size_name, max_dim in self.SIZES.items():
processed_img = self._resize_image(img.copy(), max_dim)
file_path = full_dir / f"{image_hash}_{size_name}.webp"
# Save as WebP
processed_img.save(file_path, "WEBP", quality=self.QUALITY)
# Track size
file_size = file_path.stat().st_size
total_size += file_size
# Generate URL path (relative to static)
url_path = f"/static/uploads/products/{shard_path}/{image_hash}_{size_name}.webp"
urls[size_name] = url_path
logger.debug(f"Saved {size_name}: {file_path} ({file_size} bytes)")
logger.info(
f"Uploaded image {image_hash} for vendor {vendor_id}: "
f"{len(urls)} variants, {total_size} bytes total"
)
return {
"id": image_hash,
"urls": urls,
"size_bytes": total_size,
"dimensions": {
"width": original_width,
"height": original_height,
},
"path": str(shard_path),
}
except Exception as e:
logger.error(f"Failed to process image: {e}")
raise ValueError(f"Failed to process image: {e}")
def delete_product_image(self, image_hash: str) -> bool:
"""Delete all variants of a product image.
Args:
image_hash: The image hash/ID
Returns:
True if deleted, False if not found
"""
shard_path = self._get_shard_path(image_hash)
full_dir = self.products_dir / shard_path
if not full_dir.exists():
return False
deleted = False
for size_name in self.SIZES:
file_path = full_dir / f"{image_hash}_{size_name}.webp"
if file_path.exists():
file_path.unlink()
deleted = True
logger.debug(f"Deleted: {file_path}")
# Clean up empty directories
self._cleanup_empty_dirs(full_dir)
if deleted:
logger.info(f"Deleted image {image_hash}")
return deleted
def get_storage_stats(self) -> dict:
"""Get storage statistics.
Returns:
Dict with storage metrics
"""
total_files = 0
total_size = 0
max_files_per_dir = 0
dir_count = 0
for root, dirs, files in os.walk(self.products_dir):
webp_files = [f for f in files if f.endswith(".webp")]
file_count = len(webp_files)
total_files += file_count
if file_count > 0:
dir_count += 1
max_files_per_dir = max(max_files_per_dir, file_count)
for f in webp_files:
file_path = Path(root) / f
total_size += file_path.stat().st_size
# Calculate average files per directory
avg_files_per_dir = total_files / dir_count if dir_count > 0 else 0
return {
"total_files": total_files,
"total_size_bytes": total_size,
"total_size_mb": round(total_size / (1024 * 1024), 2),
"total_size_gb": round(total_size / (1024 * 1024 * 1024), 3),
"directory_count": dir_count,
"max_files_per_dir": max_files_per_dir,
"avg_files_per_dir": round(avg_files_per_dir, 1),
"products_estimated": total_files // 3, # 3 variants per image
}
def _generate_hash(
self, vendor_id: int, product_id: int | None, filename: str
) -> str:
"""Generate unique hash for image.
Args:
vendor_id: Vendor ID
product_id: Product ID (optional)
filename: Original filename
Returns:
8-character hex hash
"""
timestamp = datetime.utcnow().isoformat()
content = f"{vendor_id}:{product_id}:{timestamp}:{filename}"
return hashlib.md5(content.encode()).hexdigest()[:8] # noqa: SEC-041
def _get_shard_path(self, image_hash: str) -> str:
"""Get sharded directory path from hash.
Uses first 4 characters to create 2-level directory structure.
This creates 256 possible directories at each level.
Args:
image_hash: 8-character hash
Returns:
Path like "0a/1b"
"""
return f"{image_hash[:2]}/{image_hash[2:4]}"
def _get_extension(self, filename: str) -> str:
"""Get lowercase file extension."""
return filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
def _resize_image(self, img: Image.Image, max_dimension: int | None) -> Image.Image:
"""Resize image while maintaining aspect ratio.
Args:
img: PIL Image
max_dimension: Maximum width or height (None = use MAX_DIMENSION)
Returns:
Resized PIL Image
"""
if max_dimension is None:
max_dimension = self.MAX_DIMENSION
width, height = img.size
# Only resize if larger than max
if width <= max_dimension and height <= max_dimension:
return img
# Calculate new dimensions maintaining aspect ratio
if width > height:
new_width = max_dimension
new_height = int(height * (max_dimension / width))
else:
new_height = max_dimension
new_width = int(width * (max_dimension / height))
return img.resize((new_width, new_height), Image.Resampling.LANCZOS)
def _cleanup_empty_dirs(self, dir_path: Path):
"""Remove empty directories up the tree."""
try:
# Try to remove the directory and its parents if empty
while dir_path != self.products_dir:
if dir_path.exists() and not any(dir_path.iterdir()):
dir_path.rmdir()
dir_path = dir_path.parent
else:
break
except OSError:
pass # Directory not empty or other error
# Shared module-level service instance using the default upload directory.
# NOTE: constructing it runs ImageService.__init__, which creates the
# "static/uploads/products" directory (a relative path) as soon as this
# module is imported.
image_service = ImageService()