Evolve the platform-debug page into a diagnostics hub with sidebar explorer layout. Add permissions audit API that introspects all registered page routes and reports auth/permission enforcement status. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
290 lines
9.1 KiB
Python
290 lines
9.1 KiB
Python
# app/modules/dev_tools/routes/api/admin_diagnostics.py
|
|
"""
|
|
Diagnostics API endpoints.
|
|
|
|
Provides route introspection and permissions audit capabilities
|
|
for the Diagnostics Hub admin page.
|
|
"""
|
|
|
|
import inspect
|
|
import logging
|
|
|
|
from fastapi import APIRouter, Depends, Request
|
|
from fastapi.routing import APIRoute
|
|
from pydantic import BaseModel
|
|
|
|
from app.api.deps import get_current_super_admin_api
|
|
from app.modules.tenancy.schemas.auth import UserContext
|
|
|
|
# Router that collects the diagnostics endpoints; it is created with the
# "/diagnostics" path prefix.
router = APIRouter(prefix="/diagnostics")
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ── Response schemas ──
|
|
|
|
|
|
class RouteAuditEntry(BaseModel):
    # One row of the permissions audit: a single registered route together
    # with the outcome of classifying its auth dependencies.

    # Route path template and the HTTP methods reported for it.
    path: str
    methods: list[str]

    # Values derived from the path; None when nothing could be derived.
    module: str | None
    frontend: str | None

    # Name of the endpoint callable behind the route.
    endpoint_name: str

    # Dependency that decided the classification and its optional detail;
    # both are None when no dependency was found.
    auth_dependency: str | None
    auth_detail: str | None

    # Classification result: "ok", "warning" or "error".
    status: str

    # Human-readable explanation; None for routes classified "ok".
    issue: str | None
|
|
|
|
|
|
class FrontendSummary(BaseModel):
    # Per-frontend tally of audited routes, split by classification status.
    # Every counter starts at zero and is incremented while the audit
    # summary is built.

    total: int = 0
    ok: int = 0
    warnings: int = 0
    errors: int = 0
|
|
|
|
|
|
class AuditSummary(BaseModel):
    # Overall tally of audited routes plus a breakdown per frontend.

    # Counters across all audited routes.
    total: int = 0
    ok: int = 0
    warnings: int = 0
    errors: int = 0

    # Frontend label -> counters for that frontend. The audit endpoint files
    # routes without a recognised frontend under the "other" key.
    by_frontend: dict[str, FrontendSummary] = {}
|
|
|
|
|
|
class PermissionsAuditResponse(BaseModel):
    # Response body of the permissions audit endpoint.

    # One entry per audited route.
    routes: list[RouteAuditEntry]

    # Aggregated counters for the entries in `routes`.
    summary: AuditSummary
