Files
orion/middleware/store_context.py
Samir Boulahtit 540205402f
Some checks failed
CI / pytest (push) Waiting to run
CI / ruff (push) Successful in 12s
CI / validate (push) Successful in 26s
CI / dependency-scanning (push) Successful in 31s
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled
feat(middleware): harden routing with fail-closed policy, custom subdomain management, and perf fixes
- Fix IPv6 host parsing with _strip_port() utility
- Remove dangerous StorePlatform→Store.subdomain silent fallback
- Close storefront gate bypass when frontend_type is None
- Add custom subdomain management UI and API for stores
- Add domain health diagnostic tool
- Convert db.add() in loops to db.add_all() (24 PERF-006 fixes)
- Add tests for all new functionality (18 subdomain service tests)
- Add .github templates for validator compliance

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 18:13:01 +01:00

699 lines
28 KiB
Python

# middleware/store_context.py
"""
Store Context Middleware (Class-Based)
Detects store from host/domain/path and injects into request.state.
Handles three routing modes:
1. Custom domains (customdomain1.com → Store 1)
2. Subdomains (store1.platform.com → Store 1)
3. Path-based (/store/store1/, /stores/store1/, or /storefront/store1/ → Store 1)
Also extracts clean_path for nested routing patterns.
IMPORTANT: This middleware runs AFTER PlatformContextMiddleware.
Uses request.state.platform_clean_path when available (set by PlatformContextMiddleware).
"""
import ipaddress
import logging
from urllib.parse import urlparse

from fastapi import Request
from sqlalchemy import func
from sqlalchemy.orm import Session
from starlette.middleware.base import BaseHTTPMiddleware

