# app/modules/billing/services/feature_aggregator.py
"""
Feature aggregator service for cross-module feature discovery and usage tracking.

Discovers FeatureProviderProtocol implementations from all modules,
caches declarations, and provides aggregated usage data.

Usage:
    from app.modules.billing.services.feature_aggregator import feature_aggregator

    # Get all declared features
    declarations = feature_aggregator.get_all_declarations()

    # Get usage for a store
    usage = feature_aggregator.get_store_usage(db, store_id)

    # Get usage for a single quantitative feature
    usage = feature_aggregator.get_usage_for_feature(
        db, "products_limit", store_id=store_id
    )
"""

import logging
from typing import TYPE_CHECKING

from app.modules.contracts.features import (
    FeatureDeclaration,
    FeatureProviderProtocol,
    FeatureScope,
    FeatureType,
    FeatureUsage,
)

if TYPE_CHECKING:
    from sqlalchemy.orm import Session

logger = logging.getLogger(__name__)


class FeatureAggregatorService:
    """
    Service that discovers and aggregates feature providers from all modules.

    Discovers feature providers from all modules via
    app.modules.registry.MODULES and caches both the provider list and the
    declarations map (declarations are static and don't change at runtime).
    Call ``invalidate_cache()`` to force re-discovery.

    A single shared instance, ``feature_aggregator``, is created at module
    import time and is the intended entry point.
    """

    def __init__(self):
        # Both caches are populated lazily on first use; None means "not built yet".
        self._declarations_cache: dict[str, FeatureDeclaration] | None = None
        self._providers_cache: list[FeatureProviderProtocol] | None = None

    def _discover_providers(self) -> list[FeatureProviderProtocol]:
        """
        Discover all feature providers from registered modules.

        The result is cached. A module whose provider fails to load is logged
        (with traceback) and left out of the cached list until
        ``invalidate_cache()`` is called.

        Returns:
            List of provider instances, in registry iteration order
        """
        if self._providers_cache is not None:
            return self._providers_cache

        # Imported lazily, as in the original code, rather than at module import time.
        from app.modules.registry import MODULES

        providers = []
        for module in MODULES.values():
            if not module.has_feature_provider():
                continue
            try:
                provider = module.get_feature_provider_instance()
                if provider is not None:
                    providers.append(provider)
                    logger.debug(
                        "Discovered feature provider from module '%s': "
                        "category='%s'",
                        module.code,
                        provider.feature_category,
                    )
            except Exception as e:
                # Best-effort discovery: one broken module must not hide the
                # others. logger.exception keeps the traceback for diagnosis.
                logger.exception(
                    "Failed to load feature provider from module '%s': %s",
                    module.code,
                    e,
                )

        self._providers_cache = providers
        logger.info("Discovered %d feature providers", len(providers))
        return providers

    def _build_declarations(self) -> dict[str, FeatureDeclaration]:
        """
        Build and cache the feature declarations map.

        When two providers declare the same feature code, the first
        declaration wins and a warning is logged for the later one. A provider
        that raises is logged (with traceback) and skipped; declarations
        already collected are kept.

        Returns:
            Dict mapping feature_code -> FeatureDeclaration
        """
        if self._declarations_cache is not None:
            return self._declarations_cache

        declarations: dict[str, FeatureDeclaration] = {}
        for provider in self._discover_providers():
            try:
                for decl in provider.get_feature_declarations():
                    if decl.code in declarations:
                        logger.warning(
                            "Duplicate feature code '%s' from "
                            "category '%s' "
                            "(already declared by '%s')",
                            decl.code,
                            provider.feature_category,
                            declarations[decl.code].category,
                        )
                        continue
                    declarations[decl.code] = decl
            except Exception as e:
                logger.exception(
                    "Failed to get declarations from provider '%s': %s",
                    provider.feature_category,
                    e,
                )

        self._declarations_cache = declarations
        logger.info("Built feature catalog: %d features", len(declarations))
        return declarations

    # =========================================================================
    # Public API — Declarations
    # =========================================================================

    def get_all_declarations(self) -> dict[str, FeatureDeclaration]:
        """
        Get all feature declarations from all modules.

        The returned dict is the internal cache itself, not a copy, so callers
        should treat it as read-only.

        Returns:
            Dict mapping feature_code -> FeatureDeclaration
        """
        return self._build_declarations()

    def get_declaration(self, feature_code: str) -> FeatureDeclaration | None:
        """
        Get a single feature declaration by code.

        Args:
            feature_code: Feature code to look up

        Returns:
            The matching FeatureDeclaration, or None if the code is unknown
        """
        return self._build_declarations().get(feature_code)

    def get_declarations_by_category(self) -> dict[str, list[FeatureDeclaration]]:
        """
        Get feature declarations grouped by category.

        Returns:
            Dict mapping category -> list of FeatureDeclaration, sorted by display_order
        """
        by_category: dict[str, list[FeatureDeclaration]] = {}
        for decl in self._build_declarations().values():
            by_category.setdefault(decl.category, []).append(decl)

        # Sort each category by display_order
        for decls in by_category.values():
            decls.sort(key=lambda d: d.display_order)

        return by_category

    def validate_feature_codes(self, codes: set[str]) -> set[str]:
        """
        Validate feature codes against known declarations.

        Args:
            codes: Set of feature codes to validate

        Returns:
            Set of invalid codes (empty if all valid)
        """
        known = set(self._build_declarations())
        return codes - known

    # =========================================================================
    # Public API — Usage
    # =========================================================================

    def get_store_usage(self, db: "Session", store_id: int) -> dict[str, FeatureUsage]:
        """
        Get current usage for a specific store across all providers.

        If several providers report the same feature code, the value from the
        provider discovered last wins. A provider that raises is logged (with
        traceback) and skipped; any items already collected are kept.

        Args:
            db: Database session
            store_id: Store ID

        Returns:
            Dict mapping feature_code -> FeatureUsage
        """
        usage: dict[str, FeatureUsage] = {}
        for provider in self._discover_providers():
            try:
                for item in provider.get_store_usage(db, store_id):
                    usage[item.feature_code] = item
            except Exception as e:
                logger.exception(
                    "Failed to get store usage from provider '%s': %s",
                    provider.feature_category,
                    e,
                )
        return usage

    def get_merchant_usage(
        self, db: "Session", merchant_id: int, platform_id: int
    ) -> dict[str, FeatureUsage]:
        """
        Get current usage aggregated across all merchant's stores.

        If several providers report the same feature code, the value from the
        provider discovered last wins. A provider that raises is logged (with
        traceback) and skipped; any items already collected are kept.

        Args:
            db: Database session
            merchant_id: Merchant ID
            platform_id: Platform ID

        Returns:
            Dict mapping feature_code -> FeatureUsage
        """
        usage: dict[str, FeatureUsage] = {}
        for provider in self._discover_providers():
            try:
                for item in provider.get_merchant_usage(db, merchant_id, platform_id):
                    usage[item.feature_code] = item
            except Exception as e:
                logger.exception(
                    "Failed to get merchant usage from provider '%s': %s",
                    provider.feature_category,
                    e,
                )
        return usage

    def get_usage_for_feature(
        self,
        db: "Session",
        feature_code: str,
        store_id: int | None = None,
        merchant_id: int | None = None,
        platform_id: int | None = None,
    ) -> FeatureUsage | None:
        """
        Get usage for a specific feature, respecting its scope.

        Args:
            db: Database session
            feature_code: Feature code to check
            store_id: Store ID (for STORE-scoped features)
            merchant_id: Merchant ID (for MERCHANT-scoped features)
            platform_id: Platform ID (for MERCHANT-scoped features)

        Returns:
            FeatureUsage, or None if the feature is unknown, is not
            QUANTITATIVE, the IDs required by its scope were not supplied,
            or no provider reported usage for it
        """
        decl = self.get_declaration(feature_code)
        if not decl or decl.feature_type != FeatureType.QUANTITATIVE:
            return None

        if decl.scope == FeatureScope.STORE and store_id is not None:
            return self.get_store_usage(db, store_id).get(feature_code)

        if (
            decl.scope == FeatureScope.MERCHANT
            and merchant_id is not None
            and platform_id is not None
        ):
            return self.get_merchant_usage(db, merchant_id, platform_id).get(
                feature_code
            )

        return None

    # =========================================================================
    # Cache Management
    # =========================================================================

    def invalidate_cache(self) -> None:
        """Invalidate all caches. Call when modules are added/removed."""
        self._declarations_cache = None
        self._providers_cache = None
        logger.debug("Feature aggregator cache invalidated")


# Singleton instance
feature_aggregator = FeatureAggregatorService()

__all__ = [
    "feature_aggregator",
    "FeatureAggregatorService",
]