# app/core/feature_gate.py
"""
Feature gating decorator and dependencies for tier-based access control.

Provides:
- @require_feature decorator for endpoints
- RequireFeature dependency for flexible usage
- FeatureNotAvailableError exception with upgrade info

Usage:
    # As decorator (simple)
    @router.get("/analytics")
    @require_feature(FeatureCode.ANALYTICS_DASHBOARD)
    def get_analytics(...):
        ...

    # As dependency (more control)
    @router.get("/analytics")
    def get_analytics(
        _: None = Depends(RequireFeature(FeatureCode.ANALYTICS_DASHBOARD)),
        ...
    ):
        ...

    # Multiple features (any one required)
    @require_feature(FeatureCode.ANALYTICS_DASHBOARD, FeatureCode.BASIC_REPORTS)
    def get_reports(...):
        ...
"""

import functools
import inspect
import logging
from typing import Callable

from fastapi import Depends, HTTPException, Request
from sqlalchemy.orm import Session

from app.api.deps import get_current_vendor_api
from app.core.database import get_db
from app.services.feature_service import feature_service
from models.database.feature import FeatureCode
from models.database.user import User

logger = logging.getLogger(__name__)


class FeatureNotAvailableError(HTTPException):
    """
    Exception raised when a feature is not available for the vendor's tier.

    Always carries HTTP status 403. The ``detail`` payload is a dict with the
    keys ``error``, ``message``, ``feature_code``, ``feature_name`` and
    ``upgrade``, so the frontend can display upgrade information.

    Args:
        feature_code: Code of the feature that was denied.
        feature_name: Display name of the feature, if known.
        required_tier_code: Code of the tier that unlocks the feature. When
            falsy, ``detail["upgrade"]`` is ``None`` and the other tier
            arguments are ignored.
        required_tier_name: Display name of the required tier.
        required_tier_price_cents: Monthly price of the required tier, in cents.
    """

    def __init__(
        self,
        feature_code: str,
        feature_name: str | None = None,
        required_tier_code: str | None = None,
        required_tier_name: str | None = None,
        required_tier_price_cents: int | None = None,
    ):
        # Upgrade details are only included when a target tier is known.
        upgrade = (
            {
                "tier_code": required_tier_code,
                "tier_name": required_tier_name,
                "price_monthly_cents": required_tier_price_cents,
            }
            if required_tier_code
            else None
        )
        detail = {
            "error": "feature_not_available",
            "message": "This feature requires an upgrade to access.",
            "feature_code": feature_code,
            "feature_name": feature_name,
            "upgrade": upgrade,
        }
        super().__init__(status_code=403, detail=detail)


def _ensure_feature_access(
    db: Session,
    vendor_id,
    feature_codes: tuple[str, ...],
) -> None:
    """
    Return silently if the vendor has at least one of ``feature_codes``.

    Features are checked in order via ``feature_service.has_feature`` and the
    check stops at the first one that is available.

    Raises:
        FeatureNotAvailableError: If none of the features is available. The
            error describes the FIRST feature code; upgrade details are
            included when ``feature_service.get_feature_upgrade_info`` returns
            a truthy value for it.
    """
    if any(
        feature_service.has_feature(db, vendor_id, code) for code in feature_codes
    ):
        return

    logger.debug(
        "Feature gate denied vendor %s; requires one of %s",
        vendor_id,
        feature_codes,
    )

    # None of the features are available - report upgrade info for the first one
    feature_code = feature_codes[0]
    upgrade_info = feature_service.get_feature_upgrade_info(db, feature_code)
    if not upgrade_info:
        # No upgrade info available for this feature code
        raise FeatureNotAvailableError(feature_code=feature_code)

    raise FeatureNotAvailableError(
        feature_code=feature_code,
        feature_name=upgrade_info.feature_name,
        required_tier_code=upgrade_info.required_tier_code,
        required_tier_name=upgrade_info.required_tier_name,
        required_tier_price_cents=upgrade_info.required_tier_price_monthly_cents,
    )


