feat: implement module-based access control (Phase 2)

Add route-level module checking and comprehensive tests.

Dependencies:
- require_module_access(): Direct module check for routes
- Updated require_menu_access(): Check module before visibility
- Clear error messages for module vs visibility restrictions

Tests (31 tests, all passing):
- Module registry validation
- Menu item to module mapping
- ModuleDefinition class methods
- Module enablement with platform config
- Dependency resolution (marketplace→inventory)
- Enable/disable operations with cascading
- Platform code-based lookups

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-25 21:48:58 +01:00
parent 5be42c5907
commit cd65a5992d
2 changed files with 612 additions and 0 deletions

View File

@@ -371,6 +371,221 @@ def get_admin_with_platform_context(
return user
# ============================================================================
# MODULE-BASED ACCESS CONTROL
# ============================================================================
def require_module_access(module_code: str):
"""
Dependency factory for module-based route access control.
Checks if the specified module is enabled for the current platform.
Use this for routes that should be gated by module enablement but aren't
tied to a specific menu item.
Usage:
@router.get("/admin/billing/stripe-config")
async def stripe_config(
current_user: User = Depends(require_module_access("billing")),
):
...
Args:
module_code: Module code to check (e.g., "billing", "marketplace")
Returns:
Dependency function that validates module access and returns User
"""
from app.modules.service import module_service
def _check_module_access(
request: Request,
credentials: HTTPAuthorizationCredentials | None = Depends(security),
admin_token: str | None = Cookie(None),
vendor_token: str | None = Cookie(None),
db: Session = Depends(get_db),
) -> User:
# Try admin auth first, then vendor
user = None
platform_id = None
# Check if this is an admin request
if admin_token or (credentials and request.url.path.startswith("/admin")):
try:
user = get_current_admin_from_cookie_or_header(
request, credentials, admin_token, db
)
# Get platform context for admin
if user.is_super_admin:
# Super admins bypass module checks
return user
else:
platform = getattr(request.state, "admin_platform", None)
if platform:
platform_id = platform.id
elif hasattr(user, "token_platform_id"):
platform_id = user.token_platform_id
except Exception:
pass
# Check if this is a vendor request
if not user and (vendor_token or (credentials and "/vendor/" in request.url.path)):
try:
user = get_current_vendor_from_cookie_or_header(
request, credentials, vendor_token, db
)
# Get platform from vendor context
vendor = getattr(request.state, "vendor", None)
if vendor and hasattr(vendor, "platform_id") and vendor.platform_id:
platform_id = vendor.platform_id
except Exception:
pass
if not user:
raise InvalidTokenException("Authentication required")
# If no platform context, allow access (module checking requires platform)
if not platform_id:
logger.debug(f"No platform context for module check: {module_code}")
return user
# Check if module is enabled
if not module_service.is_module_enabled(db, platform_id, module_code):
logger.warning(
f"Module access denied: {module_code} disabled for "
f"platform_id={platform_id}, user={user.username}"
)
raise InsufficientPermissionsException(
f"The '{module_code}' module is not enabled for this platform"
)
return user
return _check_module_access
# ============================================================================
# MENU-BASED ACCESS CONTROL
# ============================================================================
def require_menu_access(menu_item_id: str, frontend_type: "FrontendType"):
"""
Dependency factory for menu-based page route access control.
Checks if the specified menu item is visible/accessible for the current user:
- First checks if the module providing this menu item is enabled
- Then checks visibility configuration (platform or user level)
Access denied reasons:
- Module disabled: The feature module is not enabled for this platform
- Menu hidden: The menu item is hidden by platform/user configuration
Usage:
@router.get("/admin/inventory")
async def inventory_page(
current_user: User = Depends(
require_menu_access("inventory", FrontendType.ADMIN)
),
):
...
Args:
menu_item_id: Menu item identifier from registry
frontend_type: Which frontend (ADMIN or VENDOR)
Returns:
Dependency function that validates menu access and returns User
"""
from app.modules.registry import get_menu_item_module
from app.modules.service import module_service
from app.services.menu_service import menu_service
from models.database.admin_menu_config import FrontendType as FT
def _check_menu_access(
request: Request,
credentials: HTTPAuthorizationCredentials | None = Depends(security),
admin_token: str | None = Cookie(None),
vendor_token: str | None = Cookie(None),
db: Session = Depends(get_db),
) -> User:
# Get current user based on frontend type
if frontend_type == FT.ADMIN:
user = get_current_admin_from_cookie_or_header(
request, credentials, admin_token, db
)
if user.is_super_admin:
# Super admin: check user-level config
platform_id = None
user_id = user.id
else:
# Platform admin: need platform context
# Try to get from request state or token
platform = getattr(request.state, "admin_platform", None)
if platform:
platform_id = platform.id
elif hasattr(user, "token_platform_id"):
platform_id = user.token_platform_id
else:
# No platform context - allow access (will be restricted elsewhere)
# This handles routes that don't have platform context yet
return user
user_id = None
elif frontend_type == FT.VENDOR:
user = get_current_vendor_from_cookie_or_header(
request, credentials, vendor_token, db
)
# Vendor: get platform from vendor's platform association
vendor = getattr(request.state, "vendor", None)
if vendor and hasattr(vendor, "platform_id") and vendor.platform_id:
platform_id = vendor.platform_id
else:
# No platform context for vendor - allow access
# This handles edge cases where vendor doesn't have platform
return user
user_id = None
else:
raise ValueError(f"Unsupported frontend_type: {frontend_type}")
# First check: Is the module providing this menu item enabled?
if platform_id:
module_code = get_menu_item_module(menu_item_id, frontend_type)
if module_code and not module_service.is_module_enabled(db, platform_id, module_code):
logger.warning(
f"Module access denied: {menu_item_id} (module={module_code}) for "
f"user={user.username}, platform_id={platform_id}"
)
raise InsufficientPermissionsException(
f"The '{module_code}' module is not enabled for this platform. "
f"Contact your administrator to enable this feature."
)
# Second check: Is the menu item visible in configuration?
can_access = menu_service.can_access_menu_item(
db, frontend_type, menu_item_id, platform_id, user_id
)
if not can_access:
logger.warning(
f"Menu visibility denied: {menu_item_id} for "
f"user={user.username}, frontend={frontend_type.value}, "
f"platform_id={platform_id}, user_id={user_id}"
)
raise InsufficientPermissionsException(
f"Access to '{menu_item_id}' has been restricted. "
f"Contact your administrator if you need access."
)
return user
return _check_menu_access
# ============================================================================
# VENDOR AUTHENTICATION
# ============================================================================

View File

@@ -0,0 +1,397 @@
# tests/unit/services/test_module_service.py
"""
Unit tests for ModuleService.
Tests cover:
- Module enablement checking
- Module dependency resolution
- Menu item filtering by modules
- Platform module configuration
"""
import pytest
from app.modules import MODULES, ModuleDefinition, module_service
from app.modules.registry import (
get_core_module_codes,
get_menu_item_module,
get_module,
validate_module_dependencies,
)
from app.modules.service import ModuleService
from models.database.admin_menu_config import FrontendType
@pytest.mark.unit
@pytest.mark.service
class TestModuleRegistry:
"""Test module registry structure and validation."""
def test_all_modules_defined(self):
"""Test that expected modules are defined."""
expected_modules = {
"core",
"platform-admin",
"billing",
"inventory",
"orders",
"marketplace",
"customers",
"cms",
"analytics",
"messaging",
"dev-tools",
"monitoring",
}
assert set(MODULES.keys()) == expected_modules
def test_core_modules_marked_correctly(self):
"""Test that core modules have is_core=True."""
core_codes = get_core_module_codes()
assert "core" in core_codes
assert "platform-admin" in core_codes
assert len(core_codes) == 2
def test_module_dependencies_valid(self):
"""Test that all module dependencies reference valid modules."""
errors = validate_module_dependencies()
assert errors == [], f"Module dependency validation failed: {errors}"
def test_marketplace_requires_inventory(self):
"""Test that marketplace module depends on inventory."""
marketplace = get_module("marketplace")
assert marketplace is not None
assert "inventory" in marketplace.requires
def test_core_modules_have_no_dependencies(self):
"""Test that core modules don't depend on optional modules."""
for code in get_core_module_codes():
module = get_module(code)
assert module is not None
# Core module dependencies should only be other core modules (or empty)
for req in module.requires:
assert req in get_core_module_codes(), (
f"Core module '{code}' depends on optional module '{req}'"
)
@pytest.mark.unit
@pytest.mark.service
class TestModuleMenuMapping:
"""Test menu item to module mapping."""
def test_dashboard_maps_to_core(self):
"""Test that dashboard menu item maps to core module."""
module_code = get_menu_item_module("dashboard", FrontendType.ADMIN)
assert module_code == "core"
def test_billing_items_map_to_billing(self):
"""Test that billing menu items map to billing module."""
billing_items = ["subscription-tiers", "subscriptions", "billing-history"]
for item in billing_items:
module_code = get_menu_item_module(item, FrontendType.ADMIN)
assert module_code == "billing", f"{item} should map to billing, got {module_code}"
def test_inventory_items_map_to_inventory(self):
"""Test that inventory menu items map to inventory module."""
module_code = get_menu_item_module("inventory", FrontendType.ADMIN)
assert module_code == "inventory"
def test_marketplace_items_map_to_marketplace(self):
"""Test that marketplace menu items map to marketplace module."""
module_code = get_menu_item_module("marketplace-letzshop", FrontendType.ADMIN)
assert module_code == "marketplace"
def test_vendor_billing_maps_to_billing(self):
"""Test that vendor billing menu item maps to billing module."""
module_code = get_menu_item_module("billing", FrontendType.VENDOR)
assert module_code == "billing"
@pytest.mark.unit
@pytest.mark.service
class TestModuleDefinition:
"""Test ModuleDefinition class."""
def test_module_definition_equality(self):
"""Test that modules are equal by code."""
mod1 = ModuleDefinition(code="test", name="Test 1")
mod2 = ModuleDefinition(code="test", name="Test 2")
mod3 = ModuleDefinition(code="other", name="Other")
assert mod1 == mod2 # Same code
assert mod1 != mod3 # Different code
def test_module_has_feature(self):
"""Test has_feature method."""
module = ModuleDefinition(
code="test",
name="Test",
features=["feature_a", "feature_b"],
)
assert module.has_feature("feature_a")
assert module.has_feature("feature_b")
assert not module.has_feature("feature_c")
def test_module_has_menu_item(self):
"""Test has_menu_item method."""
module = ModuleDefinition(
code="test",
name="Test",
menu_items={
FrontendType.ADMIN: ["item-a", "item-b"],
FrontendType.VENDOR: ["item-c"],
},
)
assert module.has_menu_item("item-a")
assert module.has_menu_item("item-c")
assert not module.has_menu_item("item-d")
def test_module_check_dependencies(self):
"""Test check_dependencies method."""
module = ModuleDefinition(
code="test",
name="Test",
requires=["dep1", "dep2"],
)
# All deps enabled
assert module.check_dependencies({"dep1", "dep2", "dep3"}) == []
# Missing dep1
assert module.check_dependencies({"dep2"}) == ["dep1"]
# Missing both
assert set(module.check_dependencies(set())) == {"dep1", "dep2"}
@pytest.mark.unit
@pytest.mark.service
class TestModuleServiceWithPlatform:
"""Test ModuleService with database platform."""
def test_get_platform_modules_all_enabled(self, db, test_platform):
"""Test that all modules are enabled when not configured."""
# Platform has no enabled_modules setting, so all should be enabled
service = ModuleService()
modules = service.get_platform_modules(db, test_platform.id)
# Should return all modules
assert len(modules) == len(MODULES)
def test_get_platform_modules_with_config(self, db, test_platform):
"""Test that only configured modules are enabled."""
# Set enabled_modules in platform settings
test_platform.settings = {"enabled_modules": ["billing", "inventory"]}
db.commit()
service = ModuleService()
module_codes = service.get_enabled_module_codes(db, test_platform.id)
# Should include core modules + configured modules
assert "core" in module_codes
assert "platform-admin" in module_codes
assert "billing" in module_codes
assert "inventory" in module_codes
assert "marketplace" not in module_codes # Not configured
def test_is_module_enabled_core_always_enabled(self, db, test_platform):
"""Test that core modules are always enabled."""
test_platform.settings = {"enabled_modules": []} # Empty list
db.commit()
service = ModuleService()
assert service.is_module_enabled(db, test_platform.id, "core")
assert service.is_module_enabled(db, test_platform.id, "platform-admin")
def test_is_module_enabled_optional_disabled(self, db, test_platform):
"""Test that optional modules can be disabled."""
test_platform.settings = {"enabled_modules": ["inventory"]}
db.commit()
service = ModuleService()
assert service.is_module_enabled(db, test_platform.id, "inventory")
assert not service.is_module_enabled(db, test_platform.id, "billing")
def test_dependency_resolution(self, db, test_platform):
"""Test that enabling marketplace auto-enables inventory."""
# Enable marketplace but not inventory
test_platform.settings = {"enabled_modules": ["marketplace"]}
db.commit()
service = ModuleService()
module_codes = service.get_enabled_module_codes(db, test_platform.id)
# Inventory should be auto-enabled due to marketplace dependency
assert "marketplace" in module_codes
assert "inventory" in module_codes
def test_get_module_menu_items(self, db, test_platform):
"""Test getting menu items for enabled modules."""
test_platform.settings = {"enabled_modules": ["billing"]}
db.commit()
service = ModuleService()
menu_items = service.get_module_menu_items(db, test_platform.id, FrontendType.ADMIN)
# Should include core and billing menu items
assert "dashboard" in menu_items # core
assert "settings" in menu_items # core
assert "subscription-tiers" in menu_items # billing
assert "subscriptions" in menu_items # billing
# Should NOT include disabled module items
assert "marketplace-letzshop" not in menu_items
def test_is_menu_item_module_enabled(self, db, test_platform):
"""Test checking if menu item's module is enabled."""
test_platform.settings = {"enabled_modules": ["billing"]}
db.commit()
service = ModuleService()
# Billing menu item should be enabled
assert service.is_menu_item_module_enabled(
db, test_platform.id, "subscription-tiers", FrontendType.ADMIN
)
# Marketplace menu item should be disabled
assert not service.is_menu_item_module_enabled(
db, test_platform.id, "marketplace-letzshop", FrontendType.ADMIN
)
def test_filter_menu_items_by_modules(self, db, test_platform):
"""Test filtering menu items by enabled modules."""
test_platform.settings = {"enabled_modules": ["billing"]}
db.commit()
service = ModuleService()
# Try to filter a mix of enabled and disabled items
all_items = {"dashboard", "subscription-tiers", "marketplace-letzshop", "inventory"}
filtered = service.filter_menu_items_by_modules(
db, test_platform.id, all_items, FrontendType.ADMIN
)
# Should keep core and billing items, remove marketplace and inventory
assert "dashboard" in filtered
assert "subscription-tiers" in filtered
assert "marketplace-letzshop" not in filtered
assert "inventory" not in filtered
@pytest.mark.unit
@pytest.mark.service
class TestModuleServiceEnableDisable:
"""Test module enable/disable operations."""
def test_enable_module(self, db, test_platform):
"""Test enabling a module."""
test_platform.settings = {"enabled_modules": ["billing"]}
db.commit()
service = ModuleService()
result = service.enable_module(db, test_platform.id, "analytics")
db.commit()
assert result is True
assert service.is_module_enabled(db, test_platform.id, "analytics")
def test_disable_module(self, db, test_platform):
"""Test disabling a module."""
test_platform.settings = {"enabled_modules": ["billing", "analytics"]}
db.commit()
service = ModuleService()
result = service.disable_module(db, test_platform.id, "analytics")
db.commit()
assert result is True
assert not service.is_module_enabled(db, test_platform.id, "analytics")
assert service.is_module_enabled(db, test_platform.id, "billing")
def test_cannot_disable_core_module(self, db, test_platform):
"""Test that core modules cannot be disabled."""
service = ModuleService()
result = service.disable_module(db, test_platform.id, "core")
assert result is False
assert service.is_module_enabled(db, test_platform.id, "core")
def test_disable_module_cascades_to_dependents(self, db, test_platform):
"""Test that disabling a module also disables its dependents."""
# Enable marketplace (which requires inventory)
test_platform.settings = {"enabled_modules": ["marketplace", "inventory"]}
db.commit()
service = ModuleService()
# Disable inventory - should also disable marketplace
result = service.disable_module(db, test_platform.id, "inventory")
db.commit()
assert result is True
assert not service.is_module_enabled(db, test_platform.id, "inventory")
assert not service.is_module_enabled(db, test_platform.id, "marketplace")
def test_set_enabled_modules(self, db, test_platform):
"""Test setting all enabled modules at once."""
service = ModuleService()
result = service.set_enabled_modules(
db, test_platform.id, ["billing", "inventory", "orders"]
)
db.commit()
assert result is True
module_codes = service.get_enabled_module_codes(db, test_platform.id)
# Should have core + specified modules
assert "core" in module_codes
assert "platform-admin" in module_codes
assert "billing" in module_codes
assert "inventory" in module_codes
assert "orders" in module_codes
assert "marketplace" not in module_codes
def test_invalid_module_code_ignored(self, db, test_platform):
"""Test that invalid module codes are ignored."""
service = ModuleService()
result = service.set_enabled_modules(
db, test_platform.id, ["billing", "invalid_module", "inventory"]
)
db.commit()
assert result is True
module_codes = service.get_enabled_module_codes(db, test_platform.id)
assert "billing" in module_codes
assert "inventory" in module_codes
assert "invalid_module" not in module_codes
@pytest.mark.unit
@pytest.mark.service
class TestModuleServiceByCode:
"""Test ModuleService methods that work with platform code."""
def test_get_platform_modules_by_code(self, db, test_platform):
"""Test getting modules by platform code."""
service = ModuleService()
modules = service.get_platform_modules_by_code(db, test_platform.code)
# Should return all modules for platform without config
assert len(modules) == len(MODULES)
def test_is_module_enabled_by_code(self, db, test_platform):
"""Test checking module enablement by platform code."""
test_platform.settings = {"enabled_modules": ["billing"]}
db.commit()
service = ModuleService()
assert service.is_module_enabled_by_code(db, test_platform.code, "billing")
assert service.is_module_enabled_by_code(db, test_platform.code, "core")
assert not service.is_module_enabled_by_code(db, test_platform.code, "marketplace")
def test_nonexistent_platform_code_returns_all(self, db):
"""Test that nonexistent platform code returns all modules."""
service = ModuleService()
modules = service.get_platform_modules_by_code(db, "nonexistent_platform")
# Should return all modules as fallback
assert len(modules) == len(MODULES)