from app.core.config import settings
from app.core.database import get_db
from app.core.frontend_detector import FrontendDetector
from app.modules.tenancy.models import Store, StoreDomain
logger = logging.getLogger(__name__)
class StoreContextManager:
    """Stateless helpers that detect and resolve the store (tenant) of a request.

    Detection turns a host/path pair (or a Referer header) into a small
    context dict; ``get_store_from_context`` then resolves that dict to a
    ``Store`` row. All helpers are static: the class is only a namespace.
    """

    # Hosts that can never be a store's custom domain.
    _NON_CUSTOM_HOSTS = frozenset(
        {"localhost", "127.0.0.1", "::1", "admin.localhost", "admin.127.0.0.1"}
    )
    # Referer hosts on which the store is identified by the URL path.
    _LOCAL_REFERER_HOSTS = frozenset({"localhost", "127.0.0.1", "::1", "testserver"})
    # First host labels that never name a store.
    _RESERVED_SUBDOMAINS = frozenset({"www", "admin", "api"})
    # URL prefixes used by path-based routing.
    _STORE_PATH_PREFIXES = ("/storefront/", "/stores/", "/store/")
    # detection_method values resolved through the StoreDomain/MerchantDomain tables.
    _CUSTOM_DOMAIN_METHODS = frozenset({"custom_domain", "referer_custom_domain"})
    # detection_method values whose "subdomain" key came from a host name
    # (as opposed to a path segment) and is therefore platform-scoped.
    _HOST_SUBDOMAIN_METHODS = frozenset({"subdomain", "referer_subdomain"})

    _STATIC_EXTENSIONS = (
        ".ico",
        ".css",
        ".js",
        ".png",
        ".jpg",
        ".jpeg",
        ".gif",
        ".svg",
        ".woff",
        ".woff2",
        ".ttf",
        ".eot",
        ".webp",
        ".map",
        ".json",
        ".xml",
        ".txt",
        ".pdf",
        ".webmanifest",
    )
    _STATIC_PATHS = ("/static/", "/media/", "/assets/", "/.well-known/")

    # ------------------------------------------------------------------
    # Small parsing helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _strip_port(host: str) -> str:
        """
        Return the host part of a ``Host`` header value.

        Handles ``name``, ``name:port``, ``[ipv6]`` and ``[ipv6]:port``.
        Brackets around an IPv6 literal are removed. A value with several
        colons and no brackets is taken to be a bare IPv6 literal and is
        returned unchanged rather than being cut at the first colon.
        """
        if host.startswith("["):
            closing = host.find("]")
            # Malformed literal (no closing bracket): leave untouched.
            return host[1:closing] if closing != -1 else host
        if host.count(":") == 1:
            return host.partition(":")[0]
        return host

    @staticmethod
    def _is_ip_literal(host: str) -> bool:
        """Return True when ``host`` is an IPv4 or IPv6 address literal."""
        try:
            ipaddress.ip_address(host)
        except ValueError:
            return False
        return True

    @staticmethod
    def _split_store_path(path: str) -> tuple[str, str] | None:
        """
        Split a path-routed store URL into ``(prefix, store_code)``.

        Returns None when ``path`` does not start with one of the store
        prefixes or when the segment after the prefix is empty.
        """
        for prefix in StoreContextManager._STORE_PATH_PREFIXES:
            if path.startswith(prefix):
                store_code = path[len(prefix):].split("/", 1)[0]
                return (prefix, store_code) if store_code else None
        return None

    # ------------------------------------------------------------------
    # Detection from the request itself
    # ------------------------------------------------------------------
    @staticmethod
    def detect_store_context(request: Request) -> dict | None:
        """
        Detect store context from request.

        Thin wrapper around _detect_store_from_host_and_path() that extracts
        host, path, and platform from the request object.

        Returns dict with store info or None if not found.
        """
        host = request.headers.get("host", "")
        # Use platform_clean_path if available (set by PlatformContextMiddleware)
        path = getattr(request.state, "platform_clean_path", None) or request.url.path
        platform = getattr(request.state, "platform", None)
        return StoreContextManager._detect_store_from_host_and_path(host, path, platform)

    @staticmethod
    def _detect_store_from_host_and_path(host: str, path: str, platform=None) -> dict | None:
        """
        Core store detection logic from host and path.

        Priority order:
        1. Custom domain (customdomain1.com) — skipped on platform domains
        2. Subdomain (store1.platform.com) — skipped on the exact platform
           domain, on the bare main domain and on IP literals
        3. Path-based (/store/store1/, /stores/store1/ or /storefront/store1/)
           — always runs as fallback

        Args:
            host: The request host header (may include port, may be a
                bracketed IPv6 literal)
            path: The clean path (platform prefix already stripped)
            platform: Optional platform object from middleware state; only
                its ``domain`` attribute is read here

        Returns dict with store info or None if not found.
        """
        original_host = host
        host = StoreContextManager._strip_port(host)

        # Determine if host is the platform's own domain or a subdomain of it
        # (e.g. rewardflow.lu is the loyalty platform, not a store)
        # (e.g. acme.rewardflow.lu is a subdomain OF the platform, not a custom domain)
        platform_own_domain = getattr(platform, "domain", None) if platform else None
        is_platform_domain = bool(platform_own_domain) and host == platform_own_domain
        is_subdomain_of_platform = (
            bool(platform_own_domain)
            and host != platform_own_domain
            and host.endswith(f".{platform_own_domain}")
        )

        # An IP literal such as 127.0.0.1 contains dots but has no subdomain
        # label; treating "127" as a store would shadow path-based routing.
        skip_subdomain = is_platform_domain or StoreContextManager._is_ip_literal(host)

        # Method 1: Custom domain detection (HIGHEST PRIORITY)
        # Skip if host is a platform domain or a subdomain of one
        if not is_platform_domain and not is_subdomain_of_platform:
            main_domain = getattr(settings, "main_domain", "platform.com")
            is_custom_domain = (
                bool(host)
                and host != main_domain
                and not host.endswith(f".{main_domain}")
                and host not in StoreContextManager._NON_CUSTOM_HOSTS
                and not host.startswith("admin.")
            )
            if is_custom_domain:
                return {
                    "domain": StoreDomain.normalize_domain(host),
                    "detection_method": "custom_domain",
                    "host": host,
                    "original_host": original_host,
                }
            # The bare main domain is not a store subdomain either; let it
            # fall through to path-based detection.
            if host == main_domain:
                skip_subdomain = True

        # Method 2: Subdomain detection (acme.rewardflow.lu → "acme")
        if not skip_subdomain and "." in host:
            label = host.split(".", 1)[0]
            if label and label not in StoreContextManager._RESERVED_SUBDOMAINS:
                return {
                    "subdomain": label,
                    "detection_method": "subdomain",
                    "host": host,
                }

        # Method 3: Path-based detection (/store/x/, /stores/x/, /storefront/x/)
        split = StoreContextManager._split_store_path(path)
        if split:
            prefix, store_code = split
            return {
                "subdomain": store_code,
                "detection_method": "path",
                "path_prefix": f"{prefix}{store_code}",
                "full_prefix": prefix,  # /store/, /stores/, or /storefront/
                "host": host,
            }
        return None

    # ------------------------------------------------------------------
    # Database resolution
    # ------------------------------------------------------------------
    @staticmethod
    def get_store_from_context(db: Session, context: dict) -> Store | None:
        """
        Get store from database using context information.

        Supports:
        1. Custom domain lookup (StoreDomain, then MerchantDomain) for the
           "custom_domain" and "referer_custom_domain" detection methods
        2. Subdomain lookup (StorePlatform.custom_subdomain, then
           Store.subdomain) for host-derived subdomains
        3. Path-based lookup (Store.subdomain)

        Args:
            db: Open SQLAlchemy session
            context: Dict produced by one of the detection helpers. When it
                carries a "_platform" entry, host-derived subdomains are
                resolved within that platform only. May be mutated:
                merchant-domain hits add "merchant_domain" and "merchant_id".

        Returns the active Store, or None when nothing matches.
        """
        if not context:
            return None

        method = context.get("detection_method")
        if method in StoreContextManager._CUSTOM_DOMAIN_METHODS:
            domain = context.get("domain")
            if domain:
                return StoreContextManager._get_store_by_custom_domain(db, context, domain)

        if "subdomain" in context:
            return StoreContextManager._get_store_by_subdomain(db, context)
        return None

    @staticmethod
    def _get_store_by_custom_domain(db: Session, context: dict, domain: str) -> Store | None:
        """Resolve a verified, active store-level or merchant-level domain to a store."""
        store_domain = (
            db.query(StoreDomain)
            .filter(StoreDomain.domain == domain)
            .filter(StoreDomain.is_active.is_(True))
            .filter(StoreDomain.is_verified.is_(True))
            .first()
        )
        if store_domain:
            store = store_domain.store
            if not store or not store.is_active:
                # A matching domain with an inactive store ends the lookup:
                # the merchant-level fallback is deliberately not tried.
                logger.warning(f"Store for domain {domain} is not active")
                return None
            logger.info(f"[OK] Store found via custom domain: {domain} -> {store.name}")
            return store

        # Fallback: Try merchant-level domain
        from app.modules.tenancy.models.merchant_domain import MerchantDomain

        merchant_domain = (
            db.query(MerchantDomain)
            .filter(
                MerchantDomain.domain == domain,
                MerchantDomain.is_active.is_(True),
                MerchantDomain.is_verified.is_(True),
            )
            .first()
        )
        if merchant_domain:
            # The merchant's active store with the lowest id is used.
            store = (
                db.query(Store)
                .filter(
                    Store.merchant_id == merchant_domain.merchant_id,
                    Store.is_active.is_(True),
                )
                .order_by(Store.id)
                .first()
            )
            if store:
                context["merchant_domain"] = True
                context["merchant_id"] = merchant_domain.merchant_id
                logger.info(
                    f"[OK] Store found via merchant domain: {domain} -> {store.name}"
                )
                return store

        logger.warning(f"No active store found for custom domain: {domain}")
        return None

    @staticmethod
    def _get_store_by_subdomain(db: Session, context: dict) -> Store | None:
        """Resolve a subdomain label or path store code to a store."""
        subdomain = context["subdomain"]
        method = context.get("detection_method", "unknown")
        platform = context.get("_platform")
        # Only labels taken from a host name are platform-scoped; store codes
        # taken from a URL path are looked up globally.
        platform_scoped = (
            bool(platform) and method in StoreContextManager._HOST_SUBDOMAIN_METHODS
        )

        if platform_scoped:
            from app.modules.tenancy.models.store_platform import StorePlatform

            # 2a. Check StorePlatform.custom_subdomain (platform-specific override)
            # e.g. acme-rewards.rewardflow.lu → StorePlatform with custom_subdomain="acme-rewards"
            store_platform = (
                db.query(StorePlatform)
                .filter(
                    func.lower(StorePlatform.custom_subdomain) == subdomain.lower(),
                    StorePlatform.platform_id == platform.id,
                    StorePlatform.is_active.is_(True),
                )
                .first()
            )
            if store_platform:
                store = (
                    db.query(Store)
                    .filter(Store.id == store_platform.store_id, Store.is_active.is_(True))
                    .first()
                )
                if store:
                    logger.info(
                        f"[OK] Store found via custom_subdomain: {subdomain} -> "
                        f"{store.name} (platform={platform.code})"
                    )
                    return store

        # 2b. Fallback to Store.subdomain with platform membership check
        store = (
            db.query(Store)
            .filter(func.lower(Store.subdomain) == subdomain.lower())
            .filter(Store.is_active.is_(True))
            .first()
        )
        if not store:
            logger.warning(f"No active store found for subdomain: {subdomain}")
            return None

        if platform_scoped:
            # Verify the store actually has an active membership on this
            # platform. Without this check, a subdomain like
            # "other-tenant.omsflow.lu" could resolve a store that only
            # belongs to the loyalty platform — a cross-tenant leak.
            has_membership = (
                db.query(StorePlatform)
                .filter(
                    StorePlatform.store_id == store.id,
                    StorePlatform.platform_id == platform.id,
                    StorePlatform.is_active.is_(True),
                )
                .first()
            )
            if not has_membership:
                logger.warning(
                    f"[FAIL-CLOSED] Store '{subdomain}' exists but has no "
                    f"active membership on platform {platform.code} - "
                    f"blocking cross-tenant resolution"
                )
                return None

        logger.info(f"[OK] Store found via {method}: {subdomain} -> {store.name}")
        return store

    # ------------------------------------------------------------------
    # Path helpers
    # ------------------------------------------------------------------
    @staticmethod
    def extract_clean_path(request: Request, store_context: dict | None) -> str:
        """
        Extract clean path without store prefix for routing.

        Only path-based detection carries a prefix to strip. The prefix was
        computed from request.state.platform_clean_path when that is set, so
        that path is tried first and request.url.path second. Returns "/"
        when nothing remains after the prefix, and request.url.path unchanged
        when there is no context, no path-based detection or no match.
        """
        url_path = request.url.path
        if not store_context or store_context.get("detection_method") != "path":
            return url_path

        path_prefix = store_context.get("path_prefix", "")
        detection_path = getattr(request.state, "platform_clean_path", None)
        for candidate in (detection_path, url_path):
            if isinstance(candidate, str) and candidate.startswith(path_prefix):
                return candidate[len(path_prefix):] or "/"
        return url_path

    @staticmethod
    def is_api_request(request: Request) -> bool:
        """Check if request is for API endpoints."""
        return FrontendDetector.is_api_request(request.url.path)

    # ------------------------------------------------------------------
    # Detection from the Referer header (storefront API requests)
    # ------------------------------------------------------------------
    @staticmethod
    def extract_store_from_referer(request: Request) -> dict | None:
        """
        Extract store context from Referer header.

        Used for storefront API requests where store context comes from the page
        that made the API call (e.g., JavaScript on /storefront/FASHIONHUB/loyalty/join
        calling /api/v1/storefront/loyalty/program). Falls back to the Origin
        header when Referer is absent.

        Platform-aware: uses request.state.platform.domain (e.g. rewardflow.lu)
        for subdomain detection, not just settings.main_domain (wizard.lu).

        Extracts store from Referer URL patterns:
        - localhost:8000/platforms/loyalty/storefront/FASHIONHUB/... → FASHIONHUB (dev path)
        - rewardflow.lu/storefront/FASHIONHUB/... → FASHIONHUB (prod platform domain path)
        - fashionhub.rewardflow.lu/... → fashionhub (prod platform subdomain)
        - custom-domain.com/... → custom-domain.com (prod custom domain)

        Returns store context dict or None if unable to extract. Never raises:
        any parsing failure is logged and reported as None.
        """
        referer = request.headers.get("referer") or request.headers.get("origin")
        if not referer:
            logger.debug("[STORE] No Referer/Origin header for storefront API request")
            return None

        try:
            platform = getattr(request.state, "platform", None)
            return StoreContextManager._context_from_referer(referer, platform)
        except Exception as e:
            # Best effort by design: a malformed header must not fail the request.
            logger.warning(
                f"[STORE] Failed to extract store from Referer: {e}",
                extra={"referer": referer, "error": str(e)},
            )
            return None

    @staticmethod
    def _context_from_referer(referer: str, platform) -> dict | None:
        """Build a store context dict from a Referer/Origin URL, or return None."""
        parsed = urlparse(referer)
        # ``hostname`` already excludes the port and the brackets of an IPv6
        # literal, so it must not be split on ":" again.
        referer_host = parsed.hostname or ""
        referer_path = parsed.path or ""
        logger.debug(
            "[STORE] Extracting store from Referer",
            extra={
                "referer": referer,
                "referer_host": referer_host,
                "referer_path": referer_path,
            },
        )

        # Determine platform domain for platform-aware detection
        # (e.g. rewardflow.lu from the loyalty platform object)
        platform_own_domain = getattr(platform, "domain", None) if platform else None
        is_platform_domain = bool(platform_own_domain) and referer_host == platform_own_domain
        is_subdomain_of_platform = (
            bool(platform_own_domain)
            and referer_host != platform_own_domain
            and referer_host.endswith(f".{platform_own_domain}")
        )

        # Method 1: Path-based detection from referer path
        # Works on localhost (dev) AND on platform domains (prod path-based routing)
        # /platforms/oms/storefront/WIZATECH/products → WIZATECH (dev)
        # /storefront/FASHIONHUB/loyalty/join → FASHIONHUB (prod platform domain)
        # Note: For subdomain hosts, path segments after /storefront/ are pages, not store codes
        is_local_referer = referer_host in StoreContextManager._LOCAL_REFERER_HOSTS
        if is_local_referer or is_platform_domain:
            if referer_path.startswith("/platforms/"):
                # Strip /platforms/{code}/ to get clean path
                _, _, remainder = referer_path[len("/platforms/"):].partition("/")
                referer_path = f"/{remainder}" if remainder else "/"

            split = StoreContextManager._split_store_path(referer_path)
            if split:
                prefix, store_code = split
                logger.debug(
                    f"[STORE] Extracted store from Referer path: {store_code}",
                    extra={"store_code": store_code, "method": "referer_path"},
                )
                return {
                    "subdomain": store_code,
                    "detection_method": "path",
                    "path_prefix": f"{prefix}{store_code}",
                    "full_prefix": prefix,
                    "host": referer_host,
                    "referer": referer,
                }

        # Method 2: Subdomain detection from referer host
        # fashionhub.rewardflow.lu → fashionhub (platform subdomain)
        # store1.wizard.lu → store1 (main domain subdomain)
        if is_subdomain_of_platform:
            subdomain = referer_host.split(".")[0]
            logger.debug(
                f"[STORE] Extracted store from Referer platform subdomain: {subdomain}",
                extra={
                    "subdomain": subdomain,
                    "method": "referer_platform_subdomain",
                    "platform_domain": platform_own_domain,
                },
            )
            return {
                "subdomain": subdomain,
                "detection_method": "referer_subdomain",
                "host": referer_host,
                "referer": referer,
                # The label is relative to this platform's domain, so the
                # lookup is scoped to it (custom_subdomain override and
                # membership check), exactly as for host-based detection.
                "_platform": platform,
            }

        main_domain = getattr(settings, "main_domain", "platform.com")
        if referer_host.endswith(f".{main_domain}"):
            subdomain = referer_host.split(".")[0]
            if subdomain and subdomain not in StoreContextManager._RESERVED_SUBDOMAINS:
                logger.debug(
                    f"[STORE] Extracted store from Referer subdomain: {subdomain}",
                    extra={
                        "subdomain": subdomain,
                        "method": "referer_subdomain",
                    },
                )
                return {
                    "subdomain": subdomain,
                    "detection_method": "referer_subdomain",
                    "host": referer_host,
                    "referer": referer,
                }

        # Method 3: Custom domain detection from referer host
        # custom-shop.com → custom-shop.com
        is_custom_domain = (
            bool(referer_host)
            and not is_platform_domain
            and not is_subdomain_of_platform
            and referer_host != main_domain
            and not referer_host.endswith(f".{main_domain}")
            and referer_host not in StoreContextManager._NON_CUSTOM_HOSTS
            and not referer_host.startswith("admin.")
        )
        if is_custom_domain:
            normalized_domain = StoreDomain.normalize_domain(referer_host)
            logger.debug(
                f"[STORE] Extracted store from Referer custom domain: {normalized_domain}",
                extra={
                    "domain": normalized_domain,
                    "method": "referer_custom_domain",
                },
            )
            return {
                "domain": normalized_domain,
                "detection_method": "referer_custom_domain",
                "host": referer_host,
                "referer": referer,
            }
        return None

    @staticmethod
    def is_static_file_request(request: Request) -> bool:
        """
        Check if request is for static files.

        True when the lower-cased path ends with a known static extension,
        starts with a static directory prefix, or contains "favicon.ico".
        """
        path = request.url.path.lower()
        return (
            path.endswith(StoreContextManager._STATIC_EXTENSIONS)
            or path.startswith(StoreContextManager._STATIC_PATHS)
            or "favicon.ico" in path
        )
