Files
orion/app/services/media_service.py
Samir Boulahtit 0f9b80c634 refactor: migrate Feature to billing module and split ProductMedia to catalog
- Move Feature model from models/database/ to app/modules/billing/models/
  (tightly coupled to SubscriptionTier for tier-based access control)
- Move ProductMedia from models/database/media.py to app/modules/catalog/models/
  (product-specific media associations belong with catalog)
- Keep MediaFile as CORE in models/database/media.py (cross-cutting file storage)
- Convert legacy feature.py to re-export for backwards compatibility
- Update all imports to use canonical module locations

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-30 19:32:38 +01:00

553 lines
16 KiB
Python

# app/services/media_service.py
"""
Media service for vendor media library management.
This module provides:
- File upload and storage
- Thumbnail generation for images
- Media metadata management
- Media usage tracking
"""
import logging
import mimetypes
import os
import shutil
import uuid
from datetime import UTC, datetime
from pathlib import Path
from sqlalchemy import or_
from sqlalchemy.orm import Session
from app.exceptions.media import (
MediaNotFoundException,
MediaUploadException,
MediaValidationException,
UnsupportedMediaTypeException,
MediaFileTooLargeException,
)
from models.database.media import MediaFile
from app.modules.catalog.models import ProductMedia
logger = logging.getLogger(__name__)

# Root of all uploaded content; every vendor gets its own subtree below it.
UPLOAD_DIR = Path("uploads")
VENDOR_UPLOAD_DIR = UPLOAD_DIR.joinpath("vendors")

# Accepted file extensions (lower-case, no dot) mapped to their media category.
# Insertion order is kept stable because the key list is reported to clients
# when an unsupported type is rejected.
ALLOWED_EXTENSIONS = {
    # Images
    **dict.fromkeys(("jpg", "jpeg", "png", "gif", "webp", "svg"), "image"),
    # Videos
    **dict.fromkeys(("mp4", "webm", "mov"), "video"),
    # Documents
    **dict.fromkeys(
        ("pdf", "doc", "docx", "xls", "xlsx", "csv", "txt"), "document"
    ),
}

# Per-category upload size limits, in bytes.
MAX_FILE_SIZES = {
    "image": 10 * 1024**2,  # 10 MB
    "video": 100 * 1024**2,  # 100 MB
    "document": 20 * 1024**2,  # 20 MB
}

