refactor: fix architecture violations with provider patterns and dependency inversion

Major changes:
- Add AuditProvider protocol for cross-module audit logging
- Move customer order operations to orders module (dependency inversion)
- Add customer order metrics via MetricsProvider pattern
- Fix missing db parameter in get_admin_context() calls
- Move ProductMedia relationship to catalog module (proper ownership)
- Add marketplace breakdown stats to marketplace_widgets

New files:
- contracts/audit.py - AuditProviderProtocol
- core/services/audit_aggregator.py - Aggregates audit providers
- monitoring/services/audit_provider.py - Monitoring audit implementation
- orders/services/customer_order_service.py - Customer order operations
- orders/routes/api/vendor_customer_orders.py - Customer order endpoints
- catalog/services/product_media_service.py - Product media service
- Architecture documentation for patterns

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-04 21:32:32 +01:00
parent bd43e21940
commit 39dff4ab7d
34 changed files with 2751 additions and 407 deletions

View File

@@ -16,14 +16,11 @@ import shutil
import uuid
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any
from typing import Any
from sqlalchemy import or_
from sqlalchemy.orm import Session
if TYPE_CHECKING:
from app.modules.catalog.models import ProductMedia
from app.modules.cms.exceptions import (
MediaNotFoundException,
MediaUploadException,
@@ -420,147 +417,9 @@ class MediaService:
return True
def get_media_usage(
self, db: Session, vendor_id: int, media_id: int
) -> dict:
"""
Get where a media file is being used.
Returns:
Dict with products and other usage information
"""
media = self.get_media(db, vendor_id, media_id)
# Get product associations
product_usage = []
for assoc in media.product_associations:
product = assoc.product
if product:
product_usage.append({
"product_id": product.id,
"product_name": product.get_title() or f"Product {product.id}",
"usage_type": assoc.usage_type,
})
return {
"media_id": media_id,
"products": product_usage,
"other_usage": [],
"total_usage_count": len(product_usage),
}
def attach_to_product(
self,
db: Session,
vendor_id: int,
media_id: int,
product_id: int,
usage_type: str = "gallery",
display_order: int = 0,
) -> "ProductMedia | None":
"""
Attach a media file to a product.
Args:
db: Database session
vendor_id: Vendor ID
media_id: Media file ID
product_id: Product ID
usage_type: How the media is used (main_image, gallery, etc.)
display_order: Order for galleries
Returns:
Created ProductMedia association, or None if catalog module unavailable
"""
try:
from app.modules.catalog.models import ProductMedia
except ImportError:
logger.warning("Catalog module not available for media-product attachment")
return None
# Verify media belongs to vendor
media = self.get_media(db, vendor_id, media_id)
# Check if already attached with same usage type
existing = (
db.query(ProductMedia)
.filter(
ProductMedia.product_id == product_id,
ProductMedia.media_id == media_id,
ProductMedia.usage_type == usage_type,
)
.first()
)
if existing:
existing.display_order = display_order
db.flush()
return existing
# Create association
product_media = ProductMedia(
product_id=product_id,
media_id=media_id,
usage_type=usage_type,
display_order=display_order,
)
db.add(product_media)
# Update usage count
media.usage_count = (media.usage_count or 0) + 1
db.flush()
return product_media
def detach_from_product(
self,
db: Session,
vendor_id: int,
media_id: int,
product_id: int,
usage_type: str | None = None,
) -> bool:
"""
Detach a media file from a product.
Args:
db: Database session
vendor_id: Vendor ID
media_id: Media file ID
product_id: Product ID
usage_type: Specific usage type to remove (None = all)
Returns:
True if detached, False if catalog module unavailable
"""
try:
from app.modules.catalog.models import ProductMedia
except ImportError:
logger.warning("Catalog module not available for media-product detachment")
return False
# Verify media belongs to vendor
media = self.get_media(db, vendor_id, media_id)
query = db.query(ProductMedia).filter(
ProductMedia.product_id == product_id,
ProductMedia.media_id == media_id,
)
if usage_type:
query = query.filter(ProductMedia.usage_type == usage_type)
deleted_count = query.delete()
# Update usage count
if deleted_count > 0:
media.usage_count = max(0, (media.usage_count or 0) - deleted_count)
db.flush()
return deleted_count > 0
# Note: Product-specific methods (attach_to_product, detach_from_product,
# get_media_usage) have been moved to catalog.services.product_media_service
# CMS media_service is now consumer-agnostic.
# Create service instance