Files
orion/app/modules/dev_tools/services/isolation_audit_service.py
Samir Boulahtit 07fab01f6a feat(dev_tools): add tenant isolation audit to diagnostics page
Add a new "Tenant Isolation" diagnostic tool that scans all stores and
reports where configuration values come from — flagging silent inheritance,
missing data, and potential data commingling. Also fix merchant dashboard
and onboarding integration tests that were missing require_platform
dependency override.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-15 16:53:21 +01:00

370 lines
12 KiB
Python

# app/modules/dev_tools/services/isolation_audit_service.py
"""
Tenant Isolation Audit Service
Scans all stores and reports where configuration values come from —
flagging silent inheritance, missing data, and potential data commingling
in the multi-tenant system.
"""
import logging
from sqlalchemy.orm import Session, joinedload
logger = logging.getLogger(__name__)
def run_isolation_audit(
db: Session,
store_code: str | None = None,
risk_filter: str | None = None,
) -> dict:
"""
Audit tenant isolation across all (or one) store(s).
Args:
db: Database session
store_code: Optional filter to a single store
risk_filter: Optional 'critical', 'high', or 'medium'
Returns:
dict with total_stores, stores_with_findings, summary, results
"""
from app.modules.billing.models.merchant_subscription import MerchantSubscription
from app.modules.tenancy.models import Store
from app.modules.tenancy.models.admin import AdminSetting
from app.modules.tenancy.models.merchant_domain import MerchantDomain
# ── Pre-fetch stores with eager-loaded relationships ──
query = (
db.query(Store)
.options(
joinedload(Store.merchant),
joinedload(Store.store_platforms),
joinedload(Store.domains),
joinedload(Store.store_theme),
)
.filter(Store.is_active.is_(True))
)
if store_code:
query = query.filter(Store.store_code == store_code)
stores = query.all()
# ── Batch-load MerchantSubscription indexed by (merchant_id, platform_id) ──
all_subs = db.query(MerchantSubscription).all()
subs_map: dict[tuple[int, int], MerchantSubscription] = {}
for sub in all_subs:
subs_map[(sub.merchant_id, sub.platform_id)] = sub
# ── Batch-load MerchantDomain indexed by merchant_id ──
all_merchant_domains = db.query(MerchantDomain).all()
merchant_domains_map: dict[int, list[MerchantDomain]] = {}
for md in all_merchant_domains:
merchant_domains_map.setdefault(md.merchant_id, []).append(md)
# ── Pre-fetch default_storefront_locale from AdminSetting ──
default_locale_setting = (
db.query(AdminSetting)
.filter(AdminSetting.key == "default_storefront_locale")
.first()
)
default_locale = default_locale_setting.value if default_locale_setting else None
# ── Run checkers per store ──
results = []
summary = {"critical": 0, "high": 0, "medium": 0}
for store in stores:
merchant_domains = merchant_domains_map.get(store.merchant_id, [])
findings: list[dict] = []
findings.extend(_check_contact_inheritance(store))
findings.extend(_check_domain_resolution(store, merchant_domains))
findings.extend(_check_subscription_coverage(store, subs_map))
findings.extend(_check_merchant_active(store))
findings.extend(_check_merchant_domain_primary(merchant_domains))
findings.extend(_check_theme_fallback(store))
findings.extend(_check_locale_fallback(store, default_locale))
findings.extend(_check_language_config(store))
# Apply risk filter
if risk_filter:
findings = [f for f in findings if f["risk"] == risk_filter]
# Count by risk
finding_counts = {"critical": 0, "high": 0, "medium": 0}
for f in findings:
finding_counts[f["risk"]] = finding_counts.get(f["risk"], 0) + 1
summary[f["risk"]] = summary.get(f["risk"], 0) + 1
results.append({
"store_code": store.store_code,
"store_name": store.name,
"merchant_name": store.merchant.name if store.merchant else None,
"finding_counts": finding_counts,
"findings": findings,
})
stores_with_findings = sum(1 for r in results if r["findings"])
return {
"total_stores": len(results),
"stores_with_findings": stores_with_findings,
"summary": summary,
"results": results,
}
# ── Checker functions ──
def _finding(
check: str,
check_label: str,
risk: str,
resolved_value: str | None,
source: str,
source_label: str,
is_explicit: bool,
note: str = "",
) -> dict:
return {
"check": check,
"check_label": check_label,
"risk": risk,
"resolved_value": resolved_value,
"source": source,
"source_label": source_label,
"is_explicit": is_explicit,
"note": note,
}
_CONTACT_FIELDS = [
("contact_email", "Contact Email"),
("contact_phone", "Contact Phone"),
("website", "Website"),
("business_address", "Business Address"),
("tax_number", "Tax Number"),
]
def _check_contact_inheritance(store) -> list[dict]:
"""Check 5 contact fields: store value → merchant fallback → none."""
findings = []
merchant = store.merchant
for field, label in _CONTACT_FIELDS:
store_val = getattr(store, field, None)
merchant_val = getattr(merchant, field, None) if merchant else None
if store_val:
# Explicitly set on store — no finding
continue
if merchant_val:
findings.append(_finding(
check=field,
check_label=label,
risk="critical",
resolved_value=str(merchant_val),
source="merchant",
source_label=f"Inherited from merchant '{merchant.name}'",
is_explicit=False,
note="Store has no own value; silently falls back to merchant",
))
else:
findings.append(_finding(
check=field,
check_label=label,
risk="critical",
resolved_value=None,
source="none",
source_label="No value anywhere",
is_explicit=False,
note="Neither store nor merchant has this value set",
))
return findings
def _check_domain_resolution(store, merchant_domains: list) -> list[dict]:
"""Check effective_domain: StoreDomain → MerchantDomain → subdomain fallback."""
findings = []
# Check if store has its own active domain
store_domains = [d for d in (store.domains or []) if d.is_active]
active_merchant_domains = [d for d in merchant_domains if d.is_active]
if store_domains:
# Store has its own domain — explicit, no finding
return findings
if active_merchant_domains:
primary = next((d for d in active_merchant_domains if d.is_primary), None)
domain_val = primary.domain if primary else active_merchant_domains[0].domain
findings.append(_finding(
check="effective_domain",
check_label="Effective Domain",
risk="critical",
resolved_value=domain_val,
source="merchant",
source_label="Using merchant domain (shared with other stores)",
is_explicit=False,
note="No store-specific domain; merchant domain may serve wrong store",
))
else:
findings.append(_finding(
check="effective_domain",
check_label="Effective Domain",
risk="critical",
resolved_value=f"{store.subdomain}.* (subdomain fallback)",
source="hardcoded",
source_label="Subdomain fallback only",
is_explicit=False,
note="No custom domain at store or merchant level",
))
return findings
def _check_subscription_coverage(store, subs_map: dict) -> list[dict]:
"""For each StorePlatform membership, verify a MerchantSubscription exists and is active."""
findings = []
for sp in (store.store_platforms or []):
if not sp.is_active:
continue
key = (store.merchant_id, sp.platform_id)
sub = subs_map.get(key)
if not sub:
findings.append(_finding(
check=f"subscription_{sp.platform_id}",
check_label=f"Subscription (platform {sp.platform_id})",
risk="critical",
resolved_value=None,
source="none",
source_label="No subscription found",
is_explicit=False,
note=f"Store active on platform {sp.platform_id} but merchant has no subscription",
))
elif not sub.is_active:
findings.append(_finding(
check=f"subscription_{sp.platform_id}",
check_label=f"Subscription (platform {sp.platform_id})",
risk="critical",
resolved_value=sub.status,
source="merchant",
source_label=f"Subscription exists but status='{sub.status}'",
is_explicit=False,
note="Store active on platform but subscription is not active",
))
return findings
def _check_merchant_active(store) -> list[dict]:
"""Merchant inactive but store still active."""
merchant = store.merchant
if not merchant or merchant.is_active:
return []
return [_finding(
check="merchant_active",
check_label="Merchant Active",
risk="high",
resolved_value=str(merchant.is_active),
source="merchant",
source_label=f"Merchant '{merchant.name}' is inactive",
is_explicit=True,
note="Store is active but its parent merchant is inactive",
)]
def _check_merchant_domain_primary(merchant_domains: list) -> list[dict]:
"""Multiple MerchantDomains marked primary — non-deterministic resolution."""
primary_domains = [d for d in merchant_domains if d.is_primary and d.is_active]
if len(primary_domains) <= 1:
return []
domain_list = ", ".join(d.domain for d in primary_domains)
return [_finding(
check="merchant_domain_primary",
check_label="Merchant Primary Domain",
risk="high",
resolved_value=domain_list,
source="merchant",
source_label=f"{len(primary_domains)} domains marked as primary",
is_explicit=True,
note="Multiple primary domains cause non-deterministic resolution",
)]
def _check_theme_fallback(store) -> list[dict]:
"""No active store_theme — falls back to hardcoded default."""
theme = store.store_theme
if theme and theme.is_active:
return []
return [_finding(
check="store_theme",
check_label="Store Theme",
risk="medium",
resolved_value="default (hardcoded)",
source="hardcoded",
source_label="No active theme configured",
is_explicit=False,
note="Falls back to hardcoded default theme" if not theme else "Theme exists but is inactive",
)]
def _check_locale_fallback(store, default_locale: str | None) -> list[dict]:
"""storefront_locale is NULL — falls back to platform default or 'fr-LU'."""
if store.storefront_locale:
return []
fallback = default_locale or "fr-LU"
source = "platform_default" if default_locale else "hardcoded"
source_label = (
f"Platform default: '{default_locale}'"
if default_locale
else "Hardcoded fallback 'fr-LU'"
)
return [_finding(
check="storefront_locale",
check_label="Storefront Locale",
risk="medium",
resolved_value=fallback,
source=source,
source_label=source_label,
is_explicit=False,
note="storefront_locale is NULL; using fallback",
)]
_DEFAULT_LANGUAGES = ["fr", "de", "en", "lb"]
def _check_language_config(store) -> list[dict]:
"""storefront_languages matches column default — may never have been explicitly set."""
languages = store.storefront_languages
if languages and languages != _DEFAULT_LANGUAGES:
return []
return [_finding(
check="storefront_languages",
check_label="Storefront Languages",
risk="medium",
resolved_value=str(languages or _DEFAULT_LANGUAGES),
source="hardcoded",
source_label="Column default value",
is_explicit=False,
note="Matches column default — may never have been explicitly configured",
)]