# app/modules/marketplace/services/marketplace_metrics.py
"""
Metrics provider for the marketplace module.

Provides metrics for:
- Imported products (staging area)
- Import jobs
- Marketplace statistics
"""

import logging
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING

from sqlalchemy import func
from sqlalchemy.orm import Session

from app.modules.contracts.metrics import (
    MetricValue,
    MetricsContext,
    MetricsProviderProtocol,
)

if TYPE_CHECKING:
    pass

logger = logging.getLogger(__name__)

# Import-job statuses that count as a successful import. Shared by the store
# and platform metrics so that "successful_imports" and "success_rate" mean
# the same thing on both dashboards.
_SUCCESS_STATUSES = ("completed", "completed_with_errors")

# Look-back window (in days) for "recent imports" when the caller's context
# does not supply an explicit ``date_from``.
_DEFAULT_RECENT_DAYS = 30


class MarketplaceMetricsProvider:
    """
    Metrics provider for marketplace module.

    Provides import and staging metrics for store and platform dashboards.
    """

    @property
    def metrics_category(self) -> str:
        """Return the category name used for every metric of this provider."""
        return "marketplace"

    @staticmethod
    def _count(db: Session, model, *criteria) -> int:
        """Return the number of ``model`` rows matching all ``criteria``."""
        return db.query(model).filter(*criteria).count()

    @staticmethod
    def _success_rate(successful: int, total: int) -> float | int:
        """Return ``successful / total`` as a percentage rounded to 1 decimal.

        Returns 0 when ``total`` is 0 so that callers never divide by zero.
        """
        return round(successful / total * 100, 1) if total > 0 else 0

    @staticmethod
    def _metric(
        key: str,
        value,
        label: str,
        icon: str,
        description: str,
        **extra,
    ) -> MetricValue:
        """Build a ``MetricValue`` in the ``marketplace`` category.

        ``extra`` is forwarded unchanged to ``MetricValue`` (e.g. ``unit``).
        """
        return MetricValue(
            key=key,
            value=value,
            label=label,
            category="marketplace",
            icon=icon,
            description=description,
            **extra,
        )

    def get_store_metrics(
        self,
        db: Session,
        store_id: int,
        context: MetricsContext | None = None,
    ) -> list[MetricValue]:
        """
        Get marketplace metrics for a specific store.

        Provides:
        - Imported products (staging)
        - Import job statistics

        Args:
            db: Database session used for all queries.
            store_id: ID of the store whose metrics are requested.
            context: Optional metrics context; when it carries a ``date_from``
                that value bounds the "recent imports" count, otherwise the
                last 30 days are used.

        Returns:
            The list of store-level metrics, or an empty list if any query
            fails (the failure is logged as a warning).
        """
        from app.modules.marketplace.models import (
            MarketplaceImportJob,
            MarketplaceProduct,
        )
        from app.modules.tenancy.models import Store

        try:
            # MarketplaceProduct is keyed by store_name, not store_id, so the
            # store has to be resolved to its name first.
            store = db.query(Store).filter(Store.id == store_id).first()
            if store is None:
                # Unknown store: do not fall back to matching an empty name,
                # which would pick up unrelated products with a blank
                # store_name.
                staging_products = 0
            else:
                staging_products = self._count(
                    db,
                    MarketplaceProduct,
                    MarketplaceProduct.store_name == store.name,
                )

            # Import jobs
            for_store = MarketplaceImportJob.store_id == store_id
            total_imports = self._count(db, MarketplaceImportJob, for_store)
            successful_imports = self._count(
                db,
                MarketplaceImportJob,
                for_store,
                MarketplaceImportJob.status.in_(_SUCCESS_STATUSES),
            )
            failed_imports = self._count(
                db,
                MarketplaceImportJob,
                for_store,
                MarketplaceImportJob.status == "failed",
            )
            pending_imports = self._count(
                db,
                MarketplaceImportJob,
                for_store,
                MarketplaceImportJob.status == "pending",
            )

            # Import success rate
            success_rate = self._success_rate(successful_imports, total_imports)

            # Recent imports (caller-selected period, default last 30 days)
            date_from = context.date_from if context else None
            if date_from is None:
                # Naive UTC timestamp, equivalent to the deprecated
                # datetime.utcnow().
                now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
                date_from = now_utc - timedelta(days=_DEFAULT_RECENT_DAYS)

            recent_imports = self._count(
                db,
                MarketplaceImportJob,
                for_store,
                MarketplaceImportJob.created_at >= date_from,
            )

            return [
                self._metric(
                    "marketplace.staging_products",
                    staging_products,
                    "Staging Products",
                    "inbox",
                    "Products in staging area",
                ),
                self._metric(
                    "marketplace.total_imports",
                    total_imports,
                    "Total Imports",
                    "download",
                    "Total import jobs",
                ),
                self._metric(
                    "marketplace.successful_imports",
                    successful_imports,
                    "Successful Imports",
                    "check-circle",
                    "Completed import jobs",
                ),
                self._metric(
                    "marketplace.failed_imports",
                    failed_imports,
                    "Failed Imports",
                    "x-circle",
                    "Failed import jobs",
                ),
                self._metric(
                    "marketplace.pending_imports",
                    pending_imports,
                    "Pending Imports",
                    "clock",
                    "Import jobs waiting to process",
                ),
                self._metric(
                    "marketplace.success_rate",
                    success_rate,
                    "Success Rate",
                    "percent",
                    "Import success rate",
                    unit="%",
                ),
                self._metric(
                    "marketplace.recent_imports",
                    recent_imports,
                    "Recent Imports",
                    "activity",
                    "Imports in the selected period",
                ),
            ]
        except Exception as e:
            # Metrics are best-effort: a failure here must not break the
            # dashboard, so log and return no metrics.
            logger.warning("Failed to get marketplace store metrics: %s", e)
            return []

    def get_platform_metrics(
        self,
        db: Session,
        platform_id: int,
        context: MetricsContext | None = None,
    ) -> list[MetricValue]:
        """
        Get marketplace metrics aggregated for a platform.

        Aggregates import and staging data across all stores.

        Args:
            db: Database session used for all queries.
            platform_id: ID of the platform whose active stores are
                aggregated for the import-job metrics.
            context: Optional metrics context; currently not used by the
                platform-level metrics.

        Returns:
            The list of platform-level metrics, or an empty list if any query
            fails (the failure is logged as a warning).
        """
        from app.modules.marketplace.models import (
            MarketplaceImportJob,
            MarketplaceProduct,
        )
        from app.modules.tenancy.models import StorePlatform

        try:
            # Get all store IDs for this platform using StorePlatform
            # junction table
            store_ids = (
                db.query(StorePlatform.store_id)
                .filter(
                    StorePlatform.platform_id == platform_id,
                    # SQL expression, not a Python comparison.
                    StorePlatform.is_active == True,  # noqa: E712
                )
                .subquery()
            )

            # Total staging products (across all stores)
            # NOTE: MarketplaceProduct doesn't have a direct platform_id link,
            # so this count and the two distinct counts below are global, not
            # restricted to platform_id.
            total_staging_products = self._count(db, MarketplaceProduct)

            # Unique marketplaces
            unique_marketplaces = (
                db.query(
                    func.count(func.distinct(MarketplaceProduct.marketplace))
                )
                .filter(MarketplaceProduct.marketplace.isnot(None))
                .scalar()
                or 0
            )

            # Unique brands (NULL and empty-string brands are ignored)
            unique_brands = (
                db.query(func.count(func.distinct(MarketplaceProduct.brand)))
                .filter(
                    MarketplaceProduct.brand.isnot(None),
                    MarketplaceProduct.brand != "",
                )
                .scalar()
                or 0
            )

            # Import jobs, restricted to the platform's active stores
            on_platform = MarketplaceImportJob.store_id.in_(store_ids)
            total_imports = self._count(db, MarketplaceImportJob, on_platform)
            successful_imports = self._count(
                db,
                MarketplaceImportJob,
                on_platform,
                MarketplaceImportJob.status.in_(_SUCCESS_STATUSES),
            )
            failed_imports = self._count(
                db,
                MarketplaceImportJob,
                on_platform,
                MarketplaceImportJob.status == "failed",
            )
            pending_imports = self._count(
                db,
                MarketplaceImportJob,
                on_platform,
                MarketplaceImportJob.status == "pending",
            )
            processing_imports = self._count(
                db,
                MarketplaceImportJob,
                on_platform,
                MarketplaceImportJob.status == "processing",
            )

            # Success rate
            success_rate = self._success_rate(successful_imports, total_imports)

            # Stores with imports
            stores_with_imports = (
                db.query(
                    func.count(func.distinct(MarketplaceImportJob.store_id))
                )
                .filter(on_platform)
                .scalar()
                or 0
            )

            return [
                self._metric(
                    "marketplace.total_staging",
                    total_staging_products,
                    "Staging Products",
                    "inbox",
                    "Total products in staging",
                ),
                self._metric(
                    "marketplace.unique_marketplaces",
                    unique_marketplaces,
                    "Marketplaces",
                    "globe",
                    "Unique marketplace sources",
                ),
                self._metric(
                    "marketplace.unique_brands",
                    unique_brands,
                    "Brands",
                    "tag",
                    "Unique product brands",
                ),
                self._metric(
                    "marketplace.total_imports",
                    total_imports,
                    "Total Imports",
                    "download",
                    "Total import jobs",
                ),
                self._metric(
                    "marketplace.successful_imports",
                    successful_imports,
                    "Successful",
                    "check-circle",
                    "Completed import jobs",
                ),
                self._metric(
                    "marketplace.failed_imports",
                    failed_imports,
                    "Failed",
                    "x-circle",
                    "Failed import jobs",
                ),
                self._metric(
                    "marketplace.pending_imports",
                    pending_imports,
                    "Pending",
                    "clock",
                    "Jobs waiting to process",
                ),
                self._metric(
                    "marketplace.processing_imports",
                    processing_imports,
                    "Processing",
                    "loader",
                    "Jobs currently processing",
                ),
                self._metric(
                    "marketplace.success_rate",
                    success_rate,
                    "Success Rate",
                    "percent",
                    "Import success rate",
                    unit="%",
                ),
                self._metric(
                    "marketplace.stores_importing",
                    stores_with_imports,
                    "Stores Importing",
                    "store",
                    "Stores using imports",
                ),
            ]
        except Exception as e:
            # Metrics are best-effort: a failure here must not break the
            # dashboard, so log and return no metrics.
            logger.warning("Failed to get marketplace platform metrics: %s", e)
            return []


# Singleton instance
marketplace_metrics_provider = MarketplaceMetricsProvider()

__all__ = ["MarketplaceMetricsProvider", "marketplace_metrics_provider"]