class StoreContextMiddleware(BaseHTTPMiddleware):
    """
    Middleware to inject store context into request state.

    Runs AFTER PlatformContextMiddleware in the request chain.
    Uses request.state.platform_clean_path for path-based store detection.

    Sets on every request (values are None when no store applies):
        request.state.store: Store object
        request.state.store_context: Detection metadata
        request.state.clean_path: Path without store prefix
    """

    # Paths that never carry a store context.
    _SYSTEM_PATHS = frozenset({"/", "/health", "/docs", "/redoc", "/openapi.json"})

    async def dispatch(self, request: Request, call_next):
        """
        Detect and inject store context, then hand over to the next middleware.

        Three cases, in order:
        1. admin / static / system requests: no detection
        2. API requests: only /api/v1/storefront/ is resolved, via Referer
        3. everything else: host/path detection
        """
        path = request.url.path

        # Skip store detection for admin, static files, and system requests
        if (
            FrontendDetector.is_admin(request.headers.get("host", ""), path)
            or StoreContextManager.is_static_file_request(request)
            or path in self._SYSTEM_PATHS
        ):
            logger.debug(
                f"[STORE] Skipping store detection: {path}",
                extra={"path": path, "reason": "admin/static/system"},
            )
            self._set_state(request)
        elif StoreContextManager.is_api_request(request):
            self._apply_api_context(request)
        else:
            self._apply_page_context(request)

        # Continue to next middleware
        return await call_next(request)

    @staticmethod
    def _set_state(request: Request, store=None, store_context=None, clean_path=None) -> None:
        """Write the three store attributes; clean_path defaults to the raw URL path."""
        request.state.store = store
        request.state.store_context = store_context
        request.state.clean_path = request.url.path if clean_path is None else clean_path

    @staticmethod
    def _resolve_store(context: dict) -> Store | None:
        """Resolve a detection context with a short-lived session that is always closed."""
        db_gen = get_db()
        db = next(db_gen)
        try:
            return StoreContextManager.get_store_from_context(db, context)
        finally:
            db.close()

    @classmethod
    def _apply_api_context(cls, request: Request) -> None:
        """For API routes: skip most, but handle storefront API via Referer."""
        path = request.url.path

        if not path.startswith("/api/v1/storefront/"):
            # Non-storefront API routes: skip store detection
            logger.debug(
                f"[STORE] Skipping store detection for non-storefront API: {path}",
                extra={"path": path, "reason": "api"},
            )
            cls._set_state(request)
            return

        # Storefront API requests need store context from the Referer header
        # (the page URL contains the store code, e.g. /storefront/FASHIONHUB/...)
        referer_context = StoreContextManager.extract_store_from_referer(request)
        if not referer_context:
            logger.debug(
                f"[STORE] No Referer store context for storefront API: {path}",
                extra={"path": path},
            )
            cls._set_state(request)
            return

        store = cls._resolve_store(referer_context)
        # The context is kept even when no store matched, so later code can
        # tell "nothing detected" from "detected but not found".
        cls._set_state(request, store=store, store_context=referer_context)
        if store:
            logger.debug(
                "[STORE] Store detected for storefront API via Referer",
                extra={
                    "store_id": store.id,
                    "store_name": store.name,
                    "path": path,
                },
            )

    @classmethod
    def _apply_page_context(cls, request: Request) -> None:
        """Detect the store from host/path and store the result on request.state."""
        store_context = StoreContextManager.detect_store_context(request)
        if not store_context:
            logger.debug(
                "[STORE] No store context detected",
                extra={
                    "path": request.url.path,
                    "host": request.headers.get("host", ""),
                },
            )
            cls._set_state(request)
            return

        # Pass platform from middleware state so subdomain lookup can check
        # StorePlatform.custom_subdomain for platform-specific overrides
        platform = getattr(request.state, "platform", None)
        if platform:
            store_context["_platform"] = platform

        store = cls._resolve_store(store_context)
        if not store:
            logger.warning(
                "[WARNING] Store context detected but store not found",
                extra={
                    "context": store_context,
                    "detection_method": store_context.get("detection_method"),
                },
            )
            cls._set_state(request, store_context=store_context)
            return

        cls._set_state(
            request,
            store=store,
            store_context=store_context,
            clean_path=StoreContextManager.extract_clean_path(request, store_context),
        )
        logger.debug(
            "[STORE_CONTEXT] Store detected",
            extra={
                "store_id": store.id,
                "store_name": store.name,
                "store_subdomain": store.subdomain,
                "detection_method": store_context.get("detection_method"),
                "original_path": request.url.path,
                "clean_path": request.state.clean_path,
            },
        )
def get_current_store(request: Request) -> Store | None:
    """Return the store placed on ``request.state`` by the middleware, or None if absent."""
    state = request.state
    if hasattr(state, "store"):
        return state.store
    return None
def require_store_context():
    """
    Build a dependency that requires a store context in endpoints.

    The returned callable takes the request and returns its current store.
    When the request has no store it raises StoreNotFoundException.
    """

    def dependency(request: Request):
        current = get_current_store(request)
        if current:
            return current

        # Imported only on the failure path, as in the rest of this module.
        from app.modules.tenancy.exceptions import StoreNotFoundException

        raise StoreNotFoundException("unknown", identifier_type="context")

    return dependency