|
|
|
|
|
|
# ── Auth dependency names for classification ──
|
|
|
|
_OK_DEPENDENCIES = {
|
|
"require_menu_access",
|
|
"require_store_page_permission",
|
|
"get_current_super_admin",
|
|
"get_current_super_admin_api",
|
|
}
|
|
|
|
_ADMIN_AUTH_DEPENDENCIES = {
|
|
"get_current_admin_from_cookie_or_header",
|
|
}
|
|
|
|
_STORE_AUTH_DEPENDENCIES = {
|
|
"get_current_store_from_cookie_or_header",
|
|
}
|
|
|
|
_MERCHANT_AUTH_DEPENDENCIES = {
|
|
"get_current_merchant_from_cookie_or_header",
|
|
}
|
|
|
|
|
|
def _classify_frontend(path: str) -> str | None:
|
|
"""Classify frontend type from route path prefix."""
|
|
if path.startswith("/admin/"):
|
|
return "admin"
|
|
if path.startswith(("/store/", "/store/{")):
|
|
return "store"
|
|
if path.startswith(("/merchants/", "/merchants/{")):
|
|
return "merchant"
|
|
if path.startswith(("/storefront/", "/storefront/{")):
|
|
return "storefront"
|
|
return None
|
|
|
|
|
|
def _infer_module(path: str) -> str | None:
|
|
"""Infer module name from path segments."""
|
|
# Strip frontend prefix to get module segment
|
|
# e.g. /admin/loyalty/programs → loyalty
|
|
# e.g. /store/{store_code}/loyalty/terminal → loyalty
|
|
parts = path.strip("/").split("/")
|
|
if len(parts) < 2:
|
|
return None
|
|
|
|
# Skip the frontend prefix and optional path params
|
|
start = 1
|
|
# For store/merchant with {store_code}/{merchant_code} param
|
|
if len(parts) > 2 and parts[1].startswith("{"):
|
|
start = 2
|
|
|
|
if start < len(parts):
|
|
candidate = parts[start]
|
|
# Skip common non-module segments
|
|
if candidate not in {"dashboard", "settings", "profile", "login", "logout", "api"}:
|
|
return candidate
|
|
|
|
return None
|
|
|
|
|
|
def _extract_auth_dependencies(endpoint) -> list[dict]:
|
|
"""
|
|
Inspect endpoint function signature to find Depends() parameters
|
|
and extract auth-related dependency information.
|
|
"""
|
|
results = []
|
|
try:
|
|
sig = inspect.signature(endpoint)
|
|
except (ValueError, TypeError):
|
|
return results
|
|
|
|
for _param_name, param in sig.parameters.items():
|
|
default = param.default
|
|
if default is inspect.Parameter.empty:
|
|
continue
|
|
|
|
# Check for Depends() instances
|
|
dep_callable = None
|
|
if hasattr(default, "dependency"):
|
|
dep_callable = default.dependency
|
|
elif callable(default) and not isinstance(default, type):
|
|
continue # Skip plain callables that aren't Depends
|
|
|
|
if dep_callable is None:
|
|
continue
|
|
|
|
dep_name = None
|
|
dep_detail = None
|
|
|
|
# Direct function reference
|
|
if callable(dep_callable) and hasattr(dep_callable, "__name__"):
|
|
dep_name = dep_callable.__name__
|
|
|
|
# functools.partial (e.g. require_menu_access("id", FrontendType.ADMIN))
|
|
if hasattr(dep_callable, "func"):
|
|
dep_name = getattr(dep_callable.func, "__name__", str(dep_callable.func))
|
|
# Extract args for detail
|
|
if dep_callable.args:
|
|
dep_detail = str(dep_callable.args[0])
|
|
|
|
# Closure / nested function from factory
|
|
if dep_name is None and callable(dep_callable):
|
|
# Try qualname for closures from factories
|
|
qualname = getattr(dep_callable, "__qualname__", "")
|
|
if "." in qualname:
|
|
# e.g. "require_menu_access.<locals>.dependency"
|
|
factory_name = qualname.split(".")[0]
|
|
dep_name = factory_name
|
|
# Try to extract closure variables for detail
|
|
if hasattr(dep_callable, "__closure__") and dep_callable.__closure__:
|
|
for cell in dep_callable.__closure__:
|
|
try:
|
|
val = cell.cell_contents
|
|
if isinstance(val, str) and val not in ("admin", "store", "merchant"):
|
|
dep_detail = val
|
|
break
|
|
except ValueError:
|
|
continue
|
|
|
|
if dep_name:
|
|
results.append({"name": dep_name, "detail": dep_detail})
|
|
|
|
return results
|
|
|
|
|
|
def _classify_route(path: str, frontend: str | None, auth_deps: list[dict]) -> tuple[str, str | None, str | None, str | None]:
|
|
"""
|
|
Classify a route's auth status.
|
|
|
|
Returns: (status, auth_dependency, auth_detail, issue)
|
|
"""
|
|
if not auth_deps:
|
|
return "error", None, None, "No authentication dependency found"
|
|
|
|
# Check each dependency against classification rules
|
|
for dep in auth_deps:
|
|
name = dep["name"]
|
|
detail = dep["detail"]
|
|
|
|
if name in _OK_DEPENDENCIES:
|
|
return "ok", name, detail, None
|
|
|
|
if name in _ADMIN_AUTH_DEPENDENCIES and frontend == "admin":
|
|
return "ok", name, detail, None
|
|
|
|
# If we get here, check for auth-only deps (warnings)
|
|
for dep in auth_deps:
|
|
name = dep["name"]
|
|
detail = dep["detail"]
|
|
|
|
if name in _STORE_AUTH_DEPENDENCIES and frontend == "store":
|
|
return "warning", name, detail, "Authentication only \u2014 no menu access or permission check"
|
|
|
|
if name in _MERCHANT_AUTH_DEPENDENCIES and frontend == "merchant":
|
|
return "warning", name, detail, "Authentication only \u2014 no menu access check"
|
|
|
|
if name in _ADMIN_AUTH_DEPENDENCIES:
|
|
return "warning", name, detail, "Admin auth on non-admin route"
|
|
|
|
# Has some dependency but not recognized
|
|
first = auth_deps[0]
|
|
return "warning", first["name"], first["detail"], f"Unrecognized auth dependency: {first['name']}"
|
|
|
|
|
|
@router.get("/permissions-audit", response_model=PermissionsAuditResponse)
def permissions_audit(
    request: Request,
    current_user: UserContext = Depends(get_current_super_admin_api),
):
    """
    Audit the auth/permission enforcement of every registered page route.

    Walks the application's routes, inspects each endpoint signature for
    Depends() parameters, classifies the protection level they provide and
    returns the per-route entries together with aggregated counters.
    """
    skipped_methods = {"HEAD", "OPTIONS"}
    entries: list[RouteAuditEntry] = []

    for route in request.app.routes:
        # Only APIRoute objects are audited, and only page routes among them
        # (include_in_schema=False signals HTML page routes).
        if not isinstance(route, APIRoute) or route.include_in_schema:
            continue

        route_path = route.path
        frontend = _classify_frontend(route_path)
        status, auth_dep, auth_detail, issue = _classify_route(
            route_path,
            frontend,
            _extract_auth_dependencies(route.endpoint),
        )

        entries.append(
            RouteAuditEntry(
                path=route_path,
                methods=sorted(route.methods - skipped_methods) if route.methods else [],
                module=_infer_module(route_path),
                frontend=frontend,
                endpoint_name=getattr(route.endpoint, "__name__", str(route.endpoint)),
                auth_dependency=auth_dep,
                auth_detail=auth_detail,
                status=status,
                issue=issue,
            )
        )

    # Order by frontend (routes without one first), then by path.
    entries.sort(key=lambda entry: (entry.frontend or "", entry.path))

    # Tally overall and per-frontend counters in a single pass.
    counter_for_status = {"ok": "ok", "warning": "warnings", "error": "errors"}
    overall = AuditSummary(total=len(entries), by_frontend={})
    for entry in entries:
        label = entry.frontend or "other"
        if label not in overall.by_frontend:
            overall.by_frontend[label] = FrontendSummary()
        bucket = overall.by_frontend[label]
        bucket.total += 1

        counter = counter_for_status.get(entry.status)
        if counter is None:
            continue
        for tally in (overall, bucket):
            setattr(tally, counter, getattr(tally, counter) + 1)

    return PermissionsAuditResponse(routes=entries, summary=overall)
|