Files
orion/app/modules/billing/services/feature_aggregator.py
Samir Boulahtit f20266167d
Some checks failed
CI / ruff (push) Failing after 7s
CI / pytest (push) Failing after 1s
CI / architecture (push) Failing after 9s
CI / dependency-scanning (push) Successful in 27s
CI / audit (push) Successful in 8s
CI / docs (push) Has been skipped
fix(lint): auto-fix ruff violations and tune lint rules
- Auto-fixed 4,496 lint issues (import sorting, modern syntax, etc.)
- Added ignore rules for patterns intentional in this codebase:
  E402 (late imports), E712 (SQLAlchemy filters), B904 (raise from),
  SIM108/SIM105/SIM117 (readability preferences)
- Added per-file ignores for tests and scripts
- Excluded broken scripts/rename_terminology.py (has curly quotes)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 23:10:42 +01:00

256 lines
8.9 KiB
Python

# app/modules/billing/services/feature_aggregator.py
"""
Feature aggregator service for cross-module feature discovery and usage tracking.
Discovers FeatureProviderProtocol implementations from all modules,
caches declarations, and provides aggregated usage data.
Usage:
from app.modules.billing.services.feature_aggregator import feature_aggregator
# Get all declared features
declarations = feature_aggregator.get_all_declarations()
# Get usage for a store
usage = feature_aggregator.get_store_usage(db, store_id)
# Get usage for a single quantitative feature (scope-aware)
usage = feature_aggregator.get_usage_for_feature(db, "products_limit", store_id=store_id)
"""
import logging
from typing import TYPE_CHECKING
from app.modules.contracts.features import (
FeatureDeclaration,
FeatureProviderProtocol,
FeatureScope,
FeatureType,
FeatureUsage,
)
if TYPE_CHECKING:
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
class FeatureAggregatorService:
    """
    Service that discovers and aggregates feature providers from all modules.

    Providers are discovered via ``app.modules.registry.MODULES``. The provider
    list and the declaration catalog are built lazily on first use and then
    cached (declarations are static and don't change at runtime) until
    ``invalidate_cache()`` is called.

    Discovery, declaration listing and usage collection are best-effort: a
    provider that raises is logged at ERROR level with its traceback and
    skipped, so one broken module does not take down the whole catalog.
    """

    def __init__(self):
        # ``None`` means "not built yet"; an empty container is a valid cached value.
        self._declarations_cache: dict[str, FeatureDeclaration] | None = None
        self._providers_cache: list[FeatureProviderProtocol] | None = None

    def _discover_providers(self) -> list[FeatureProviderProtocol]:
        """
        Discover all feature providers from registered modules.

        Returns:
            The cached provider list, building it on the first call. Modules
            whose provider cannot be loaded are logged and left out.
        """
        if self._providers_cache is not None:
            return self._providers_cache

        # Imported at call time rather than at module load
        # (presumably to avoid an import cycle with the registry — confirm).
        from app.modules.registry import MODULES

        providers = []
        for module in MODULES.values():
            if module.has_feature_provider():
                try:
                    provider = module.get_feature_provider_instance()
                    if provider is not None:
                        # Build the log line (which reads feature_category)
                        # BEFORE registering, so a provider whose attributes
                        # raise is never both cached and reported as failed.
                        discovered_message = (
                            f"Discovered feature provider from module '{module.code}': "
                            f"category='{provider.feature_category}'"
                        )
                        providers.append(provider)
                        logger.debug(discovered_message)
                except Exception as e:
                    # logger.exception keeps the traceback alongside the message.
                    logger.exception(
                        f"Failed to load feature provider from module '{module.code}': {e}"
                    )

        self._providers_cache = providers
        logger.info(f"Discovered {len(providers)} feature providers")
        return providers

    def _build_declarations(self) -> dict[str, FeatureDeclaration]:
        """
        Build and cache the feature declarations map.

        When two providers declare the same feature code, the first one seen
        is kept and a warning is logged for the later one.

        Returns:
            Dict mapping feature_code -> FeatureDeclaration
        """
        if self._declarations_cache is not None:
            return self._declarations_cache

        declarations: dict[str, FeatureDeclaration] = {}
        for provider in self._discover_providers():
            try:
                for decl in provider.get_feature_declarations():
                    if decl.code in declarations:
                        logger.warning(
                            f"Duplicate feature code '{decl.code}' from "
                            f"category '{provider.feature_category}' "
                            f"(already declared by '{declarations[decl.code].category}')"
                        )
                        continue
                    declarations[decl.code] = decl
            except Exception as e:
                # Declarations registered before the failure are kept.
                logger.exception(
                    f"Failed to get declarations from provider "
                    f"'{provider.feature_category}': {e}"
                )

        self._declarations_cache = declarations
        logger.info(f"Built feature catalog: {len(declarations)} features")
        return declarations

    def _collect_usage(self, scope_label, fetch) -> dict[str, FeatureUsage]:
        """
        Merge the usage items reported by every provider into one dict.

        Args:
            scope_label: Word used in the error log message ("store" / "merchant")
            fetch: Callable taking a provider and returning its usage items

        Returns:
            Dict mapping feature_code -> FeatureUsage. If several providers
            report the same code, the one iterated last wins.
        """
        usage: dict[str, FeatureUsage] = {}
        for provider in self._discover_providers():
            try:
                for item in fetch(provider):
                    usage[item.feature_code] = item
            except Exception as e:
                # Best-effort: log with traceback and continue with the next provider.
                logger.exception(
                    f"Failed to get {scope_label} usage from provider "
                    f"'{provider.feature_category}': {e}"
                )
        return usage

    # =========================================================================
    # Public API — Declarations
    # =========================================================================

    def get_all_declarations(self) -> dict[str, FeatureDeclaration]:
        """
        Get all feature declarations from all modules.

        Returns:
            Dict mapping feature_code -> FeatureDeclaration. This is the cached
            dict itself, not a copy.
        """
        return self._build_declarations()

    def get_declaration(self, feature_code: str) -> FeatureDeclaration | None:
        """Get a single feature declaration by code, or None if it is unknown."""
        return self._build_declarations().get(feature_code)

    def get_declarations_by_category(self) -> dict[str, list[FeatureDeclaration]]:
        """
        Get feature declarations grouped by category.

        Returns:
            Dict mapping category -> list of FeatureDeclaration, sorted by display_order
        """
        by_category: dict[str, list[FeatureDeclaration]] = {}
        for decl in self._build_declarations().values():
            by_category.setdefault(decl.category, []).append(decl)

        # Sort each category by display_order
        for group in by_category.values():
            group.sort(key=lambda d: d.display_order)

        return by_category

    def validate_feature_codes(self, codes: set[str]) -> set[str]:
        """
        Validate feature codes against known declarations.

        Args:
            codes: Set of feature codes to validate

        Returns:
            Set of invalid codes (empty if all valid)
        """
        known = set(self._build_declarations().keys())
        return codes - known

    # =========================================================================
    # Public API — Usage
    # =========================================================================

    def get_store_usage(self, db: "Session", store_id: int) -> dict[str, FeatureUsage]:
        """
        Get current usage for a specific store across all providers.

        Args:
            db: Database session
            store_id: Store ID

        Returns:
            Dict mapping feature_code -> FeatureUsage
        """
        return self._collect_usage(
            "store", lambda provider: provider.get_store_usage(db, store_id)
        )

    def get_merchant_usage(
        self, db: "Session", merchant_id: int, platform_id: int
    ) -> dict[str, FeatureUsage]:
        """
        Get current usage aggregated across all merchant's stores.

        Args:
            db: Database session
            merchant_id: Merchant ID
            platform_id: Platform ID

        Returns:
            Dict mapping feature_code -> FeatureUsage
        """
        return self._collect_usage(
            "merchant",
            lambda provider: provider.get_merchant_usage(db, merchant_id, platform_id),
        )

    def get_usage_for_feature(
        self,
        db: "Session",
        feature_code: str,
        store_id: int | None = None,
        merchant_id: int | None = None,
        platform_id: int | None = None,
    ) -> FeatureUsage | None:
        """
        Get usage for a specific feature, respecting its scope.

        Args:
            db: Database session
            feature_code: Feature code to check
            store_id: Store ID (for STORE-scoped features)
            merchant_id: Merchant ID (for MERCHANT-scoped features)
            platform_id: Platform ID (for MERCHANT-scoped features)

        Returns:
            FeatureUsage, or None when the feature is undeclared, is not
            QUANTITATIVE, the IDs required by its scope were not supplied,
            or no provider reported usage for it.
        """
        decl = self.get_declaration(feature_code)
        if not decl or decl.feature_type != FeatureType.QUANTITATIVE:
            return None

        if decl.scope == FeatureScope.STORE and store_id is not None:
            return self.get_store_usage(db, store_id).get(feature_code)

        if (
            decl.scope == FeatureScope.MERCHANT
            and merchant_id is not None
            and platform_id is not None
        ):
            return self.get_merchant_usage(db, merchant_id, platform_id).get(feature_code)

        return None

    # =========================================================================
    # Cache Management
    # =========================================================================

    def invalidate_cache(self) -> None:
        """Invalidate all caches. Call when modules are added/removed."""
        self._declarations_cache = None
        self._providers_cache = None
        logger.debug("Feature aggregator cache invalidated")
# Shared module-level instance; import this rather than constructing your own
# so the provider and declaration caches are reused.
feature_aggregator = FeatureAggregatorService()

# Names exported by this module.
__all__ = ["feature_aggregator", "FeatureAggregatorService"]