Some checks failed
Enforce MOD-025/MOD-026 rules: zero top-level cross-module model imports remain in any service file. All 66 files migrated using deferred import patterns (method-body, _get_model() helpers, instance-cached self._Model) and new cross-module service methods in tenancy. Documentation updated with Pattern 6 (deferred imports), migration plan marked complete, and violations status reflects 84→0 service-layer violations. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
534 lines
19 KiB
Python
534 lines
19 KiB
Python
# app/modules/billing/services/feature_service.py
|
|
"""
|
|
Feature-agnostic billing service for merchant-level access control.
|
|
|
|
Zero knowledge of what features exist. Works with:
|
|
- TierFeatureLimit (tier -> feature mappings)
|
|
- MerchantFeatureOverride (per-merchant exceptions)
|
|
- FeatureAggregatorService (discovers features from modules)
|
|
|
|
Usage:
|
|
from app.modules.billing.services.feature_service import feature_service
|
|
|
|
# Check if merchant has feature
|
|
if feature_service.has_feature(db, merchant_id, platform_id, "analytics_dashboard"):
|
|
...
|
|
|
|
# Check quantitative limit
|
|
allowed, msg = feature_service.check_resource_limit(
|
|
db, "products_limit", store_id=store_id
|
|
)
|
|
|
|
# Get feature summary for merchant portal
|
|
summary = feature_service.get_merchant_features_summary(db, merchant_id, platform_id)
|
|
"""
|
|
|
|
import logging
|
|
import time
|
|
from dataclasses import dataclass
|
|
|
|
from sqlalchemy.orm import Session, joinedload
|
|
|
|
from app.modules.billing.models import (
|
|
MerchantFeatureOverride,
|
|
MerchantSubscription,
|
|
SubscriptionTier,
|
|
)
|
|
from app.modules.billing.models.tier_feature_limit import TierFeatureLimit
|
|
from app.modules.contracts.features import FeatureType
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class FeatureSummary:
|
|
"""Summary of a feature for merchant portal display."""
|
|
|
|
code: str
|
|
name_key: str
|
|
description_key: str
|
|
category: str
|
|
feature_type: str # "binary" or "quantitative"
|
|
scope: str # "store" or "merchant"
|
|
enabled: bool
|
|
limit: int | None # For quantitative: effective limit
|
|
current: int | None # For quantitative: current usage
|
|
remaining: int | None # For quantitative: remaining capacity
|
|
percent_used: float | None # For quantitative: usage percentage
|
|
is_override: bool # Whether an override is applied
|
|
unit_key: str | None
|
|
ui_icon: str | None
|
|
|
|
|
|
class FeatureCache:
|
|
"""
|
|
In-memory cache for merchant features.
|
|
|
|
Caches (merchant_id, platform_id) -> set of feature codes with TTL.
|
|
"""
|
|
|
|
def __init__(self, ttl_seconds: int = 300):
|
|
self._cache: dict[tuple[int, int], tuple[set[str], float]] = {}
|
|
self._ttl = ttl_seconds
|
|
|
|
def get(self, merchant_id: int, platform_id: int) -> set[str] | None:
|
|
key = (merchant_id, platform_id)
|
|
if key not in self._cache:
|
|
return None
|
|
features, timestamp = self._cache[key]
|
|
if time.time() - timestamp > self._ttl:
|
|
del self._cache[key]
|
|
return None
|
|
return features
|
|
|
|
def set(self, merchant_id: int, platform_id: int, features: set[str]) -> None:
|
|
self._cache[(merchant_id, platform_id)] = (features, time.time())
|
|
|
|
def invalidate(self, merchant_id: int, platform_id: int) -> None:
|
|
self._cache.pop((merchant_id, platform_id), None)
|
|
|
|
def invalidate_all(self) -> None:
|
|
self._cache.clear()
|
|
|
|
|
|
class FeatureService:
|
|
"""
|
|
Feature-agnostic service for merchant-level billing.
|
|
|
|
Resolves feature access through:
|
|
1. MerchantSubscription -> SubscriptionTier -> TierFeatureLimit
|
|
2. MerchantFeatureOverride (admin-set exceptions)
|
|
3. FeatureAggregator (for usage counts and declarations)
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._cache = FeatureCache(ttl_seconds=300)
|
|
|
|
# =========================================================================
|
|
# Store -> Merchant Resolution
|
|
# =========================================================================
|
|
|
|
def _get_merchant_for_store(self, db: Session, store_id: int) -> tuple[int | None, int | None]:
|
|
"""
|
|
Resolve store_id to (merchant_id, platform_id).
|
|
|
|
Returns:
|
|
Tuple of (merchant_id, platform_id), either may be None
|
|
"""
|
|
from app.modules.tenancy.services.platform_service import platform_service
|
|
from app.modules.tenancy.services.store_service import store_service
|
|
|
|
store = store_service.get_store_by_id_optional(db, store_id)
|
|
if not store:
|
|
return None, None
|
|
|
|
merchant_id = store.merchant_id
|
|
platform_id = platform_service.get_primary_platform_id_for_store(db, store_id)
|
|
|
|
return merchant_id, platform_id
|
|
|
|
def _get_merchant_and_platforms_for_store(
|
|
self, db: Session, store_id: int
|
|
) -> tuple[int | None, list[int]]:
|
|
"""
|
|
Resolve store_id to (merchant_id, [platform_ids]).
|
|
|
|
Returns all active platform IDs for the store's merchant,
|
|
ordered with the primary platform first.
|
|
"""
|
|
from app.modules.tenancy.services.platform_service import platform_service
|
|
from app.modules.tenancy.services.store_service import store_service
|
|
|
|
store = store_service.get_store_by_id_optional(db, store_id)
|
|
if not store:
|
|
return None, []
|
|
|
|
platform_ids = platform_service.get_active_platform_ids_for_store(db, store_id)
|
|
return store.merchant_id, platform_ids
|
|
|
|
def _get_subscription(
|
|
self, db: Session, merchant_id: int, platform_id: int
|
|
) -> MerchantSubscription | None:
|
|
"""Get merchant subscription for a platform."""
|
|
return (
|
|
db.query(MerchantSubscription)
|
|
.options(joinedload(MerchantSubscription.tier).joinedload(SubscriptionTier.feature_limits))
|
|
.filter(
|
|
MerchantSubscription.merchant_id == merchant_id,
|
|
MerchantSubscription.platform_id == platform_id,
|
|
)
|
|
.first()
|
|
)
|
|
|
|
# =========================================================================
|
|
# Feature Availability
|
|
# =========================================================================
|
|
|
|
def has_feature(
|
|
self, db: Session, merchant_id: int, platform_id: int, feature_code: str
|
|
) -> bool:
|
|
"""
|
|
Check if merchant has access to a specific feature.
|
|
|
|
Checks:
|
|
1. MerchantFeatureOverride (force enable/disable)
|
|
2. TierFeatureLimit (tier assignment)
|
|
|
|
Args:
|
|
db: Database session
|
|
merchant_id: Merchant ID
|
|
platform_id: Platform ID
|
|
feature_code: Feature code to check
|
|
|
|
Returns:
|
|
True if merchant has access to the feature
|
|
"""
|
|
# Check override first
|
|
override = (
|
|
db.query(MerchantFeatureOverride)
|
|
.filter(
|
|
MerchantFeatureOverride.merchant_id == merchant_id,
|
|
MerchantFeatureOverride.platform_id == platform_id,
|
|
MerchantFeatureOverride.feature_code == feature_code,
|
|
)
|
|
.first()
|
|
)
|
|
if override is not None:
|
|
return override.is_enabled
|
|
|
|
# Check tier assignment
|
|
subscription = self._get_subscription(db, merchant_id, platform_id)
|
|
if not subscription or not subscription.is_active or not subscription.tier:
|
|
return False
|
|
|
|
return subscription.tier.has_feature(feature_code)
|
|
|
|
def has_feature_for_store(
|
|
self, db: Session, store_id: int, feature_code: str
|
|
) -> bool:
|
|
"""
|
|
Convenience method that resolves the store -> merchant -> platform
|
|
hierarchy and checks whether the merchant has access to a feature.
|
|
|
|
Looks up the store's merchant_id and platform_id, then delegates
|
|
to has_feature().
|
|
|
|
Args:
|
|
db: Database session.
|
|
store_id: The store ID to resolve.
|
|
feature_code: The feature code to check.
|
|
|
|
Returns:
|
|
True if the resolved merchant has access to the feature,
|
|
False if the store/merchant cannot be resolved or lacks access.
|
|
"""
|
|
merchant_id, platform_id = self._get_merchant_for_store(db, store_id)
|
|
if merchant_id is None or platform_id is None:
|
|
return False
|
|
return self.has_feature(db, merchant_id, platform_id, feature_code)
|
|
|
|
def get_merchant_feature_codes(
|
|
self, db: Session, merchant_id: int, platform_id: int
|
|
) -> set[str]:
|
|
"""Get all feature codes available to merchant on a platform."""
|
|
# Check cache
|
|
cached = self._cache.get(merchant_id, platform_id)
|
|
if cached is not None:
|
|
return cached
|
|
|
|
features: set[str] = set()
|
|
|
|
# Get tier features
|
|
subscription = self._get_subscription(db, merchant_id, platform_id)
|
|
if subscription and subscription.is_active and subscription.tier:
|
|
features = subscription.tier.get_feature_codes()
|
|
|
|
# Apply overrides
|
|
overrides = (
|
|
db.query(MerchantFeatureOverride)
|
|
.filter(
|
|
MerchantFeatureOverride.merchant_id == merchant_id,
|
|
MerchantFeatureOverride.platform_id == platform_id,
|
|
)
|
|
.all()
|
|
)
|
|
for override in overrides:
|
|
if override.is_enabled:
|
|
features.add(override.feature_code)
|
|
else:
|
|
features.discard(override.feature_code)
|
|
|
|
self._cache.set(merchant_id, platform_id, features)
|
|
return features
|
|
|
|
# =========================================================================
|
|
# Effective Limits
|
|
# =========================================================================
|
|
|
|
def get_effective_limit(
|
|
self, db: Session, merchant_id: int, platform_id: int, feature_code: str
|
|
) -> int | None:
|
|
"""
|
|
Get the effective limit for a feature (override or tier default).
|
|
|
|
Returns:
|
|
Limit value, or None for unlimited
|
|
"""
|
|
# Check override first
|
|
override = (
|
|
db.query(MerchantFeatureOverride)
|
|
.filter(
|
|
MerchantFeatureOverride.merchant_id == merchant_id,
|
|
MerchantFeatureOverride.platform_id == platform_id,
|
|
MerchantFeatureOverride.feature_code == feature_code,
|
|
)
|
|
.first()
|
|
)
|
|
if override is not None:
|
|
return override.limit_value
|
|
|
|
# Get from tier
|
|
subscription = self._get_subscription(db, merchant_id, platform_id)
|
|
if not subscription or not subscription.tier:
|
|
return 0 # No subscription = no access
|
|
|
|
return subscription.tier.get_limit_for_feature(feature_code)
|
|
|
|
# =========================================================================
|
|
# Resource Limit Checks
|
|
# =========================================================================
|
|
|
|
def check_resource_limit(
|
|
self,
|
|
db: Session,
|
|
feature_code: str,
|
|
store_id: int | None = None,
|
|
merchant_id: int | None = None,
|
|
platform_id: int | None = None,
|
|
) -> tuple[bool, str | None]:
|
|
"""
|
|
Check if a resource limit allows adding more items.
|
|
|
|
Resolves store -> merchant if needed. Gets the declaration to
|
|
determine scope, then checks usage against limit.
|
|
|
|
Args:
|
|
db: Database session
|
|
feature_code: Feature code (e.g., "products_limit")
|
|
store_id: Store ID (if checking per-store)
|
|
merchant_id: Merchant ID (if already known)
|
|
platform_id: Platform ID (if already known)
|
|
|
|
Returns:
|
|
(allowed, error_message) tuple
|
|
"""
|
|
from app.modules.billing.services.feature_aggregator import feature_aggregator
|
|
|
|
# Resolve store -> merchant if needed
|
|
if merchant_id is None and store_id is not None:
|
|
merchant_id, platform_id = self._get_merchant_for_store(db, store_id)
|
|
|
|
if merchant_id is None or platform_id is None:
|
|
return False, "No subscription found"
|
|
|
|
# Check subscription is active
|
|
subscription = self._get_subscription(db, merchant_id, platform_id)
|
|
if not subscription or not subscription.is_active:
|
|
return False, "Subscription is not active"
|
|
|
|
# Get feature declaration
|
|
decl = feature_aggregator.get_declaration(feature_code)
|
|
if decl is None:
|
|
logger.warning(f"Unknown feature code: {feature_code}")
|
|
return True, None # Unknown features are allowed by default
|
|
|
|
# Binary features: just check if enabled
|
|
if decl.feature_type == FeatureType.BINARY:
|
|
if self.has_feature(db, merchant_id, platform_id, feature_code):
|
|
return True, None
|
|
return False, f"Feature '{feature_code}' requires an upgrade"
|
|
|
|
# Quantitative: check usage against limit
|
|
limit = self.get_effective_limit(db, merchant_id, platform_id, feature_code)
|
|
if limit is None: # Unlimited
|
|
return True, None
|
|
|
|
# Get current usage based on scope
|
|
usage = feature_aggregator.get_usage_for_feature(
|
|
db, feature_code,
|
|
store_id=store_id,
|
|
merchant_id=merchant_id,
|
|
platform_id=platform_id,
|
|
)
|
|
current = usage.current_count if usage else 0
|
|
|
|
if current >= limit:
|
|
return False, (
|
|
f"Limit reached ({current}/{limit} {decl.unit_key or feature_code}). "
|
|
f"Upgrade to increase your limit."
|
|
)
|
|
|
|
return True, None
|
|
|
|
# =========================================================================
|
|
# Feature Summary
|
|
# =========================================================================
|
|
|
|
def get_merchant_features_summary(
|
|
self, db: Session, merchant_id: int, platform_id: int
|
|
) -> list[FeatureSummary]:
|
|
"""
|
|
Get complete feature summary for merchant portal display.
|
|
|
|
Returns all features with current status, limits, and usage.
|
|
"""
|
|
from app.modules.billing.services.feature_aggregator import feature_aggregator
|
|
|
|
declarations = feature_aggregator.get_all_declarations()
|
|
merchant_features = self.get_merchant_feature_codes(db, merchant_id, platform_id)
|
|
|
|
# Preload overrides
|
|
overrides = {
|
|
o.feature_code: o
|
|
for o in db.query(MerchantFeatureOverride).filter(
|
|
MerchantFeatureOverride.merchant_id == merchant_id,
|
|
MerchantFeatureOverride.platform_id == platform_id,
|
|
).all()
|
|
}
|
|
|
|
# Get all usage at once
|
|
merchant_usage = feature_aggregator.get_merchant_usage(db, merchant_id, platform_id)
|
|
|
|
summaries = []
|
|
for code, decl in sorted(declarations.items(), key=lambda x: (x[1].category, x[1].display_order)):
|
|
enabled = code in merchant_features
|
|
is_override = code in overrides
|
|
limit = self.get_effective_limit(db, merchant_id, platform_id, code) if decl.feature_type == FeatureType.QUANTITATIVE else None
|
|
|
|
current = None
|
|
remaining = None
|
|
percent_used = None
|
|
|
|
if decl.feature_type == FeatureType.QUANTITATIVE:
|
|
usage_data = merchant_usage.get(code)
|
|
current = usage_data.current_count if usage_data else 0
|
|
if limit is not None:
|
|
remaining = max(0, limit - current)
|
|
percent_used = min(100.0, (current / limit * 100)) if limit > 0 else 0.0
|
|
|
|
summaries.append(FeatureSummary(
|
|
code=code,
|
|
name_key=decl.name_key,
|
|
description_key=decl.description_key,
|
|
category=decl.category,
|
|
feature_type=decl.feature_type.value,
|
|
scope=decl.scope.value,
|
|
enabled=enabled,
|
|
limit=limit,
|
|
current=current,
|
|
remaining=remaining,
|
|
percent_used=percent_used,
|
|
is_override=is_override,
|
|
unit_key=decl.unit_key,
|
|
ui_icon=decl.ui_icon,
|
|
))
|
|
|
|
return summaries
|
|
|
|
# =========================================================================
|
|
# Tier Feature Limit Management
|
|
# =========================================================================
|
|
|
|
def get_tier_feature_limits(self, db: Session, tier_id: int) -> list:
|
|
"""Get feature limits for a tier."""
|
|
return (
|
|
db.query(TierFeatureLimit)
|
|
.filter(TierFeatureLimit.tier_id == tier_id)
|
|
.order_by(TierFeatureLimit.feature_code)
|
|
.all()
|
|
)
|
|
|
|
def upsert_tier_feature_limits(self, db: Session, tier_id: int, entries: list[dict]) -> list:
|
|
"""Replace feature limits for a tier. Returns list of new TierFeatureLimit objects."""
|
|
db.query(TierFeatureLimit).filter(TierFeatureLimit.tier_id == tier_id).delete()
|
|
new_rows = []
|
|
for entry in entries:
|
|
if not entry.get("enabled", True):
|
|
continue
|
|
row = TierFeatureLimit(
|
|
tier_id=tier_id,
|
|
feature_code=entry["feature_code"],
|
|
limit_value=entry.get("limit_value"),
|
|
)
|
|
db.add(row) # noqa: PERF006
|
|
new_rows.append(row)
|
|
return new_rows
|
|
|
|
# =========================================================================
|
|
# Merchant Feature Override Management
|
|
# =========================================================================
|
|
|
|
def get_merchant_overrides(self, db: Session, merchant_id: int) -> list:
|
|
"""Get feature overrides for a merchant."""
|
|
return (
|
|
db.query(MerchantFeatureOverride)
|
|
.filter(MerchantFeatureOverride.merchant_id == merchant_id)
|
|
.order_by(MerchantFeatureOverride.feature_code)
|
|
.all()
|
|
)
|
|
|
|
def upsert_merchant_overrides(
|
|
self, db: Session, merchant_id: int, platform_id: int, entries: list[dict]
|
|
) -> list:
|
|
"""Upsert feature overrides for a merchant."""
|
|
results = []
|
|
for entry in entries:
|
|
existing = (
|
|
db.query(MerchantFeatureOverride)
|
|
.filter(
|
|
MerchantFeatureOverride.merchant_id == merchant_id,
|
|
MerchantFeatureOverride.platform_id == platform_id,
|
|
MerchantFeatureOverride.feature_code == entry["feature_code"],
|
|
)
|
|
.first()
|
|
)
|
|
if existing:
|
|
existing.limit_value = entry.get("limit_value")
|
|
existing.is_enabled = entry.get("is_enabled", True)
|
|
existing.reason = entry.get("reason")
|
|
results.append(existing)
|
|
else:
|
|
row = MerchantFeatureOverride(
|
|
merchant_id=merchant_id,
|
|
platform_id=platform_id,
|
|
feature_code=entry["feature_code"],
|
|
limit_value=entry.get("limit_value"),
|
|
is_enabled=entry.get("is_enabled", True),
|
|
reason=entry.get("reason"),
|
|
)
|
|
db.add(row) # noqa: PERF006
|
|
results.append(row)
|
|
return results
|
|
|
|
# =========================================================================
|
|
# Cache Management
|
|
# =========================================================================
|
|
|
|
def invalidate_cache(self, merchant_id: int, platform_id: int) -> None:
|
|
"""Invalidate cache for a specific merchant/platform."""
|
|
self._cache.invalidate(merchant_id, platform_id)
|
|
|
|
def invalidate_all_cache(self) -> None:
|
|
"""Invalidate entire cache."""
|
|
self._cache.invalidate_all()
|
|
|
|
|
|
# Singleton instance
|
|
feature_service = FeatureService()
|
|
|
|
__all__ = [
|
|
"feature_service",
|
|
"FeatureService",
|
|
"FeatureSummary",
|
|
]
|