feat(dev_tools): add diagnostics hub with permissions audit tool
Evolve the platform-debug page into a diagnostics hub with sidebar explorer layout. Add permissions audit API that introspects all registered page routes and reports auth/permission enforcement status. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -7,6 +7,9 @@ Aggregates all admin API routers for the dev-tools module.
|
||||
|
||||
from fastapi import APIRouter
|
||||
|
||||
from app.modules.dev_tools.routes.api.admin_diagnostics import (
|
||||
router as diagnostics_router,
|
||||
)
|
||||
from app.modules.dev_tools.routes.api.admin_platform_debug import (
|
||||
router as platform_debug_router,
|
||||
)
|
||||
@@ -19,3 +22,4 @@ router = APIRouter()
|
||||
router.include_router(sql_query_router, tags=["sql-query"])
|
||||
router.include_router(platform_debug_router, tags=["platform-debug"])
|
||||
router.include_router(translations_router, tags=["translations"])
|
||||
router.include_router(diagnostics_router, tags=["diagnostics"])
|
||||
|
||||
289
app/modules/dev_tools/routes/api/admin_diagnostics.py
Normal file
289
app/modules/dev_tools/routes/api/admin_diagnostics.py
Normal file
@@ -0,0 +1,289 @@
|
||||
# app/modules/dev_tools/routes/api/admin_diagnostics.py
|
||||
"""
|
||||
Diagnostics API endpoints.
|
||||
|
||||
Provides route introspection and permissions audit capabilities
|
||||
for the Diagnostics Hub admin page.
|
||||
"""
|
||||
|
||||
import inspect
|
||||
import logging
|
||||
|
||||
from fastapi import APIRouter, Depends, Request
|
||||
from fastapi.routing import APIRoute
|
||||
from pydantic import BaseModel
|
||||
|
||||
from app.api.deps import get_current_super_admin_api
|
||||
from app.modules.tenancy.schemas.auth import UserContext
|
||||
|
||||
router = APIRouter(prefix="/diagnostics")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ── Response schemas ──
|
||||
|
||||
|
||||
class RouteAuditEntry(BaseModel):
|
||||
path: str
|
||||
methods: list[str]
|
||||
module: str | None
|
||||
frontend: str | None
|
||||
endpoint_name: str
|
||||
auth_dependency: str | None
|
||||
auth_detail: str | None
|
||||
status: str # "ok", "warning", "error"
|
||||
issue: str | None
|
||||
|
||||
|
||||
class FrontendSummary(BaseModel):
|
||||
total: int = 0
|
||||
ok: int = 0
|
||||
warnings: int = 0
|
||||
errors: int = 0
|
||||
|
||||
|
||||
class AuditSummary(BaseModel):
|
||||
total: int = 0
|
||||
ok: int = 0
|
||||
warnings: int = 0
|
||||
errors: int = 0
|
||||
by_frontend: dict[str, FrontendSummary] = {}
|
||||
|
||||
|
||||
class PermissionsAuditResponse(BaseModel):
|
||||
routes: list[RouteAuditEntry]
|
||||
summary: AuditSummary
|
||||
|
||||
|
||||
# ── Auth dependency names for classification ──
|
||||
|
||||
_OK_DEPENDENCIES = {
|
||||
"require_menu_access",
|
||||
"require_store_page_permission",
|
||||
"get_current_super_admin",
|
||||
"get_current_super_admin_api",
|
||||
}
|
||||
|
||||
_ADMIN_AUTH_DEPENDENCIES = {
|
||||
"get_current_admin_from_cookie_or_header",
|
||||
}
|
||||
|
||||
_STORE_AUTH_DEPENDENCIES = {
|
||||
"get_current_store_from_cookie_or_header",
|
||||
}
|
||||
|
||||
_MERCHANT_AUTH_DEPENDENCIES = {
|
||||
"get_current_merchant_from_cookie_or_header",
|
||||
}
|
||||
|
||||
|
||||
def _classify_frontend(path: str) -> str | None:
|
||||
"""Classify frontend type from route path prefix."""
|
||||
if path.startswith("/admin/"):
|
||||
return "admin"
|
||||
if path.startswith(("/store/", "/store/{")):
|
||||
return "store"
|
||||
if path.startswith(("/merchants/", "/merchants/{")):
|
||||
return "merchant"
|
||||
if path.startswith(("/storefront/", "/storefront/{")):
|
||||
return "storefront"
|
||||
return None
|
||||
|
||||
|
||||
def _infer_module(path: str) -> str | None:
|
||||
"""Infer module name from path segments."""
|
||||
# Strip frontend prefix to get module segment
|
||||
# e.g. /admin/loyalty/programs → loyalty
|
||||
# e.g. /store/{store_code}/loyalty/terminal → loyalty
|
||||
parts = path.strip("/").split("/")
|
||||
if len(parts) < 2:
|
||||
return None
|
||||
|
||||
# Skip the frontend prefix and optional path params
|
||||
start = 1
|
||||
# For store/merchant with {store_code}/{merchant_code} param
|
||||
if len(parts) > 2 and parts[1].startswith("{"):
|
||||
start = 2
|
||||
|
||||
if start < len(parts):
|
||||
candidate = parts[start]
|
||||
# Skip common non-module segments
|
||||
if candidate not in {"dashboard", "settings", "profile", "login", "logout", "api"}:
|
||||
return candidate
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _extract_auth_dependencies(endpoint) -> list[dict]:
|
||||
"""
|
||||
Inspect endpoint function signature to find Depends() parameters
|
||||
and extract auth-related dependency information.
|
||||
"""
|
||||
results = []
|
||||
try:
|
||||
sig = inspect.signature(endpoint)
|
||||
except (ValueError, TypeError):
|
||||
return results
|
||||
|
||||
for _param_name, param in sig.parameters.items():
|
||||
default = param.default
|
||||
if default is inspect.Parameter.empty:
|
||||
continue
|
||||
|
||||
# Check for Depends() instances
|
||||
dep_callable = None
|
||||
if hasattr(default, "dependency"):
|
||||
dep_callable = default.dependency
|
||||
elif callable(default) and not isinstance(default, type):
|
||||
continue # Skip plain callables that aren't Depends
|
||||
|
||||
if dep_callable is None:
|
||||
continue
|
||||
|
||||
dep_name = None
|
||||
dep_detail = None
|
||||
|
||||
# Direct function reference
|
||||
if callable(dep_callable) and hasattr(dep_callable, "__name__"):
|
||||
dep_name = dep_callable.__name__
|
||||
|
||||
# functools.partial (e.g. require_menu_access("id", FrontendType.ADMIN))
|
||||
if hasattr(dep_callable, "func"):
|
||||
dep_name = getattr(dep_callable.func, "__name__", str(dep_callable.func))
|
||||
# Extract args for detail
|
||||
if dep_callable.args:
|
||||
dep_detail = str(dep_callable.args[0])
|
||||
|
||||
# Closure / nested function from factory
|
||||
if dep_name is None and callable(dep_callable):
|
||||
# Try qualname for closures from factories
|
||||
qualname = getattr(dep_callable, "__qualname__", "")
|
||||
if "." in qualname:
|
||||
# e.g. "require_menu_access.<locals>.dependency"
|
||||
factory_name = qualname.split(".")[0]
|
||||
dep_name = factory_name
|
||||
# Try to extract closure variables for detail
|
||||
if hasattr(dep_callable, "__closure__") and dep_callable.__closure__:
|
||||
for cell in dep_callable.__closure__:
|
||||
try:
|
||||
val = cell.cell_contents
|
||||
if isinstance(val, str) and val not in ("admin", "store", "merchant"):
|
||||
dep_detail = val
|
||||
break
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
if dep_name:
|
||||
results.append({"name": dep_name, "detail": dep_detail})
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def _classify_route(path: str, frontend: str | None, auth_deps: list[dict]) -> tuple[str, str | None, str | None, str | None]:
|
||||
"""
|
||||
Classify a route's auth status.
|
||||
|
||||
Returns: (status, auth_dependency, auth_detail, issue)
|
||||
"""
|
||||
if not auth_deps:
|
||||
return "error", None, None, "No authentication dependency found"
|
||||
|
||||
# Check each dependency against classification rules
|
||||
for dep in auth_deps:
|
||||
name = dep["name"]
|
||||
detail = dep["detail"]
|
||||
|
||||
if name in _OK_DEPENDENCIES:
|
||||
return "ok", name, detail, None
|
||||
|
||||
if name in _ADMIN_AUTH_DEPENDENCIES and frontend == "admin":
|
||||
return "ok", name, detail, None
|
||||
|
||||
# If we get here, check for auth-only deps (warnings)
|
||||
for dep in auth_deps:
|
||||
name = dep["name"]
|
||||
detail = dep["detail"]
|
||||
|
||||
if name in _STORE_AUTH_DEPENDENCIES and frontend == "store":
|
||||
return "warning", name, detail, "Authentication only \u2014 no menu access or permission check"
|
||||
|
||||
if name in _MERCHANT_AUTH_DEPENDENCIES and frontend == "merchant":
|
||||
return "warning", name, detail, "Authentication only \u2014 no menu access check"
|
||||
|
||||
if name in _ADMIN_AUTH_DEPENDENCIES:
|
||||
return "warning", name, detail, "Admin auth on non-admin route"
|
||||
|
||||
# Has some dependency but not recognized
|
||||
first = auth_deps[0]
|
||||
return "warning", first["name"], first["detail"], f"Unrecognized auth dependency: {first['name']}"
|
||||
|
||||
|
||||
@router.get("/permissions-audit", response_model=PermissionsAuditResponse)
|
||||
def permissions_audit(
|
||||
request: Request,
|
||||
current_user: UserContext = Depends(get_current_super_admin_api),
|
||||
):
|
||||
"""
|
||||
Introspect all registered page routes and audit their auth/permission enforcement.
|
||||
|
||||
Examines each route's endpoint function signature to find Depends() parameters
|
||||
and classify the level of authentication and authorization applied.
|
||||
"""
|
||||
audit_routes: list[RouteAuditEntry] = []
|
||||
|
||||
for route in request.app.routes:
|
||||
if not isinstance(route, APIRoute):
|
||||
continue
|
||||
|
||||
# Filter to page routes only (include_in_schema=False signals HTML page routes)
|
||||
if route.include_in_schema:
|
||||
continue
|
||||
|
||||
path = route.path
|
||||
methods = sorted(route.methods - {"HEAD", "OPTIONS"}) if route.methods else []
|
||||
frontend = _classify_frontend(path)
|
||||
module = _infer_module(path)
|
||||
endpoint_name = getattr(route.endpoint, "__name__", str(route.endpoint))
|
||||
|
||||
# Extract and classify auth dependencies
|
||||
auth_deps = _extract_auth_dependencies(route.endpoint)
|
||||
status, auth_dep, auth_detail, issue = _classify_route(path, frontend, auth_deps)
|
||||
|
||||
audit_routes.append(RouteAuditEntry(
|
||||
path=path,
|
||||
methods=methods,
|
||||
module=module,
|
||||
frontend=frontend,
|
||||
endpoint_name=endpoint_name,
|
||||
auth_dependency=auth_dep,
|
||||
auth_detail=auth_detail,
|
||||
status=status,
|
||||
issue=issue,
|
||||
))
|
||||
|
||||
# Sort by frontend, then path
|
||||
audit_routes.sort(key=lambda r: (r.frontend or "", r.path))
|
||||
|
||||
# Build summary
|
||||
summary = AuditSummary(
|
||||
total=len(audit_routes),
|
||||
ok=sum(1 for r in audit_routes if r.status == "ok"),
|
||||
warnings=sum(1 for r in audit_routes if r.status == "warning"),
|
||||
errors=sum(1 for r in audit_routes if r.status == "error"),
|
||||
by_frontend={},
|
||||
)
|
||||
|
||||
for r in audit_routes:
|
||||
fe = r.frontend or "other"
|
||||
if fe not in summary.by_frontend:
|
||||
summary.by_frontend[fe] = FrontendSummary()
|
||||
fe_summary = summary.by_frontend[fe]
|
||||
fe_summary.total += 1
|
||||
if r.status == "ok":
|
||||
fe_summary.ok += 1
|
||||
elif r.status == "warning":
|
||||
fe_summary.warnings += 1
|
||||
elif r.status == "error":
|
||||
fe_summary.errors += 1
|
||||
|
||||
return PermissionsAuditResponse(routes=audit_routes, summary=summary)
|
||||
Reference in New Issue
Block a user