def _resolve_gate_dependencies(
    kwargs: dict[str, object],
    endpoint_name: str,
) -> tuple[Session, User]:
    """
    Locate the ``db`` session and ``current_user`` for a decorated endpoint.

    Looks in the endpoint's keyword arguments first (``db`` / ``current_user``).
    Anything still missing is looked up on ``kwargs["request"].state`` as
    ``state.db`` / ``state.user``. A value already supplied through kwargs is
    never replaced by the fallback.

    Raises:
        HTTPException: 500 if either dependency cannot be found.
    """
    db = kwargs.get("db")
    current_user = kwargs.get("current_user")

    if db is None or current_user is None:
        # Fall back to request.state, filling in only what is missing
        state = getattr(kwargs.get("request"), "state", None)
        if state is not None:
            if db is None:
                db = getattr(state, "db", None)
            if current_user is None:
                current_user = getattr(state, "user", None)

    if db is None or current_user is None:
        # This is a wiring mistake in the endpoint, not a client error
        logger.error(
            "Feature gate on %s could not find db/current_user dependencies",
            endpoint_name,
        )
        raise HTTPException(
            status_code=500,
            detail="Feature check failed: missing db or current_user dependency",
        )

    return db, current_user


class RequireFeature:
    """
    Dependency class that checks if vendor has access to a feature.

    Can be used as a FastAPI dependency:

        @router.get("/analytics")
        def get_analytics(
            _: None = Depends(RequireFeature("analytics_dashboard")),
            current_user: User = Depends(get_current_vendor_api),
            db: Session = Depends(get_db),
        ):
            ...

    Args:
        *feature_codes: One or more feature codes. Access granted if ANY is available.

    Raises:
        ValueError: If constructed without any feature code.
    """

    def __init__(self, *feature_codes: str):
        if not feature_codes:
            raise ValueError("At least one feature code is required")
        self.feature_codes = feature_codes

    def __call__(
        self,
        current_user: User = Depends(get_current_vendor_api),
        db: Session = Depends(get_db),
    ) -> None:
        """
        Check if vendor has access to any of the required features.

        The vendor is identified by ``current_user.token_vendor_id``.

        Raises:
            FeatureNotAvailableError: If the vendor has none of the features.
        """
        _ensure_feature_access(db, current_user.token_vendor_id, self.feature_codes)


def require_feature(*feature_codes: str) -> Callable:
    """
    Decorator to require one or more features for an endpoint.

    The decorated endpoint responds with 403 and upgrade info when the vendor
    has NONE of the specified features; having any one of them is sufficient.

    The endpoint must receive ``db`` and ``current_user`` as keyword arguments.
    If either is missing, the wrapper falls back to ``request.state.db`` /
    ``request.state.user`` when the endpoint receives a ``request`` keyword
    argument. If they still cannot be found, a 500 HTTPException is raised.

    Both ``async def`` and plain ``def`` endpoints are supported.

    Args:
        *feature_codes: One or more feature codes. Access granted if ANY is available.

    Raises:
        ValueError: If called without any feature code.

    Example:
        @router.get("/analytics/dashboard")
        @require_feature(FeatureCode.ANALYTICS_DASHBOARD)
        async def get_analytics_dashboard(
            current_user: User = Depends(get_current_vendor_api),
            db: Session = Depends(get_db),
        ):
            ...

        # Multiple features (any one is sufficient)
        @router.get("/reports")
        @require_feature(FeatureCode.ANALYTICS_DASHBOARD, FeatureCode.BASIC_REPORTS)
        async def get_reports(...):
            ...
    """
    if not feature_codes:
        raise ValueError("At least one feature code is required")

    def decorator(func: Callable) -> Callable:
        # Used only in the misconfiguration log message
        endpoint_name = getattr(func, "__qualname__", repr(func))

        def check_access(kwargs: dict[str, object]) -> None:
            """Run the feature check shared by the sync and async wrappers."""
            db, current_user = _resolve_gate_dependencies(kwargs, endpoint_name)
            _ensure_feature_access(db, current_user.token_vendor_id, feature_codes)

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            check_access(kwargs)
            return await func(*args, **kwargs)

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            check_access(kwargs)
            return func(*args, **kwargs)

        # Return appropriate wrapper based on whether func is async
        if inspect.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper

    return decorator


# ============================================================================
# Convenience Exports
# ============================================================================

__all__ = [
    "require_feature",
    "RequireFeature",
    "FeatureNotAvailableError",
    "FeatureCode",
]