diff --git a/app/api/deps.py b/app/api/deps.py index 82a3ca75..84f0577a 100644 --- a/app/api/deps.py +++ b/app/api/deps.py @@ -371,6 +371,221 @@ def get_admin_with_platform_context( return user +# ============================================================================ +# MODULE-BASED ACCESS CONTROL +# ============================================================================ + + +def require_module_access(module_code: str): + """ + Dependency factory for module-based route access control. + + Checks if the specified module is enabled for the current platform. + Use this for routes that should be gated by module enablement but aren't + tied to a specific menu item. + + Usage: + @router.get("/admin/billing/stripe-config") + async def stripe_config( + current_user: User = Depends(require_module_access("billing")), + ): + ... + + Args: + module_code: Module code to check (e.g., "billing", "marketplace") + + Returns: + Dependency function that validates module access and returns User + """ + from app.modules.service import module_service + + def _check_module_access( + request: Request, + credentials: HTTPAuthorizationCredentials | None = Depends(security), + admin_token: str | None = Cookie(None), + vendor_token: str | None = Cookie(None), + db: Session = Depends(get_db), + ) -> User: + # Try admin auth first, then vendor + user = None + platform_id = None + + # Check if this is an admin request + if admin_token or (credentials and request.url.path.startswith("/admin")): + try: + user = get_current_admin_from_cookie_or_header( + request, credentials, admin_token, db + ) + # Get platform context for admin + if user.is_super_admin: + # Super admins bypass module checks + return user + else: + platform = getattr(request.state, "admin_platform", None) + if platform: + platform_id = platform.id + elif hasattr(user, "token_platform_id"): + platform_id = user.token_platform_id + except Exception: + pass + + # Check if this is a vendor request + if not user and (vendor_token or (credentials and "/vendor/" in request.url.path)): + try: + user = get_current_vendor_from_cookie_or_header( + request, credentials, vendor_token, db + ) + # Get platform from vendor context + vendor = getattr(request.state, "vendor", None) + if vendor and hasattr(vendor, "platform_id") and vendor.platform_id: + platform_id = vendor.platform_id + except Exception: + pass + + if not user: + raise InvalidTokenException("Authentication required") + + # If no platform context, allow access (module checking requires platform) + if not platform_id: + logger.debug(f"No platform context for module check: {module_code}") + return user + + # Check if module is enabled + if not module_service.is_module_enabled(db, platform_id, module_code): + logger.warning( + f"Module access denied: {module_code} disabled for " + f"platform_id={platform_id}, user={user.username}" + ) + raise InsufficientPermissionsException( + f"The '{module_code}' module is not enabled for this platform" + ) + + return user + + return _check_module_access + + +# ============================================================================ +# MENU-BASED ACCESS CONTROL +# ============================================================================ + + +def require_menu_access(menu_item_id: str, frontend_type: "FrontendType"): + """ + Dependency factory for menu-based page route access control. + + Checks if the specified menu item is visible/accessible for the current user: + - First checks if the module providing this menu item is enabled + - Then checks visibility configuration (platform or user level) + + Access denied reasons: + - Module disabled: The feature module is not enabled for this platform + - Menu hidden: The menu item is hidden by platform/user configuration + + Usage: + @router.get("/admin/inventory") + async def inventory_page( + current_user: User = Depends( + require_menu_access("inventory", FrontendType.ADMIN) + ), + ): + ... + + Args: + menu_item_id: Menu item identifier from registry + frontend_type: Which frontend (ADMIN or VENDOR) + + Returns: + Dependency function that validates menu access and returns User + """ + from app.modules.registry import get_menu_item_module + from app.modules.service import module_service + from app.services.menu_service import menu_service + from models.database.admin_menu_config import FrontendType as FT + + def _check_menu_access( + request: Request, + credentials: HTTPAuthorizationCredentials | None = Depends(security), + admin_token: str | None = Cookie(None), + vendor_token: str | None = Cookie(None), + db: Session = Depends(get_db), + ) -> User: + # Get current user based on frontend type + if frontend_type == FT.ADMIN: + user = get_current_admin_from_cookie_or_header( + request, credentials, admin_token, db + ) + + if user.is_super_admin: + # Super admin: check user-level config + platform_id = None + user_id = user.id + else: + # Platform admin: need platform context + # Try to get from request state or token + platform = getattr(request.state, "admin_platform", None) + if platform: + platform_id = platform.id + elif hasattr(user, "token_platform_id"): + platform_id = user.token_platform_id + else: + # No platform context - allow access (will be restricted elsewhere) + # This handles routes that don't have platform context yet + return user + user_id = None + + elif frontend_type == FT.VENDOR: + user = get_current_vendor_from_cookie_or_header( + request, credentials, vendor_token, db + ) + + # Vendor: get platform from vendor's platform association + vendor = getattr(request.state, "vendor", None) + if vendor and hasattr(vendor, "platform_id") and vendor.platform_id: + platform_id = vendor.platform_id + else: + # No platform context for vendor - allow access + # This handles edge cases where vendor doesn't have platform + return user + user_id = None + + else: + raise ValueError(f"Unsupported frontend_type: {frontend_type}") + + # First check: Is the module providing this menu item enabled? + if platform_id: + module_code = get_menu_item_module(menu_item_id, frontend_type) + if module_code and not module_service.is_module_enabled(db, platform_id, module_code): + logger.warning( + f"Module access denied: {menu_item_id} (module={module_code}) for " + f"user={user.username}, platform_id={platform_id}" + ) + raise InsufficientPermissionsException( + f"The '{module_code}' module is not enabled for this platform. " + f"Contact your administrator to enable this feature." + ) + + # Second check: Is the menu item visible in configuration? + can_access = menu_service.can_access_menu_item( + db, frontend_type, menu_item_id, platform_id, user_id + ) + + if not can_access: + logger.warning( + f"Menu visibility denied: {menu_item_id} for " + f"user={user.username}, frontend={frontend_type.value}, " + f"platform_id={platform_id}, user_id={user_id}" + ) + raise InsufficientPermissionsException( + f"Access to '{menu_item_id}' has been restricted. " + f"Contact your administrator if you need access." + ) + + return user + + return _check_menu_access + + # ============================================================================ # VENDOR AUTHENTICATION # ============================================================================ diff --git a/tests/unit/services/test_module_service.py b/tests/unit/services/test_module_service.py new file mode 100644 index 00000000..9907bc4a --- /dev/null +++ b/tests/unit/services/test_module_service.py @@ -0,0 +1,397 @@ +# tests/unit/services/test_module_service.py +""" +Unit tests for ModuleService. + +Tests cover: +- Module enablement checking +- Module dependency resolution +- Menu item filtering by modules +- Platform module configuration +""" + +import pytest + +from app.modules import MODULES, ModuleDefinition, module_service +from app.modules.registry import ( + get_core_module_codes, + get_menu_item_module, + get_module, + validate_module_dependencies, +) +from app.modules.service import ModuleService +from models.database.admin_menu_config import FrontendType + + +@pytest.mark.unit +@pytest.mark.service +class TestModuleRegistry: + """Test module registry structure and validation.""" + + def test_all_modules_defined(self): + """Test that expected modules are defined.""" + expected_modules = { + "core", + "platform-admin", + "billing", + "inventory", + "orders", + "marketplace", + "customers", + "cms", + "analytics", + "messaging", + "dev-tools", + "monitoring", + } + assert set(MODULES.keys()) == expected_modules + + def test_core_modules_marked_correctly(self): + """Test that core modules have is_core=True.""" + core_codes = get_core_module_codes() + assert "core" in core_codes + assert "platform-admin" in core_codes + assert len(core_codes) == 2 + + def test_module_dependencies_valid(self): + """Test that all module dependencies reference valid modules.""" + errors = validate_module_dependencies() + assert errors == [], f"Module dependency validation failed: {errors}" + + def test_marketplace_requires_inventory(self): + """Test that marketplace module depends on inventory.""" + marketplace = get_module("marketplace") + assert marketplace is not None + assert "inventory" in marketplace.requires + + def test_core_modules_have_no_dependencies(self): + """Test that core modules don't depend on optional modules.""" + for code in get_core_module_codes(): + module = get_module(code) + assert module is not None + # Core module dependencies should only be other core modules (or empty) + for req in module.requires: + assert req in get_core_module_codes(), ( + f"Core module '{code}' depends on optional module '{req}'" + ) + + +@pytest.mark.unit +@pytest.mark.service +class TestModuleMenuMapping: + """Test menu item to module mapping.""" + + def test_dashboard_maps_to_core(self): + """Test that dashboard menu item maps to core module.""" + module_code = get_menu_item_module("dashboard", FrontendType.ADMIN) + assert module_code == "core" + + def test_billing_items_map_to_billing(self): + """Test that billing menu items map to billing module.""" + billing_items = ["subscription-tiers", "subscriptions", "billing-history"] + for item in billing_items: + module_code = get_menu_item_module(item, FrontendType.ADMIN) + assert module_code == "billing", f"{item} should map to billing, got {module_code}" + + def test_inventory_items_map_to_inventory(self): + """Test that inventory menu items map to inventory module.""" + module_code = get_menu_item_module("inventory", FrontendType.ADMIN) + assert module_code == "inventory" + + def test_marketplace_items_map_to_marketplace(self): + """Test that marketplace menu items map to marketplace module.""" + module_code = get_menu_item_module("marketplace-letzshop", FrontendType.ADMIN) + assert module_code == "marketplace" + + def test_vendor_billing_maps_to_billing(self): + """Test that vendor billing menu item maps to billing module.""" + module_code = get_menu_item_module("billing", FrontendType.VENDOR) + assert module_code == "billing" + + +@pytest.mark.unit +@pytest.mark.service +class TestModuleDefinition: + """Test ModuleDefinition class.""" + + def test_module_definition_equality(self): + """Test that modules are equal by code.""" + mod1 = ModuleDefinition(code="test", name="Test 1") + mod2 = ModuleDefinition(code="test", name="Test 2") + mod3 = ModuleDefinition(code="other", name="Other") + + assert mod1 == mod2 # Same code + assert mod1 != mod3 # Different code + + def test_module_has_feature(self): + """Test has_feature method.""" + module = ModuleDefinition( + code="test", + name="Test", + features=["feature_a", "feature_b"], + ) + assert module.has_feature("feature_a") + assert module.has_feature("feature_b") + assert not module.has_feature("feature_c") + + def test_module_has_menu_item(self): + """Test has_menu_item method.""" + module = ModuleDefinition( + code="test", + name="Test", + menu_items={ + FrontendType.ADMIN: ["item-a", "item-b"], + FrontendType.VENDOR: ["item-c"], + }, + ) + assert module.has_menu_item("item-a") + assert module.has_menu_item("item-c") + assert not module.has_menu_item("item-d") + + def test_module_check_dependencies(self): + """Test check_dependencies method.""" + module = ModuleDefinition( + code="test", + name="Test", + requires=["dep1", "dep2"], + ) + # All deps enabled + assert module.check_dependencies({"dep1", "dep2", "dep3"}) == [] + # Missing dep1 + assert module.check_dependencies({"dep2"}) == ["dep1"] + # Missing both + assert set(module.check_dependencies(set())) == {"dep1", "dep2"} + + +@pytest.mark.unit +@pytest.mark.service +class TestModuleServiceWithPlatform: + """Test ModuleService with database platform.""" + + def test_get_platform_modules_all_enabled(self, db, test_platform): + """Test that all modules are enabled when not configured.""" + # Platform has no enabled_modules setting, so all should be enabled + service = ModuleService() + modules = service.get_platform_modules(db, test_platform.id) + + # Should return all modules + assert len(modules) == len(MODULES) + + def test_get_platform_modules_with_config(self, db, test_platform): + """Test that only configured modules are enabled.""" + # Set enabled_modules in platform settings + test_platform.settings = {"enabled_modules": ["billing", "inventory"]} + db.commit() + + service = ModuleService() + module_codes = service.get_enabled_module_codes(db, test_platform.id) + + # Should include core modules + configured modules + assert "core" in module_codes + assert "platform-admin" in module_codes + assert "billing" in module_codes + assert "inventory" in module_codes + assert "marketplace" not in module_codes # Not configured + + def test_is_module_enabled_core_always_enabled(self, db, test_platform): + """Test that core modules are always enabled.""" + test_platform.settings = {"enabled_modules": []} # Empty list + db.commit() + + service = ModuleService() + assert service.is_module_enabled(db, test_platform.id, "core") + assert service.is_module_enabled(db, test_platform.id, "platform-admin") + + def test_is_module_enabled_optional_disabled(self, db, test_platform): + """Test that optional modules can be disabled.""" + test_platform.settings = {"enabled_modules": ["inventory"]} + db.commit() + + service = ModuleService() + assert service.is_module_enabled(db, test_platform.id, "inventory") + assert not service.is_module_enabled(db, test_platform.id, "billing") + + def test_dependency_resolution(self, db, test_platform): + """Test that enabling marketplace auto-enables inventory.""" + # Enable marketplace but not inventory + test_platform.settings = {"enabled_modules": ["marketplace"]} + db.commit() + + service = ModuleService() + module_codes = service.get_enabled_module_codes(db, test_platform.id) + + # Inventory should be auto-enabled due to marketplace dependency + assert "marketplace" in module_codes + assert "inventory" in module_codes + + def test_get_module_menu_items(self, db, test_platform): + """Test getting menu items for enabled modules.""" + test_platform.settings = {"enabled_modules": ["billing"]} + db.commit() + + service = ModuleService() + menu_items = service.get_module_menu_items(db, test_platform.id, FrontendType.ADMIN) + + # Should include core and billing menu items + assert "dashboard" in menu_items # core + assert "settings" in menu_items # core + assert "subscription-tiers" in menu_items # billing + assert "subscriptions" in menu_items # billing + # Should NOT include disabled module items + assert "marketplace-letzshop" not in menu_items + + def test_is_menu_item_module_enabled(self, db, test_platform): + """Test checking if menu item's module is enabled.""" + test_platform.settings = {"enabled_modules": ["billing"]} + db.commit() + + service = ModuleService() + + # Billing menu item should be enabled + assert service.is_menu_item_module_enabled( + db, test_platform.id, "subscription-tiers", FrontendType.ADMIN + ) + + # Marketplace menu item should be disabled + assert not service.is_menu_item_module_enabled( + db, test_platform.id, "marketplace-letzshop", FrontendType.ADMIN + ) + + def test_filter_menu_items_by_modules(self, db, test_platform): + """Test filtering menu items by enabled modules.""" + test_platform.settings = {"enabled_modules": ["billing"]} + db.commit() + + service = ModuleService() + + # Try to filter a mix of enabled and disabled items + all_items = {"dashboard", "subscription-tiers", "marketplace-letzshop", "inventory"} + filtered = service.filter_menu_items_by_modules( + db, test_platform.id, all_items, FrontendType.ADMIN + ) + + # Should keep core and billing items, remove marketplace and inventory + assert "dashboard" in filtered + assert "subscription-tiers" in filtered + assert "marketplace-letzshop" not in filtered + assert "inventory" not in filtered + + +@pytest.mark.unit +@pytest.mark.service +class TestModuleServiceEnableDisable: + """Test module enable/disable operations.""" + + def test_enable_module(self, db, test_platform): + """Test enabling a module.""" + test_platform.settings = {"enabled_modules": ["billing"]} + db.commit() + + service = ModuleService() + result = service.enable_module(db, test_platform.id, "analytics") + db.commit() + + assert result is True + assert service.is_module_enabled(db, test_platform.id, "analytics") + + def test_disable_module(self, db, test_platform): + """Test disabling a module.""" + test_platform.settings = {"enabled_modules": ["billing", "analytics"]} + db.commit() + + service = ModuleService() + result = service.disable_module(db, test_platform.id, "analytics") + db.commit() + + assert result is True + assert not service.is_module_enabled(db, test_platform.id, "analytics") + assert service.is_module_enabled(db, test_platform.id, "billing") + + def test_cannot_disable_core_module(self, db, test_platform): + """Test that core modules cannot be disabled.""" + service = ModuleService() + result = service.disable_module(db, test_platform.id, "core") + + assert result is False + assert service.is_module_enabled(db, test_platform.id, "core") + + def test_disable_module_cascades_to_dependents(self, db, test_platform): + """Test that disabling a module also disables its dependents.""" + # Enable marketplace (which requires inventory) + test_platform.settings = {"enabled_modules": ["marketplace", "inventory"]} + db.commit() + + service = ModuleService() + # Disable inventory - should also disable marketplace + result = service.disable_module(db, test_platform.id, "inventory") + db.commit() + + assert result is True + assert not service.is_module_enabled(db, test_platform.id, "inventory") + assert not service.is_module_enabled(db, test_platform.id, "marketplace") + + def test_set_enabled_modules(self, db, test_platform): + """Test setting all enabled modules at once.""" + service = ModuleService() + result = service.set_enabled_modules( + db, test_platform.id, ["billing", "inventory", "orders"] + ) + db.commit() + + assert result is True + module_codes = service.get_enabled_module_codes(db, test_platform.id) + + # Should have core + specified modules + assert "core" in module_codes + assert "platform-admin" in module_codes + assert "billing" in module_codes + assert "inventory" in module_codes + assert "orders" in module_codes + assert "marketplace" not in module_codes + + def test_invalid_module_code_ignored(self, db, test_platform): + """Test that invalid module codes are ignored.""" + service = ModuleService() + result = service.set_enabled_modules( + db, test_platform.id, ["billing", "invalid_module", "inventory"] + ) + db.commit() + + assert result is True + module_codes = service.get_enabled_module_codes(db, test_platform.id) + + assert "billing" in module_codes + assert "inventory" in module_codes + assert "invalid_module" not in module_codes + + +@pytest.mark.unit +@pytest.mark.service +class TestModuleServiceByCode: + """Test ModuleService methods that work with platform code.""" + + def test_get_platform_modules_by_code(self, db, test_platform): + """Test getting modules by platform code.""" + service = ModuleService() + modules = service.get_platform_modules_by_code(db, test_platform.code) + + # Should return all modules for platform without config + assert len(modules) == len(MODULES) + + def test_is_module_enabled_by_code(self, db, test_platform): + """Test checking module enablement by platform code.""" + test_platform.settings = {"enabled_modules": ["billing"]} + db.commit() + + service = ModuleService() + + assert service.is_module_enabled_by_code(db, test_platform.code, "billing") + assert service.is_module_enabled_by_code(db, test_platform.code, "core") + assert not service.is_module_enabled_by_code(db, test_platform.code, "marketplace") + + def test_nonexistent_platform_code_returns_all(self, db): + """Test that nonexistent platform code returns all modules.""" + service = ModuleService() + modules = service.get_platform_modules_by_code(db, "nonexistent_platform") + + # Should return all modules as fallback + assert len(modules) == len(MODULES)