- Auto-fixed 4,496 lint issues (import sorting, modern syntax, etc.) - Added ignore rules for patterns intentional in this codebase: E402 (late imports), E712 (SQLAlchemy filters), B904 (raise from), SIM108/SIM105/SIM117 (readability preferences) - Added per-file ignores for tests and scripts - Excluded broken scripts/rename_terminology.py (has curly quotes) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
256 lines
8.9 KiB
Python
256 lines
8.9 KiB
Python
# app/modules/billing/services/feature_aggregator.py
|
|
"""
Feature aggregator service for cross-module feature discovery and usage tracking.

Discovers FeatureProviderProtocol implementations from all modules,
caches declarations, and provides aggregated usage data.

Usage:
    from app.modules.billing.services.feature_aggregator import feature_aggregator

    # Get all declared features
    declarations = feature_aggregator.get_all_declarations()

    # Get usage for a store
    usage = feature_aggregator.get_store_usage(db, store_id)

    # Get usage for a single feature (returns None if it cannot be resolved)
    usage = feature_aggregator.get_usage_for_feature(db, "products_limit", store_id=store_id)
"""
|
|
|
|
import logging
|
|
from typing import TYPE_CHECKING
|
|
|
|
from app.modules.contracts.features import (
|
|
FeatureDeclaration,
|
|
FeatureProviderProtocol,
|
|
FeatureScope,
|
|
FeatureType,
|
|
FeatureUsage,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from sqlalchemy.orm import Session
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FeatureAggregatorService:
    """
    Singleton service that discovers and aggregates feature providers from all modules.

    Providers are discovered via ``app.modules.registry.MODULES``. Both the
    provider list and the declarations they expose are cached after the first
    lookup (they're static and don't change at runtime); call
    ``invalidate_cache()`` to force rediscovery.

    A failure inside a single provider is logged with its traceback and that
    provider is skipped, so the remaining providers are still processed.
    """

    def __init__(self) -> None:
        # ``None`` means "not computed yet"; an empty container is a valid cached value.
        self._declarations_cache: dict[str, FeatureDeclaration] | None = None
        self._providers_cache: list[FeatureProviderProtocol] | None = None

    def _discover_providers(self) -> list[FeatureProviderProtocol]:
        """
        Discover all feature providers from registered modules.

        The result is cached on the instance; later calls return the cached
        list until ``invalidate_cache()`` is called.

        Returns:
            List of provider instances, one per module that exposes a
            non-None feature provider and loaded without raising.
        """
        if self._providers_cache is not None:
            return self._providers_cache

        # Imported at call time rather than module import time
        # (presumably to avoid a circular import; verify before hoisting).
        from app.modules.registry import MODULES

        providers: list[FeatureProviderProtocol] = []
        for module in MODULES.values():
            if not module.has_feature_provider():
                continue
            try:
                provider = module.get_feature_provider_instance()
                if provider is not None:
                    providers.append(provider)
                    logger.debug(
                        "Discovered feature provider from module '%s': category='%s'",
                        module.code,
                        provider.feature_category,
                    )
            except Exception as e:
                # logger.exception keeps the traceback; the message text is unchanged.
                logger.exception(
                    "Failed to load feature provider from module '%s': %s",
                    module.code,
                    e,
                )

        self._providers_cache = providers
        logger.info("Discovered %d feature providers", len(providers))
        return providers

    def _build_declarations(self) -> dict[str, FeatureDeclaration]:
        """
        Build and cache the feature declarations map.

        When two declarations share a code, the first one seen wins and the
        later one is skipped with a warning.

        Returns:
            Dict mapping feature_code -> FeatureDeclaration
        """
        if self._declarations_cache is not None:
            return self._declarations_cache

        declarations: dict[str, FeatureDeclaration] = {}
        for provider in self._discover_providers():
            try:
                for decl in provider.get_feature_declarations():
                    if decl.code in declarations:
                        logger.warning(
                            "Duplicate feature code '%s' from category '%s' "
                            "(already declared by '%s')",
                            decl.code,
                            provider.feature_category,
                            declarations[decl.code].category,
                        )
                        continue
                    declarations[decl.code] = decl
            except Exception as e:
                logger.exception(
                    "Failed to get declarations from provider '%s': %s",
                    provider.feature_category,
                    e,
                )

        self._declarations_cache = declarations
        logger.info("Built feature catalog: %d features", len(declarations))
        return declarations

    # =========================================================================
    # Public API — Declarations
    # =========================================================================

    def get_all_declarations(self) -> dict[str, FeatureDeclaration]:
        """
        Get all feature declarations from all modules.

        The returned dict is the cached catalog object itself, not a copy, so
        mutating it changes what later calls see.

        Returns:
            Dict mapping feature_code -> FeatureDeclaration
        """
        return self._build_declarations()

    def get_declaration(self, feature_code: str) -> FeatureDeclaration | None:
        """Get a single feature declaration by code, or None if the code is unknown."""
        return self._build_declarations().get(feature_code)

    def get_declarations_by_category(self) -> dict[str, list[FeatureDeclaration]]:
        """
        Get feature declarations grouped by category.

        Returns:
            Dict mapping category -> list of FeatureDeclaration, sorted by display_order
        """
        by_category: dict[str, list[FeatureDeclaration]] = {}
        for decl in self._build_declarations().values():
            by_category.setdefault(decl.category, []).append(decl)

        # Sort each category by display_order (the lists are freshly built above).
        for decls in by_category.values():
            decls.sort(key=lambda d: d.display_order)

        return by_category

    def validate_feature_codes(self, codes: set[str]) -> set[str]:
        """
        Validate feature codes against known declarations.

        Args:
            codes: Feature codes to validate (a set; any iterable of codes is accepted)

        Returns:
            Set of invalid codes (empty if all valid)
        """
        # Iterating the declarations dict yields its keys, i.e. the known codes.
        return set(codes).difference(self._build_declarations())

    # =========================================================================
    # Public API — Usage
    # =========================================================================

    def get_store_usage(self, db: "Session", store_id: int) -> dict[str, FeatureUsage]:
        """
        Get current usage for a specific store across all providers.

        If several providers report the same feature_code, the last one
        processed overwrites the earlier entries.

        Args:
            db: Database session
            store_id: Store ID

        Returns:
            Dict mapping feature_code -> FeatureUsage
        """
        usage: dict[str, FeatureUsage] = {}
        for provider in self._discover_providers():
            try:
                for item in provider.get_store_usage(db, store_id):
                    usage[item.feature_code] = item
            except Exception as e:
                logger.exception(
                    "Failed to get store usage from provider '%s': %s",
                    provider.feature_category,
                    e,
                )
        return usage

    def get_merchant_usage(
        self, db: "Session", merchant_id: int, platform_id: int
    ) -> dict[str, FeatureUsage]:
        """
        Get current usage aggregated across all merchant's stores.

        If several providers report the same feature_code, the last one
        processed overwrites the earlier entries.

        Args:
            db: Database session
            merchant_id: Merchant ID
            platform_id: Platform ID

        Returns:
            Dict mapping feature_code -> FeatureUsage
        """
        usage: dict[str, FeatureUsage] = {}
        for provider in self._discover_providers():
            try:
                for item in provider.get_merchant_usage(db, merchant_id, platform_id):
                    usage[item.feature_code] = item
            except Exception as e:
                logger.exception(
                    "Failed to get merchant usage from provider '%s': %s",
                    provider.feature_category,
                    e,
                )
        return usage

    def get_usage_for_feature(
        self,
        db: "Session",
        feature_code: str,
        store_id: int | None = None,
        merchant_id: int | None = None,
        platform_id: int | None = None,
    ) -> FeatureUsage | None:
        """
        Get usage for a specific feature, respecting its scope.

        Usage is collected from all providers for the relevant store or
        merchant, and the entry for ``feature_code`` is then picked out.

        Args:
            db: Database session
            feature_code: Feature code to check
            store_id: Store ID (for STORE-scoped features)
            merchant_id: Merchant ID (for MERCHANT-scoped features)
            platform_id: Platform ID (for MERCHANT-scoped features)

        Returns:
            FeatureUsage, or None if the feature is unknown, is not
            QUANTITATIVE, the IDs required by its scope were not supplied,
            or no provider reported usage for it
        """
        decl = self.get_declaration(feature_code)
        if not decl or decl.feature_type != FeatureType.QUANTITATIVE:
            return None

        if decl.scope == FeatureScope.STORE and store_id is not None:
            return self.get_store_usage(db, store_id).get(feature_code)

        if (
            decl.scope == FeatureScope.MERCHANT
            and merchant_id is not None
            and platform_id is not None
        ):
            return self.get_merchant_usage(db, merchant_id, platform_id).get(feature_code)

        return None

    # =========================================================================
    # Cache Management
    # =========================================================================

    def invalidate_cache(self) -> None:
        """Invalidate all caches. Call when modules are added/removed."""
        self._declarations_cache = None
        self._providers_cache = None
        logger.debug("Feature aggregator cache invalidated")
|
|
|
|
|
|
# Singleton instance: importers of this module share this one object and its caches.
feature_aggregator = FeatureAggregatorService()

__all__ = [
    "feature_aggregator",
    "FeatureAggregatorService",
]
|