# Bounding box (width, height) in pixels for generated thumbnails.
THUMBNAIL_SIZE = (200, 200)
class MediaService:
    """Service for vendor media library operations.

    Physical files are stored below ``VENDOR_UPLOAD_DIR/<vendor_id>/<folder>``
    and each one is described by a ``MediaFile`` row. The methods here only
    ``flush()`` the session; they never commit.
    """

    # ------------------------------------------------------------------
    # Path / filename helpers
    # ------------------------------------------------------------------

    def _get_vendor_upload_path(self, vendor_id: int, folder: str = "general") -> Path:
        """Return the upload directory for ``folder`` inside a vendor's area.

        Raises:
            MediaValidationException: If ``folder`` would resolve to a location
                outside the vendor's own upload directory (for example
                ``"../other"`` or an absolute path).
        """
        vendor_root = VENDOR_UPLOAD_DIR / str(vendor_id)
        target = vendor_root / folder
        # Compare fully resolved paths so "..", absolute paths and symlinks
        # cannot be used to read or write outside the vendor's directory.
        if not target.resolve().is_relative_to(vendor_root.resolve()):
            raise MediaValidationException("Invalid folder name", field="folder")
        return target

    def _ensure_upload_dir(self, path: Path) -> None:
        """Create ``path`` (and any missing parents) if it does not exist."""
        path.mkdir(parents=True, exist_ok=True)

    def _discard_file(self, path: Path) -> None:
        """Best-effort removal of ``path``; failures are logged, not raised."""
        try:
            path.unlink(missing_ok=True)
        except OSError as e:
            logger.warning("Could not remove file %s: %s", path, e)

    def _get_file_extension(self, filename: str) -> str:
        """Return the lower-cased text after the last dot, or ``""`` if none."""
        _, dot, ext = filename.rpartition(".")
        return ext.lower() if dot else ""

    def _get_media_type(self, extension: str) -> str | None:
        """Return the media category for ``extension``, or None if not allowed."""
        return ALLOWED_EXTENSIONS.get(extension)

    def _generate_unique_filename(self, original_filename: str) -> str:
        """Return a UUID-based filename that keeps the original extension."""
        ext = self._get_file_extension(original_filename)
        stem = uuid.uuid4().hex
        return f"{stem}.{ext}" if ext else stem

    def _validate_file(self, filename: str, file_size: int) -> tuple[str, str]:
        """Validate an uploaded file by extension and size.

        Args:
            filename: Original filename supplied by the uploader
            file_size: Size of the content in bytes

        Returns:
            Tuple of (extension, media_type)

        Raises:
            MediaValidationException: If the filename has no extension
            UnsupportedMediaTypeException: If the extension is not allowed
            MediaFileTooLargeException: If the file exceeds its category limit
        """
        ext = self._get_file_extension(filename)
        if not ext:
            raise MediaValidationException("File must have an extension", field="file")

        media_type = self._get_media_type(ext)
        if not media_type:
            raise UnsupportedMediaTypeException(
                ext, allowed_types=list(ALLOWED_EXTENSIONS.keys())
            )

        # Falls back to 10 MB for a category without an explicit limit.
        max_size = MAX_FILE_SIZES.get(media_type, 10 * 1024 * 1024)
        if file_size > max_size:
            raise MediaFileTooLargeException(file_size, max_size, media_type)

        return ext, media_type

    # ------------------------------------------------------------------
    # Image helpers (Pillow is optional)
    # ------------------------------------------------------------------

    def _get_image_dimensions(self, file_path: Path) -> tuple[int, int] | None:
        """Return ``(width, height)`` of an image, or None if it cannot be read.

        Returns None both when Pillow is not installed and when the file
        cannot be opened as an image.
        """
        try:
            from PIL import Image

            with Image.open(file_path) as img:
                return img.size
        except ImportError:
            logger.debug("PIL not available, skipping image dimension detection")
            return None
        except Exception as e:
            logger.warning("Could not get image dimensions: %s", e)
            return None

    def _generate_thumbnail(self, source_path: Path, vendor_id: int) -> str | None:
        """Create a JPEG thumbnail for an image in the vendor's thumbnails folder.

        Args:
            source_path: Path of the stored source image
            vendor_id: Vendor that owns the image

        Returns:
            Thumbnail path relative to ``UPLOAD_DIR``, or None when Pillow is
            unavailable or the thumbnail could not be produced.
        """
        try:
            from PIL import Image

            thumb_dir = self._get_vendor_upload_path(vendor_id, "thumbnails")
            self._ensure_upload_dir(thumb_dir)

            # Thumbnails are always encoded as JPEG, so give them a matching
            # extension regardless of the source format.
            thumb_path = thumb_dir / f"thumb_{source_path.stem}.jpg"

            with Image.open(source_path) as img:
                img.thumbnail(THUMBNAIL_SIZE)
                # Normalise every mode other than plain RGB / greyscale
                # (RGBA, P, LA, ...) to RGB before encoding as JPEG.
                thumb = img if img.mode in ("RGB", "L") else img.convert("RGB")
                thumb.save(thumb_path, "JPEG", quality=85)

            return str(thumb_path.relative_to(UPLOAD_DIR))
        except ImportError:
            logger.debug("PIL not available, skipping thumbnail generation")
            return None
        except Exception as e:
            logger.warning("Could not generate thumbnail: %s", e)
            return None

    # ------------------------------------------------------------------
    # Library operations
    # ------------------------------------------------------------------

    async def upload_file(
        self,
        db: Session,
        vendor_id: int,
        file_content: bytes,
        filename: str,
        folder: str = "general",
    ) -> MediaFile:
        """Upload a file to the media library.

        Args:
            db: Database session
            vendor_id: Vendor ID
            file_content: File content as bytes
            filename: Original filename
            folder: Folder to store in (products, general, etc.)

        Returns:
            Created MediaFile record (flushed and refreshed, not committed)

        Raises:
            MediaValidationException: If the file has no extension or the
                folder is not inside the vendor's upload area
            UnsupportedMediaTypeException: If the file type is not supported
            MediaFileTooLargeException: If the file exceeds its size limit
        """
        file_size = len(file_content)
        ext, media_type = self._validate_file(filename, file_size)

        unique_filename = self._generate_unique_filename(filename)
        upload_path = self._get_vendor_upload_path(vendor_id, folder)
        self._ensure_upload_dir(upload_path)

        file_path = upload_path / unique_filename
        relative_path = str(file_path.relative_to(UPLOAD_DIR))
        mime_type, _ = mimetypes.guess_type(filename)

        width, height = None, None
        thumbnail_path = None
        try:
            file_path.write_bytes(file_content)

            # Pillow cannot read SVG, so skip raster-only processing for it.
            if media_type == "image" and ext != "svg":
                dimensions = self._get_image_dimensions(file_path)
                if dimensions:
                    width, height = dimensions
                thumbnail_path = self._generate_thumbnail(file_path, vendor_id)

            media_file = MediaFile(
                vendor_id=vendor_id,
                filename=unique_filename,
                original_filename=filename,
                file_path=relative_path,
                media_type=media_type,
                mime_type=mime_type,
                file_size=file_size,
                width=width,
                height=height,
                thumbnail_path=thumbnail_path,
                folder=folder,
            )
            db.add(media_file)
            db.flush()
            db.refresh(media_file)
        except Exception:
            # Do not leave orphaned files behind when the write or the
            # database insert fails; the original error is re-raised.
            self._discard_file(file_path)
            if thumbnail_path:
                self._discard_file(UPLOAD_DIR / thumbnail_path)
            raise

        logger.info(
            "Uploaded media file %s for vendor %s: %s",
            media_file.id,
            vendor_id,
            filename,
        )
        return media_file

    def get_media(self, db: Session, vendor_id: int, media_id: int) -> MediaFile:
        """Get a media file by ID, scoped to a vendor.

        Raises:
            MediaNotFoundException: If media not found or doesn't belong to vendor
        """
        media = (
            db.query(MediaFile)
            .filter(
                MediaFile.id == media_id,
                MediaFile.vendor_id == vendor_id,
            )
            .first()
        )
        if not media:
            raise MediaNotFoundException(media_id)
        return media

    def get_media_library(
        self,
        db: Session,
        vendor_id: int,
        skip: int = 0,
        limit: int = 100,
        media_type: str | None = None,
        folder: str | None = None,
        search: str | None = None,
    ) -> tuple[list[MediaFile], int]:
        """Get vendor media library with filtering.

        Args:
            db: Database session
            vendor_id: Vendor ID
            skip: Pagination offset
            limit: Pagination limit
            media_type: Filter by media type
            folder: Filter by folder
            search: Case-insensitive substring matched literally against
                filename, original filename and alt text

        Returns:
            Tuple of (media_files, total_count); ``total_count`` ignores
            pagination and the page is ordered newest first.
        """
        query = db.query(MediaFile).filter(MediaFile.vendor_id == vendor_id)

        if media_type:
            query = query.filter(MediaFile.media_type == media_type)
        if folder:
            query = query.filter(MediaFile.folder == folder)
        if search:
            # Escape LIKE wildcards so "%" and "_" in the term match literally.
            escaped = (
                search.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
            )
            search_pattern = f"%{escaped}%"
            query = query.filter(
                or_(
                    MediaFile.filename.ilike(search_pattern, escape="\\"),
                    MediaFile.original_filename.ilike(search_pattern, escape="\\"),
                    MediaFile.alt_text.ilike(search_pattern, escape="\\"),
                )
            )

        # Ordering does not affect the count, so count first.
        total = query.count()
        media_files = (
            query.order_by(MediaFile.created_at.desc())
            .offset(skip)
            .limit(limit)
            .all()
        )
        return media_files, total

    def update_media_metadata(
        self,
        db: Session,
        vendor_id: int,
        media_id: int,
        filename: str | None = None,
        alt_text: str | None = None,
        description: str | None = None,
        folder: str | None = None,
        metadata: dict | None = None,
    ) -> MediaFile:
        """Update media file metadata.

        Arguments left as None are not changed.

        Args:
            db: Database session
            vendor_id: Vendor ID
            media_id: Media file ID
            filename: New display filename (stored as ``original_filename``;
                the name on disk is unchanged)
            alt_text: Alt text for images
            description: File description
            folder: Move to different folder
            metadata: Additional metadata (replaces ``extra_metadata``)

        Returns:
            Updated MediaFile

        Raises:
            MediaNotFoundException: If media not found or doesn't belong to vendor
            MediaValidationException: If ``folder`` is outside the vendor's area
        """
        media = self.get_media(db, vendor_id, media_id)

        # Handle the move first: if the target folder is rejected, nothing on
        # the record has been modified yet.
        if folder is not None and folder != media.folder:
            old_path = UPLOAD_DIR / media.file_path
            new_dir = self._get_vendor_upload_path(vendor_id, folder)
            self._ensure_upload_dir(new_dir)
            new_path = new_dir / media.filename

            if old_path.exists():
                shutil.move(str(old_path), str(new_path))

            media.file_path = str(new_path.relative_to(UPLOAD_DIR))
            media.folder = folder

        if filename is not None:
            media.original_filename = filename
        if alt_text is not None:
            media.alt_text = alt_text
        if description is not None:
            media.description = description
        if metadata is not None:
            media.extra_metadata = metadata

        media.updated_at = datetime.now(UTC)
        db.flush()

        logger.info("Updated media metadata for %s", media_id)
        return media

    def delete_media(self, db: Session, vendor_id: int, media_id: int) -> bool:
        """Delete a media file and its thumbnail.

        Args:
            db: Database session
            vendor_id: Vendor ID
            media_id: Media file ID

        Returns:
            True if deleted successfully

        Raises:
            MediaNotFoundException: If media not found or doesn't belong to vendor
        """
        media = self.get_media(db, vendor_id, media_id)

        # Collect the paths before the row is deleted.
        doomed = [UPLOAD_DIR / media.file_path]
        if media.thumbnail_path:
            doomed.append(UPLOAD_DIR / media.thumbnail_path)

        # Flush the delete first so a database error (e.g. a constraint
        # violation) surfaces before any file is removed from disk.
        db.delete(media)
        db.flush()

        for path in doomed:
            path.unlink(missing_ok=True)

        logger.info("Deleted media file %s for vendor %s", media_id, vendor_id)
        return True

    # ------------------------------------------------------------------
    # Product associations
    # ------------------------------------------------------------------

    def get_media_usage(self, db: Session, vendor_id: int, media_id: int) -> dict:
        """Get where a media file is being used.

        Returns:
            Dict with ``media_id``, ``products`` (one entry per association
            that has a product), ``other_usage`` (always empty here) and
            ``total_usage_count``.

        Raises:
            MediaNotFoundException: If media not found or doesn't belong to vendor
        """
        media = self.get_media(db, vendor_id, media_id)

        product_usage = [
            {
                "product_id": assoc.product.id,
                "product_name": (
                    assoc.product.get_title() or f"Product {assoc.product.id}"
                ),
                "usage_type": assoc.usage_type,
            }
            for assoc in media.product_associations
            if assoc.product
        ]

        return {
            "media_id": media_id,
            "products": product_usage,
            "other_usage": [],
            "total_usage_count": len(product_usage),
        }

    def attach_to_product(
        self,
        db: Session,
        vendor_id: int,
        media_id: int,
        product_id: int,
        usage_type: str = "gallery",
        display_order: int = 0,
    ) -> ProductMedia:
        """Attach a media file to a product.

        If the same media is already attached to the product with the same
        usage type, only its display order is updated.

        Args:
            db: Database session
            vendor_id: Vendor ID
            media_id: Media file ID
            product_id: Product ID
            usage_type: How the media is used (main_image, gallery, etc.)
            display_order: Order for galleries

        Returns:
            Created (or existing, updated) ProductMedia association

        Raises:
            MediaNotFoundException: If media not found or doesn't belong to vendor
        """
        # Verify media belongs to vendor.
        # NOTE(review): product_id is not checked against vendor_id here —
        # confirm callers validate product ownership.
        media = self.get_media(db, vendor_id, media_id)

        existing = (
            db.query(ProductMedia)
            .filter(
                ProductMedia.product_id == product_id,
                ProductMedia.media_id == media_id,
                ProductMedia.usage_type == usage_type,
            )
            .first()
        )
        if existing:
            existing.display_order = display_order
            db.flush()
            return existing

        product_media = ProductMedia(
            product_id=product_id,
            media_id=media_id,
            usage_type=usage_type,
            display_order=display_order,
        )
        db.add(product_media)

        # Only a new association counts towards usage.
        media.usage_count = (media.usage_count or 0) + 1
        db.flush()
        return product_media

    def detach_from_product(
        self,
        db: Session,
        vendor_id: int,
        media_id: int,
        product_id: int,
        usage_type: str | None = None,
    ) -> bool:
        """Detach a media file from a product.

        Args:
            db: Database session
            vendor_id: Vendor ID
            media_id: Media file ID
            product_id: Product ID
            usage_type: Specific usage type to remove (None = all)

        Returns:
            True if at least one association was removed

        Raises:
            MediaNotFoundException: If media not found or doesn't belong to vendor
        """
        # Verify media belongs to vendor
        media = self.get_media(db, vendor_id, media_id)

        query = db.query(ProductMedia).filter(
            ProductMedia.product_id == product_id,
            ProductMedia.media_id == media_id,
        )
        if usage_type:
            query = query.filter(ProductMedia.usage_type == usage_type)

        deleted_count = query.delete()

        if deleted_count > 0:
            # Never let the counter go negative.
            media.usage_count = max(0, (media.usage_count or 0) - deleted_count)
            db.flush()

        return deleted_count > 0


# Module-level service instance
media_service = MediaService()