# app/modules/dev_tools/services/isolation_audit_service.py """ Tenant Isolation Audit Service Scans all stores and reports where configuration values come from — flagging silent inheritance, missing data, and potential data commingling in the multi-tenant system. """ import logging from sqlalchemy.orm import Session, joinedload logger = logging.getLogger(__name__) def run_isolation_audit( db: Session, store_code: str | None = None, risk_filter: str | None = None, ) -> dict: """ Audit tenant isolation across all (or one) store(s). Args: db: Database session store_code: Optional filter to a single store risk_filter: Optional 'critical', 'high', or 'medium' Returns: dict with total_stores, stores_with_findings, summary, results """ from app.modules.billing.models.merchant_subscription import MerchantSubscription from app.modules.tenancy.models import Store from app.modules.tenancy.models.admin import AdminSetting from app.modules.tenancy.models.merchant_domain import MerchantDomain # ── Pre-fetch stores with eager-loaded relationships ── query = ( db.query(Store) .options( joinedload(Store.merchant), joinedload(Store.store_platforms), joinedload(Store.domains), joinedload(Store.store_theme), ) .filter(Store.is_active.is_(True)) ) if store_code: query = query.filter(Store.store_code == store_code) stores = query.all() # ── Batch-load MerchantSubscription indexed by (merchant_id, platform_id) ── all_subs = db.query(MerchantSubscription).all() subs_map: dict[tuple[int, int], MerchantSubscription] = {} for sub in all_subs: subs_map[(sub.merchant_id, sub.platform_id)] = sub # ── Batch-load MerchantDomain indexed by merchant_id ── all_merchant_domains = db.query(MerchantDomain).all() merchant_domains_map: dict[int, list[MerchantDomain]] = {} for md in all_merchant_domains: merchant_domains_map.setdefault(md.merchant_id, []).append(md) # ── Pre-fetch default_storefront_locale from AdminSetting ── default_locale_setting = ( db.query(AdminSetting) .filter(AdminSetting.key == "default_storefront_locale") .first() ) default_locale = default_locale_setting.value if default_locale_setting else None # ── Run checkers per store ── results = [] summary = {"critical": 0, "high": 0, "medium": 0} for store in stores: merchant_domains = merchant_domains_map.get(store.merchant_id, []) findings: list[dict] = [] findings.extend(_check_contact_inheritance(store)) findings.extend(_check_domain_resolution(store, merchant_domains)) findings.extend(_check_subscription_coverage(store, subs_map)) findings.extend(_check_merchant_active(store)) findings.extend(_check_merchant_domain_primary(merchant_domains)) findings.extend(_check_theme_fallback(store)) findings.extend(_check_locale_fallback(store, default_locale)) findings.extend(_check_language_config(store)) # Apply risk filter if risk_filter: findings = [f for f in findings if f["risk"] == risk_filter] # Count by risk finding_counts = {"critical": 0, "high": 0, "medium": 0} for f in findings: finding_counts[f["risk"]] = finding_counts.get(f["risk"], 0) + 1 summary[f["risk"]] = summary.get(f["risk"], 0) + 1 results.append({ "store_code": store.store_code, "store_name": store.name, "merchant_name": store.merchant.name if store.merchant else None, "finding_counts": finding_counts, "findings": findings, }) stores_with_findings = sum(1 for r in results if r["findings"]) return { "total_stores": len(results), "stores_with_findings": stores_with_findings, "summary": summary, "results": results, } # ── Checker functions ── def _finding( check: str, check_label: str, risk: str, resolved_value: str | None, source: str, source_label: str, is_explicit: bool, note: str = "", ) -> dict: return { "check": check, "check_label": check_label, "risk": risk, "resolved_value": resolved_value, "source": source, "source_label": source_label, "is_explicit": is_explicit, "note": note, } _CONTACT_FIELDS = [ ("contact_email", "Contact Email"), ("contact_phone", "Contact Phone"), ("website", "Website"), ("business_address", "Business Address"), ("tax_number", "Tax Number"), ] def _check_contact_inheritance(store) -> list[dict]: """Check 5 contact fields: store value → merchant fallback → none.""" findings = [] merchant = store.merchant for field, label in _CONTACT_FIELDS: store_val = getattr(store, field, None) merchant_val = getattr(merchant, field, None) if merchant else None if store_val: # Explicitly set on store — no finding continue if merchant_val: findings.append(_finding( check=field, check_label=label, risk="critical", resolved_value=str(merchant_val), source="merchant", source_label=f"Inherited from merchant '{merchant.name}'", is_explicit=False, note="Store has no own value; silently falls back to merchant", )) else: findings.append(_finding( check=field, check_label=label, risk="critical", resolved_value=None, source="none", source_label="No value anywhere", is_explicit=False, note="Neither store nor merchant has this value set", )) return findings def _check_domain_resolution(store, merchant_domains: list) -> list[dict]: """Check effective_domain: StoreDomain → MerchantDomain → subdomain fallback.""" findings = [] # Check if store has its own active domain store_domains = [d for d in (store.domains or []) if d.is_active] active_merchant_domains = [d for d in merchant_domains if d.is_active] if store_domains: # Store has its own domain — explicit, no finding return findings if active_merchant_domains: primary = next((d for d in active_merchant_domains if d.is_primary), None) domain_val = primary.domain if primary else active_merchant_domains[0].domain findings.append(_finding( check="effective_domain", check_label="Effective Domain", risk="critical", resolved_value=domain_val, source="merchant", source_label="Using merchant domain (shared with other stores)", is_explicit=False, note="No store-specific domain; merchant domain may serve wrong store", )) else: findings.append(_finding( check="effective_domain", check_label="Effective Domain", risk="critical", resolved_value=f"{store.subdomain}.* (subdomain fallback)", source="hardcoded", source_label="Subdomain fallback only", is_explicit=False, note="No custom domain at store or merchant level", )) return findings def _check_subscription_coverage(store, subs_map: dict) -> list[dict]: """For each StorePlatform membership, verify a MerchantSubscription exists and is active.""" findings = [] for sp in (store.store_platforms or []): if not sp.is_active: continue key = (store.merchant_id, sp.platform_id) sub = subs_map.get(key) if not sub: findings.append(_finding( check=f"subscription_{sp.platform_id}", check_label=f"Subscription (platform {sp.platform_id})", risk="critical", resolved_value=None, source="none", source_label="No subscription found", is_explicit=False, note=f"Store active on platform {sp.platform_id} but merchant has no subscription", )) elif not sub.is_active: findings.append(_finding( check=f"subscription_{sp.platform_id}", check_label=f"Subscription (platform {sp.platform_id})", risk="critical", resolved_value=sub.status, source="merchant", source_label=f"Subscription exists but status='{sub.status}'", is_explicit=False, note="Store active on platform but subscription is not active", )) return findings def _check_merchant_active(store) -> list[dict]: """Merchant inactive but store still active.""" merchant = store.merchant if not merchant or merchant.is_active: return [] return [_finding( check="merchant_active", check_label="Merchant Active", risk="high", resolved_value=str(merchant.is_active), source="merchant", source_label=f"Merchant '{merchant.name}' is inactive", is_explicit=True, note="Store is active but its parent merchant is inactive", )] def _check_merchant_domain_primary(merchant_domains: list) -> list[dict]: """Multiple MerchantDomains marked primary — non-deterministic resolution.""" primary_domains = [d for d in merchant_domains if d.is_primary and d.is_active] if len(primary_domains) <= 1: return [] domain_list = ", ".join(d.domain for d in primary_domains) return [_finding( check="merchant_domain_primary", check_label="Merchant Primary Domain", risk="high", resolved_value=domain_list, source="merchant", source_label=f"{len(primary_domains)} domains marked as primary", is_explicit=True, note="Multiple primary domains cause non-deterministic resolution", )] def _check_theme_fallback(store) -> list[dict]: """No active store_theme — falls back to hardcoded default.""" theme = store.store_theme if theme and theme.is_active: return [] return [_finding( check="store_theme", check_label="Store Theme", risk="medium", resolved_value="default (hardcoded)", source="hardcoded", source_label="No active theme configured", is_explicit=False, note="Falls back to hardcoded default theme" if not theme else "Theme exists but is inactive", )] def _check_locale_fallback(store, default_locale: str | None) -> list[dict]: """storefront_locale is NULL — falls back to platform default or 'fr-LU'.""" if store.storefront_locale: return [] fallback = default_locale or "fr-LU" source = "platform_default" if default_locale else "hardcoded" source_label = ( f"Platform default: '{default_locale}'" if default_locale else "Hardcoded fallback 'fr-LU'" ) return [_finding( check="storefront_locale", check_label="Storefront Locale", risk="medium", resolved_value=fallback, source=source, source_label=source_label, is_explicit=False, note="storefront_locale is NULL; using fallback", )] _DEFAULT_LANGUAGES = ["fr", "de", "en", "lb"] def _check_language_config(store) -> list[dict]: """storefront_languages matches column default — may never have been explicitly set.""" languages = store.storefront_languages if languages and languages != _DEFAULT_LANGUAGES: return [] return [_finding( check="storefront_languages", check_label="Storefront Languages", risk="medium", resolved_value=str(languages or _DEFAULT_LANGUAGES), source="hardcoded", source_label="Column default value", is_explicit=False, note="Matches column default — may never have been explicitly configured", )]