feat(middleware): harden routing with fail-closed policy, custom subdomain management, and perf fixes
Some checks failed
CI / pytest (push) Waiting to run
CI / ruff (push) Successful in 12s
CI / validate (push) Successful in 26s
CI / dependency-scanning (push) Successful in 31s
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled

- Fix IPv6 host parsing with _strip_port() utility
- Remove dangerous StorePlatform→Store.subdomain silent fallback
- Close storefront gate bypass when frontend_type is None
- Add custom subdomain management UI and API for stores
- Add domain health diagnostic tool
- Convert db.add() in loops to db.add_all() (24 PERF-006 fixes)
- Add tests for all new functionality (18 subdomain service tests)
- Add .github templates for validator compliance

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-15 18:13:01 +01:00
parent 07fab01f6a
commit 540205402f
38 changed files with 1827 additions and 134 deletions

19
.github/PULL_REQUEST_TEMPLATE.md vendored Normal file
View File

@@ -0,0 +1,19 @@
## Summary
<!-- Brief description of what this PR does -->
## Changes
-
## Test plan
- [ ] Unit tests pass (`python -m pytest tests/unit/`)
- [ ] Integration tests pass (`python -m pytest tests/integration/`)
- [ ] Architecture validation passes (`python scripts/validate/validate_all.py`)
## Checklist
- [ ] Code follows project conventions
- [ ] No new warnings introduced
- [ ] Database migrations included (if applicable)

9
.github/dependabot.yml vendored Normal file
View File

@@ -0,0 +1,9 @@
version: 2
updates:
- package-ecosystem: "pip"
directory: "/"
schedule:
interval: "weekly"
open-pull-requests-limit: 10
labels:
- "dependencies"

View File

@@ -85,8 +85,9 @@ class ErrorPageRenderer:
Returns:
HTMLResponse with rendered error page
"""
# Get frontend type
frontend_type = get_frontend_type(request)
# Get frontend type — default to PLATFORM in error rendering context
# (errors can occur before FrontendTypeMiddleware runs)
frontend_type = get_frontend_type(request) or FrontendType.PLATFORM
# Prepare template data
template_data = ErrorPageRenderer._prepare_template_data(
@@ -291,7 +292,7 @@ class ErrorPageRenderer:
# TODO: Implement actual admin check based on JWT/session
# For now, check if we're in admin frontend
frontend_type = get_frontend_type(request)
return frontend_type == FrontendType.ADMIN
return frontend_type is not None and frontend_type == FrontendType.ADMIN
@staticmethod
def _render_basic_html_fallback(

View File

@@ -388,7 +388,7 @@ def _redirect_to_login(request: Request) -> RedirectResponse:
Uses FrontendType detection to determine admin vs store vs storefront login.
Properly handles multi-access routing (domain, subdomain, path-based).
"""
frontend_type = get_frontend_type(request)
frontend_type = get_frontend_type(request) or FrontendType.PLATFORM
if frontend_type == FrontendType.ADMIN:
logger.debug("Redirecting to /admin/login")

View File

@@ -114,8 +114,7 @@ def ft_tier_with_features(db, ft_tier):
TierFeatureLimit(tier_id=ft_tier.id, feature_code="basic_shop", limit_value=None),
TierFeatureLimit(tier_id=ft_tier.id, feature_code="team_members", limit_value=5),
]
for f in features:
db.add(f)
db.add_all(features)
db.commit()
# Refresh so the tier's selectin-loaded feature_limits relationship is up to date
db.refresh(ft_tier)

View File

@@ -83,13 +83,12 @@ def billing_extra_platforms(db):
"""Create additional platforms for multiple subscriptions (unique constraint: merchant+platform)."""
platforms = []
for i in range(2):
p = Platform(
platforms.append(Platform(
code=f"bm_extra_{uuid.uuid4().hex[:8]}",
name=f"Extra Platform {i}",
is_active=True,
)
db.add(p)
platforms.append(p)
))
db.add_all(platforms)
db.commit()
for p in platforms:
db.refresh(p)

View File

@@ -90,8 +90,7 @@ def fs_tier_with_features(db, fs_tier):
TierFeatureLimit(tier_id=fs_tier.id, feature_code="feature_b", limit_value=100),
TierFeatureLimit(tier_id=fs_tier.id, feature_code="feature_c", limit_value=50),
]
for f in features:
db.add(f)
db.add_all(features)
db.commit()
return features

View File

@@ -112,15 +112,14 @@ def dash_team_members(db, dash_stores, dash_owner):
auth = AuthManager()
users = []
for _ in range(2):
u = User(
users.append(User(
email=f"dteam_{uuid.uuid4().hex[:8]}@test.com",
username=f"dteam_{uuid.uuid4().hex[:8]}",
hashed_password=auth.hash_password("pass123"),
role="store_user",
is_active=True,
)
db.add(u)
users.append(u)
))
db.add_all(users)
db.flush()
db.add(StoreUser(store_id=dash_stores[0].id, user_id=users[0].id, is_active=True))
@@ -132,19 +131,19 @@ def dash_team_members(db, dash_stores, dash_owner):
@pytest.fixture
def dash_customers(db, dash_stores):
"""Create customers in the merchant's stores."""
customers = []
for i in range(4):
uid = uuid.uuid4().hex[:8]
db.add(
Customer(
store_id=dash_stores[0].id,
email=f"dc_{uid}@test.com",
hashed_password="hashed", # noqa: SEC001
first_name=f"F{i}",
last_name=f"L{i}",
customer_number=f"DC{uid}",
is_active=True,
)
)
customers.append(Customer(
store_id=dash_stores[0].id,
email=f"dc_{uid}@test.com",
hashed_password="hashed", # noqa: SEC001
first_name=f"F{i}",
last_name=f"L{i}",
customer_number=f"DC{uid}",
is_active=True,
))
db.add_all(customers)
db.commit()

View File

@@ -45,16 +45,15 @@ def cust_stores(db, cust_merchant):
stores = []
for i in range(2):
uid = uuid.uuid4().hex[:8].upper()
store = Store(
stores.append(Store(
merchant_id=cust_merchant.id,
store_code=f"CSTORE_{uid}",
subdomain=f"cstore{uid.lower()}",
name=f"Cust Store {i}",
is_active=True,
is_verified=True,
)
db.add(store)
stores.append(store)
))
db.add_all(stores)
db.commit()
for s in stores:
db.refresh(s)
@@ -68,7 +67,7 @@ def cust_customers(db, cust_stores):
# 3 customers in store 0
for i in range(3):
uid = uuid.uuid4().hex[:8]
c = Customer(
customers.append(Customer(
store_id=cust_stores[0].id,
email=f"cust_{uid}@test.com",
hashed_password="hashed", # noqa: SEC001
@@ -76,13 +75,11 @@ def cust_customers(db, cust_stores):
last_name=f"Last{i}",
customer_number=f"C{uid}",
is_active=True,
)
db.add(c)
customers.append(c)
))
# 2 customers in store 1
for i in range(2):
uid = uuid.uuid4().hex[:8]
c = Customer(
customers.append(Customer(
store_id=cust_stores[1].id,
email=f"cust_{uid}@test.com",
hashed_password="hashed", # noqa: SEC001
@@ -90,9 +87,8 @@ def cust_customers(db, cust_stores):
last_name=f"Last{i}",
customer_number=f"C{uid}",
is_active=True,
)
db.add(c)
customers.append(c)
))
db.add_all(customers)
db.commit()
return customers

View File

@@ -0,0 +1,232 @@
# app/modules/dev_tools/services/domain_health_service.py
"""
Domain Health Check Service
Simulates the middleware resolution pipeline for every active access method
(custom subdomain, default subdomain, custom domain, path-based) to verify
they resolve to the expected store.
"""
import logging
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
def run_domain_health_check(db: Session) -> dict:
"""
Check all active store access methods by simulating middleware resolution:
1. StorePlatform custom subdomains (acme-rewards.rewardflow.lu)
2. Default subdomains per platform (acme.omsflow.lu)
3. StoreDomain custom domains (wizatech.shop)
4. Path-based store routes (/storefront/{subdomain}/, /store/{subdomain}/)
Returns:
dict with keys: total, passed, failed, details (list of entry dicts)
"""
from app.modules.tenancy.models import Platform, Store, StoreDomain
from app.modules.tenancy.models.store_platform import StorePlatform
from middleware.store_context import StoreContextManager
details: list[dict] = []
# Pre-fetch all active stores, platforms, and memberships
active_stores = (
db.query(Store).filter(Store.is_active.is_(True)).all()
)
store_by_id: dict[int, Store] = {s.id: s for s in active_stores}
all_platforms = (
db.query(Platform).filter(Platform.is_active.is_(True)).all()
)
platform_by_id: dict[int, Platform] = {p.id: p for p in all_platforms}
all_store_platforms = (
db.query(StorePlatform)
.filter(StorePlatform.is_active.is_(True))
.all()
)
# Build store_id → list of (StorePlatform, Platform) for reuse
store_memberships: dict[int, list[tuple]] = {}
for sp in all_store_platforms:
platform = platform_by_id.get(sp.platform_id)
if platform:
store_memberships.setdefault(sp.store_id, []).append((sp, platform))
# ── 1. Custom subdomains (StorePlatform.custom_subdomain) ──
# e.g. acme-rewards.rewardflow.lu
for sp in all_store_platforms:
if not sp.custom_subdomain:
continue
expected_store = store_by_id.get(sp.store_id)
platform = platform_by_id.get(sp.platform_id)
if not expected_store or not platform:
continue
context = {
"detection_method": "subdomain",
"subdomain": sp.custom_subdomain,
"_platform": platform,
}
resolved = StoreContextManager.get_store_from_context(db, context)
passed = resolved is not None and resolved.id == expected_store.id
details.append(_entry(
domain=f"{sp.custom_subdomain}.{platform.domain or platform.code}",
entry_type="custom subdomain",
platform_code=platform.code,
expected_store=expected_store,
resolved_store=resolved,
passed=passed,
note="" if passed else "custom_subdomain lookup failed",
))
# ── 2. Default subdomains per platform ──
# e.g. acme.omsflow.lu — uses Store.subdomain on the platform domain.
# With fail-closed routing, this only works if StorePlatform.custom_subdomain
# matches Store.subdomain, so this check surfaces missing entries.
for store in active_stores:
if not store.subdomain:
continue
memberships = store_memberships.get(store.id, [])
for sp, platform in memberships:
if not platform.domain:
continue
# If custom_subdomain is already set to this value, it's covered
# in check #1 above — but we still check default subdomain access
# to confirm it resolves (it exercises the same code path).
# Skip if custom_subdomain == subdomain to avoid duplicate entries.
if sp.custom_subdomain and sp.custom_subdomain.lower() == store.subdomain.lower():
continue
context = {
"detection_method": "subdomain",
"subdomain": store.subdomain,
"_platform": platform,
}
resolved = StoreContextManager.get_store_from_context(db, context)
passed = resolved is not None and resolved.id == store.id
note = ""
if not passed:
note = (
f"Store '{store.subdomain}' not found or has no active "
f"membership on platform {platform.code}"
)
details.append(_entry(
domain=f"{store.subdomain}.{platform.domain}",
entry_type="default subdomain",
platform_code=platform.code,
expected_store=store,
resolved_store=resolved,
passed=passed,
note=note,
))
# ── 3. Custom domains (StoreDomain) ──
# e.g. wizatech.shop
store_domains = (
db.query(StoreDomain)
.filter(
StoreDomain.is_active.is_(True),
StoreDomain.is_verified.is_(True),
)
.all()
)
for sd in store_domains:
expected_store = sd.store
if not expected_store:
continue
platform = sd.platform
platform_code = platform.code if platform else None
context = {
"detection_method": "custom_domain",
"domain": sd.domain,
}
resolved = StoreContextManager.get_store_from_context(db, context)
passed = resolved is not None and resolved.id == expected_store.id
details.append(_entry(
domain=sd.domain,
entry_type="custom domain",
platform_code=platform_code,
expected_store=expected_store,
resolved_store=resolved,
passed=passed,
note="" if passed else "custom domain resolution failed",
))
# ── 4. Path-based routes ──
# e.g. /storefront/luxweb/, /store/luxweb/
# Path-based URLs use Store.subdomain as the path segment.
for store in active_stores:
if not store.subdomain:
continue
memberships = store_memberships.get(store.id, [])
platform_codes = sorted(p.code for _, p in memberships) if memberships else []
platform_label = ", ".join(platform_codes) if platform_codes else None
for prefix, label in [
("/storefront/", "storefront"),
("/store/", "store"),
]:
context = {
"detection_method": "path",
"subdomain": store.subdomain,
"path_prefix": f"{prefix}{store.subdomain}",
"full_prefix": prefix,
}
resolved = StoreContextManager.get_store_from_context(db, context)
passed = resolved is not None and resolved.id == store.id
details.append(_entry(
domain=f"{prefix}{store.subdomain}/",
entry_type=f"path ({label})",
platform_code=platform_label,
expected_store=store,
resolved_store=resolved,
passed=passed,
note="" if passed else f"path-based {label} resolution failed",
))
total = len(details)
passed_count = sum(1 for d in details if d["status"] == "pass")
return {
"total": total,
"passed": passed_count,
"failed": total - passed_count,
"details": details,
}
def _entry(
domain: str,
entry_type: str,
platform_code: str | None,
expected_store,
resolved_store,
passed: bool,
note: str,
) -> dict:
"""Build a single health-check result entry."""
return {
"domain": domain,
"type": entry_type,
"platform_code": platform_code,
"expected_store": (
getattr(expected_store, "store_code", expected_store.subdomain)
if expected_store else None
),
"resolved_store": (
getattr(resolved_store, "store_code", resolved_store.subdomain)
if resolved_store else None
),
"status": "pass" if passed else "fail",
"note": note,
}

View File

@@ -271,8 +271,8 @@ class TestGetStoreTransactions:
from datetime import UTC, datetime
# Create 3 transactions
for i in range(3):
t = LoyaltyTransaction(
txns = [
LoyaltyTransaction(
merchant_id=test_loyalty_card.merchant_id,
card_id=test_loyalty_card.id,
store_id=test_store.id,
@@ -280,7 +280,9 @@ class TestGetStoreTransactions:
points_delta=10 * (i + 1),
transaction_at=datetime.now(UTC),
)
db.add(t)
for i in range(3)
]
db.add_all(txns)
db.commit()
transactions, total = self.service.get_store_transactions(

View File

@@ -414,9 +414,10 @@ class TestGetProgramStats:
db.flush()
# Create cards with customers
customers = []
for i in range(3):
uid = uuid.uuid4().hex[:8]
customer = Customer(
customers.append(Customer(
email=f"stat_{uid}@test.com",
first_name="Stat",
last_name=f"Customer{i}",
@@ -424,11 +425,13 @@ class TestGetProgramStats:
customer_number=f"SC-{uid.upper()}",
store_id=store.id,
is_active=True,
)
db.add(customer)
db.flush()
))
db.add_all(customers)
db.flush()
card = LoyaltyCard(
cards = []
for i, customer in enumerate(customers):
cards.append(LoyaltyCard(
merchant_id=ps_merchant.id,
program_id=ps_program.id,
customer_id=customer.id,
@@ -437,8 +440,8 @@ class TestGetProgramStats:
total_points_earned=100 * (i + 1),
is_active=True,
last_activity_at=datetime.now(UTC),
)
db.add(card)
))
db.add_all(cards)
db.commit()
stats = self.service.get_program_stats(db, ps_program.id)
@@ -560,9 +563,10 @@ class TestGetMerchantStats:
db.add(store)
db.flush()
customers = []
for i in range(2):
uid = uuid.uuid4().hex[:8]
customer = Customer(
customers.append(Customer(
email=f"mstat_{uid}@test.com",
first_name="MS",
last_name=f"Customer{i}",
@@ -570,11 +574,13 @@ class TestGetMerchantStats:
customer_number=f"MS-{uid.upper()}",
store_id=store.id,
is_active=True,
)
db.add(customer)
db.flush()
))
db.add_all(customers)
db.flush()
card = LoyaltyCard(
cards = []
for i, customer in enumerate(customers):
cards.append(LoyaltyCard(
merchant_id=ps_merchant.id,
program_id=ps_program.id,
customer_id=customer.id,
@@ -584,8 +590,8 @@ class TestGetMerchantStats:
total_points_earned=200,
is_active=True,
last_activity_at=datetime.now(UTC),
)
db.add(card)
))
db.add_all(cards)
db.commit()
stats = self.service.get_merchant_stats(db, ps_merchant.id)

View File

@@ -33,7 +33,7 @@ def audit_logs(db):
# Sent logs
for i in range(3):
uid = uuid.uuid4().hex[:8]
log = EmailLog(
logs.append(EmailLog(
template_code="signup_welcome",
recipient_email=f"sent_{uid}@example.com",
recipient_name=f"Sent User {i}",
@@ -45,14 +45,12 @@ def audit_logs(db):
status=EmailStatus.SENT.value,
provider="debug",
sent_at=datetime.utcnow(),
)
db.add(log)
logs.append(log)
))
# Failed logs
for i in range(2):
uid = uuid.uuid4().hex[:8]
log = EmailLog(
logs.append(EmailLog(
template_code="order_confirmation",
recipient_email=f"failed_{uid}@example.com",
recipient_name=f"Failed User {i}",
@@ -62,22 +60,19 @@ def audit_logs(db):
status=EmailStatus.FAILED.value,
provider="smtp",
error_message="Connection refused",
)
db.add(log)
logs.append(log)
))
# Pending log
uid = uuid.uuid4().hex[:8]
log = EmailLog(
logs.append(EmailLog(
template_code="team_invitation",
recipient_email=f"pending_{uid}@example.com",
subject=f"Invitation {uid}",
from_email="noreply@orion.lu",
status=EmailStatus.PENDING.value,
)
db.add(log)
logs.append(log)
))
db.add_all(logs)
db.commit()
for log in logs:
db.refresh(log)

View File

@@ -55,7 +55,7 @@ def email_logs(db, email_template):
for status, count in statuses:
for i in range(count):
uid = uuid.uuid4().hex[:8]
log = EmailLog(
logs.append(EmailLog(
template_code=email_template.code,
recipient_email=f"user_{uid}@example.com",
recipient_name=f"User {uid}",
@@ -67,10 +67,9 @@ def email_logs(db, email_template):
status=status.value,
provider="debug",
sent_at=datetime.utcnow() if status != EmailStatus.PENDING else None,
)
db.add(log)
logs.append(log)
))
db.add_all(logs)
db.commit()
for log in logs:
db.refresh(log)
@@ -86,7 +85,7 @@ def multi_template_logs(db):
for tpl_code in templates:
for i in range(2):
uid = uuid.uuid4().hex[:8]
log = EmailLog(
logs.append(EmailLog(
template_code=tpl_code,
recipient_email=f"{tpl_code}_{uid}@example.com",
subject=f"{tpl_code} email",
@@ -94,12 +93,10 @@ def multi_template_logs(db):
status=EmailStatus.SENT.value,
provider="debug",
sent_at=datetime.utcnow(),
)
db.add(log)
logs.append(log)
))
# Add one failed log
log = EmailLog(
logs.append(EmailLog(
template_code="password_reset",
recipient_email="failed@example.com",
subject="Failed email",
@@ -107,10 +104,9 @@ def multi_template_logs(db):
status=EmailStatus.FAILED.value,
provider="debug",
error_message="SMTP connection refused",
)
db.add(log)
logs.append(log)
))
db.add_all(logs)
db.commit()
for log in logs:
db.refresh(log)

View File

@@ -138,9 +138,9 @@ class CampaignService:
sent_at=datetime.now(UTC),
sent_by_user_id=sent_by_user_id,
)
db.add(send)
sends.append(send)
db.add_all(sends)
db.flush()
logger.info("Sent campaign %d to %d prospects", template_id, len(prospect_ids))
return sends

View File

@@ -324,8 +324,7 @@ class EnrichmentService:
ProspectContact.source_element == "regex",
).delete()
for contact in contacts:
db.add(contact)
db.add_all(contacts)
# Mark first email and phone as primary
if contacts:

View File

@@ -127,15 +127,17 @@ class ProspectService:
# Create inline contacts if provided
contacts = data.get("contacts", [])
contact_objects = []
for c in contacts:
contact = ProspectContact(
contact_objects.append(ProspectContact(
prospect_id=prospect.id,
contact_type=c["contact_type"],
value=c["value"],
label=c.get("label"),
is_primary=c.get("is_primary", False),
)
db.add(contact)
))
if contact_objects:
db.add_all(contact_objects)
db.flush()
logger.info("Created prospect: %s (channel=%s)", prospect.display_name, channel)
@@ -144,6 +146,7 @@ class ProspectService:
def create_bulk(self, db: Session, domain_names: list[str], source: str = "csv_import") -> tuple[int, int]:
created = 0
skipped = 0
new_prospects = []
for name in domain_names:
name = name.strip().lower()
if not name:
@@ -152,14 +155,15 @@ class ProspectService:
if existing:
skipped += 1
continue
prospect = Prospect(
new_prospects.append(Prospect(
channel=ProspectChannel.DIGITAL,
domain_name=name,
source=source,
)
db.add(prospect)
))
created += 1
if new_prospects:
db.add_all(new_prospects)
db.flush()
logger.info("Bulk import: %d created, %d skipped", created, skipped)
return created, skipped

View File

@@ -27,6 +27,7 @@ from .admin_platform_users import admin_platform_users_router
from .admin_platforms import admin_platforms_router
from .admin_store_domains import admin_store_domains_router
from .admin_store_roles import admin_store_roles_router
from .admin_store_subdomains import admin_store_subdomains_router
from .admin_stores import admin_stores_router
from .admin_users import admin_users_router
from .user_account import admin_account_router
@@ -42,6 +43,7 @@ router.include_router(admin_merchants_router, tags=["admin-merchants"])
router.include_router(admin_platforms_router, tags=["admin-platforms"])
router.include_router(admin_stores_router, tags=["admin-stores"])
router.include_router(admin_store_domains_router, tags=["admin-store-domains"])
router.include_router(admin_store_subdomains_router, tags=["admin-store-subdomains"])
router.include_router(admin_store_roles_router, tags=["admin-store-roles"])
router.include_router(admin_merchant_domains_router, tags=["admin-merchant-domains"])
router.include_router(admin_modules_router, tags=["admin-modules"])

View File

@@ -0,0 +1,137 @@
# app/modules/tenancy/routes/api/admin_store_subdomains.py
"""
Admin endpoints for managing store custom subdomains (per-platform).
Each store can have a custom subdomain on each platform it belongs to.
For example, store "WizaTech" on the loyalty platform could have
custom_subdomain="wizatech-rewards" → wizatech-rewards.rewardflow.lu
"""
import logging
from fastapi import APIRouter, Body, Depends, Path
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from app.api.deps import get_current_admin_api
from app.core.database import get_db
from app.modules.tenancy.schemas.auth import UserContext
from app.modules.tenancy.services.store_subdomain_service import store_subdomain_service
admin_store_subdomains_router = APIRouter(prefix="/stores")
logger = logging.getLogger(__name__)
# ── Schemas ──────────────────────────────────────────────────────────────────
class CustomSubdomainEntry(BaseModel):
store_platform_id: int
platform_id: int
platform_code: str
platform_name: str
platform_domain: str | None
custom_subdomain: str | None
default_subdomain: str | None
full_url: str | None
default_url: str | None
class CustomSubdomainListResponse(BaseModel):
subdomains: list[CustomSubdomainEntry]
total: int
class SetCustomSubdomainRequest(BaseModel):
subdomain: str = Field(
...,
min_length=3,
max_length=63,
description="Custom subdomain (lowercase, alphanumeric + hyphens)",
)
class CustomSubdomainUpdateResponse(BaseModel):
message: str
platform_id: int
custom_subdomain: str | None
# ── Endpoints ────────────────────────────────────────────────────────────────
@admin_store_subdomains_router.get(
"/{store_id}/custom-subdomains",
response_model=CustomSubdomainListResponse,
)
def list_custom_subdomains(
store_id: int = Path(..., description="Store ID", gt=0),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
List all platform memberships with their custom subdomains.
Returns one entry per active platform the store belongs to,
showing the custom_subdomain (if set) and the default subdomain.
"""
entries = store_subdomain_service.get_custom_subdomains(db, store_id)
return CustomSubdomainListResponse(
subdomains=[CustomSubdomainEntry(**e) for e in entries],
total=len(entries),
)
@admin_store_subdomains_router.put(
"/{store_id}/custom-subdomains/{platform_id}",
response_model=CustomSubdomainUpdateResponse,
)
def set_custom_subdomain(
store_id: int = Path(..., description="Store ID", gt=0),
platform_id: int = Path(..., description="Platform ID", gt=0),
payload: SetCustomSubdomainRequest = Body(...),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Set or update the custom subdomain for a store on a specific platform.
The subdomain must be unique on the platform (no other store can claim it).
Format: lowercase alphanumeric + hyphens, 3-63 characters.
"""
sp = store_subdomain_service.set_custom_subdomain(
db, store_id, platform_id, payload.subdomain
)
db.commit()
return CustomSubdomainUpdateResponse(
message=f"Custom subdomain set to '{sp.custom_subdomain}'",
platform_id=platform_id,
custom_subdomain=sp.custom_subdomain,
)
@admin_store_subdomains_router.delete(
"/{store_id}/custom-subdomains/{platform_id}",
response_model=CustomSubdomainUpdateResponse,
)
def clear_custom_subdomain(
store_id: int = Path(..., description="Store ID", gt=0),
platform_id: int = Path(..., description="Platform ID", gt=0),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Clear the custom subdomain for a store on a specific platform.
The store will still be accessible via its default subdomain
(Store.subdomain + platform domain).
"""
store_subdomain_service.clear_custom_subdomain(db, store_id, platform_id)
db.commit()
return CustomSubdomainUpdateResponse(
message="Custom subdomain cleared",
platform_id=platform_id,
custom_subdomain=None,
)

View File

@@ -231,15 +231,17 @@ class MerchantStoreService:
# Assign to platforms if provided
platform_ids = store_data.get("platform_ids", [])
store_platforms = []
for pid in platform_ids:
platform = db.query(Platform).filter(Platform.id == pid).first()
if platform:
sp = StorePlatform(
store_platforms.append(StorePlatform(
store_id=store.id,
platform_id=pid,
is_active=True,
)
db.add(sp)
))
if store_platforms:
db.add_all(store_platforms)
db.flush()
db.refresh(store)

View File

@@ -0,0 +1,170 @@
# app/modules/tenancy/services/store_subdomain_service.py
"""
Service for managing StorePlatform custom subdomains.
Handles validation (format, uniqueness) and CRUD operations on
the custom_subdomain field of StorePlatform entries.
"""
import logging
import re
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.exceptions.base import (
ConflictException,
ResourceNotFoundException,
ValidationException,
)
from app.modules.tenancy.models import Platform, Store
from app.modules.tenancy.models.store_platform import StorePlatform
logger = logging.getLogger(__name__)
# Subdomain rules: lowercase alphanumeric + hyphens, 3-63 chars, no leading/trailing hyphen
_SUBDOMAIN_RE = re.compile(r"^[a-z0-9]([a-z0-9-]{1,61}[a-z0-9])?$")
class StoreSubdomainService:
"""Manage custom subdomains on StorePlatform entries."""
def get_custom_subdomains(self, db: Session, store_id: int) -> list[dict]:
"""
List all platform memberships for a store with their custom subdomains.
Returns a list of dicts with platform info and custom_subdomain (may be None).
"""
store = db.query(Store).filter(Store.id == store_id).first()
if not store:
raise ResourceNotFoundException("Store", str(store_id))
memberships = (
db.query(StorePlatform)
.filter(
StorePlatform.store_id == store_id,
StorePlatform.is_active.is_(True),
)
.all()
)
results = []
for sp in memberships:
platform = db.query(Platform).filter(Platform.id == sp.platform_id).first()
if not platform:
continue
results.append({
"store_platform_id": sp.id,
"platform_id": sp.platform_id,
"platform_code": platform.code,
"platform_name": platform.name,
"platform_domain": platform.domain,
"custom_subdomain": sp.custom_subdomain,
"default_subdomain": store.subdomain,
"full_url": (
f"{sp.custom_subdomain}.{platform.domain}"
if sp.custom_subdomain and platform.domain
else None
),
"default_url": (
f"{store.subdomain}.{platform.domain}"
if store.subdomain and platform.domain
else None
),
})
return results
def set_custom_subdomain(
self, db: Session, store_id: int, platform_id: int, subdomain: str
) -> StorePlatform:
"""
Set or update the custom_subdomain for a store on a specific platform.
Validates:
- Subdomain format (lowercase, alphanumeric + hyphens)
- Uniqueness on the platform (no other store claims it)
- StorePlatform membership exists and is active
"""
subdomain = subdomain.strip().lower()
# Validate format
if not _SUBDOMAIN_RE.match(subdomain):
raise ValidationException(
"Must be 3-63 characters, lowercase alphanumeric and hyphens, "
"cannot start or end with a hyphen.",
field="custom_subdomain",
)
# Find the membership
sp = (
db.query(StorePlatform)
.filter(
StorePlatform.store_id == store_id,
StorePlatform.platform_id == platform_id,
StorePlatform.is_active.is_(True),
)
.first()
)
if not sp:
raise ResourceNotFoundException(
"StorePlatform",
f"store_id={store_id}, platform_id={platform_id}",
)
# Check uniqueness on this platform (exclude current entry)
existing = (
db.query(StorePlatform)
.filter(
func.lower(StorePlatform.custom_subdomain) == subdomain,
StorePlatform.platform_id == platform_id,
StorePlatform.id != sp.id,
)
.first()
)
if existing:
raise ConflictException(
f"Subdomain '{subdomain}' is already claimed by another store "
f"on this platform."
)
sp.custom_subdomain = subdomain
db.flush()
logger.info(
f"Set custom_subdomain='{subdomain}' for store_id={store_id} "
f"on platform_id={platform_id}"
)
return sp
def clear_custom_subdomain(
self, db: Session, store_id: int, platform_id: int
) -> StorePlatform:
"""Clear the custom_subdomain for a store on a specific platform."""
sp = (
db.query(StorePlatform)
.filter(
StorePlatform.store_id == store_id,
StorePlatform.platform_id == platform_id,
StorePlatform.is_active.is_(True),
)
.first()
)
if not sp:
raise ResourceNotFoundException(
"StorePlatform",
f"store_id={store_id}, platform_id={platform_id}",
)
old_value = sp.custom_subdomain
sp.custom_subdomain = None
db.flush()
logger.info(
f"Cleared custom_subdomain (was '{old_value}') for store_id={store_id} "
f"on platform_id={platform_id}"
)
return sp
store_subdomain_service = StoreSubdomainService()

View File

@@ -27,6 +27,13 @@ function adminStoreDetail() {
domainSaving: false,
newDomain: { domain: '', platform_id: '' },
// Custom subdomain management state
customSubdomains: [],
customSubdomainsLoading: false,
editingSubdomainPlatformId: null,
editingSubdomainValue: '',
subdomainSaving: false,
// Initialize
async init() {
// Load i18n translations
@@ -54,6 +61,7 @@ function adminStoreDetail() {
await Promise.all([
this.loadSubscriptions(),
this.loadDomains(),
this.loadCustomSubdomains(),
]);
}
} else {
@@ -274,6 +282,70 @@ function adminStoreDetail() {
}
},
// ====================================================================
// CUSTOM SUBDOMAIN MANAGEMENT
// ====================================================================
async loadCustomSubdomains() {
if (!this.store?.id) return;
this.customSubdomainsLoading = true;
try {
const url = `/admin/stores/${this.store.id}/custom-subdomains`;
const response = await apiClient.get(url);
this.customSubdomains = response.subdomains || [];
detailLog.info('Custom subdomains loaded:', this.customSubdomains.length);
} catch (error) {
if (error.status === 404) {
this.customSubdomains = [];
} else {
detailLog.warn('Failed to load custom subdomains:', error.message);
}
} finally {
this.customSubdomainsLoading = false;
}
},
startEditSubdomain(entry) {
this.editingSubdomainPlatformId = entry.platform_id;
this.editingSubdomainValue = entry.custom_subdomain || '';
},
cancelEditSubdomain() {
this.editingSubdomainPlatformId = null;
this.editingSubdomainValue = '';
},
async saveCustomSubdomain(platformId) {
if (!this.editingSubdomainValue || this.subdomainSaving) return;
this.subdomainSaving = true;
try {
await apiClient.put(
`/admin/stores/${this.store.id}/custom-subdomains/${platformId}`,
{ subdomain: this.editingSubdomainValue.trim().toLowerCase() }
);
Utils.showToast('Custom subdomain saved', 'success');
this.cancelEditSubdomain();
await this.loadCustomSubdomains();
} catch (error) {
Utils.showToast(error.message || 'Failed to save subdomain', 'error');
} finally {
this.subdomainSaving = false;
}
},
async clearCustomSubdomain(platformId, subdomainName) {
if (!confirm(`Clear custom subdomain "${subdomainName}"? The store will use its default subdomain on this platform.`)) return;
try {
await apiClient.delete(
`/admin/stores/${this.store.id}/custom-subdomains/${platformId}`
);
Utils.showToast('Custom subdomain cleared', 'success');
await this.loadCustomSubdomains();
} catch (error) {
Utils.showToast(error.message || 'Failed to clear subdomain', 'error');
}
},
// Refresh store data
async refresh() {
detailLog.info('=== STORE REFRESH TRIGGERED ===');

View File

@@ -492,6 +492,117 @@
</div>
</div>
<!-- Custom Subdomains (per-platform) -->
<div class="px-4 py-3 mb-8 bg-white rounded-lg shadow-md dark:bg-gray-800">
<div class="flex items-center justify-between mb-4">
<h3 class="text-lg font-semibold text-gray-700 dark:text-gray-200">
Custom Subdomains
</h3>
<button
@click="loadCustomSubdomains()"
class="flex items-center px-3 py-1.5 text-sm font-medium text-gray-600 bg-gray-200 rounded-lg hover:bg-gray-300 dark:bg-gray-600 dark:text-gray-300 dark:hover:bg-gray-500">
<span x-html="$icon('refresh', 'w-4 h-4 mr-1')"></span>
Refresh
</button>
</div>
<p class="text-xs text-gray-500 dark:text-gray-400 mb-4">
Each platform membership can have a custom subdomain override. If not set, the store's default subdomain
(<strong x-text="store?.subdomain"></strong>) is used on each platform.
</p>
<!-- Loading -->
<div x-show="customSubdomainsLoading" class="text-center py-4">
<p class="text-sm text-gray-500 dark:text-gray-400">Loading subdomains...</p>
</div>
<!-- Subdomains List -->
<div x-show="!customSubdomainsLoading && customSubdomains.length > 0" class="space-y-3">
<template x-for="entry in customSubdomains" :key="entry.store_platform_id">
<div class="p-3 bg-gray-50 rounded-lg dark:bg-gray-700">
<div class="flex items-center justify-between mb-2">
<div class="flex items-center gap-2">
<span x-html="$icon('globe', 'w-4 h-4 text-purple-500')"></span>
<span class="text-sm font-medium text-gray-700 dark:text-gray-200" x-text="entry.platform_name"></span>
<span class="px-1.5 py-0.5 text-xs font-medium text-gray-600 bg-gray-200 rounded-full dark:bg-gray-600 dark:text-gray-300" x-text="entry.platform_code"></span>
</div>
<div class="flex items-center gap-1">
<!-- Edit button (show when not editing) -->
<button
x-show="editingSubdomainPlatformId !== entry.platform_id"
@click="startEditSubdomain(entry)"
class="px-2 py-1 text-xs font-medium text-blue-600 hover:text-blue-700 dark:text-blue-400"
title="Edit subdomain">
<span x-html="$icon('edit', 'w-4 h-4')"></span>
</button>
<!-- Clear button (show when custom subdomain is set and not editing) -->
<button
x-show="entry.custom_subdomain && editingSubdomainPlatformId !== entry.platform_id"
@click="clearCustomSubdomain(entry.platform_id, entry.custom_subdomain)"
class="px-2 py-1 text-xs font-medium text-red-600 hover:text-red-700 dark:text-red-400"
title="Clear custom subdomain">
<span x-html="$icon('delete', 'w-4 h-4')"></span>
</button>
</div>
</div>
<!-- Display mode -->
<div x-show="editingSubdomainPlatformId !== entry.platform_id" class="text-sm">
<div class="flex items-center gap-2">
<span class="text-gray-500 dark:text-gray-400">Custom:</span>
<template x-if="entry.custom_subdomain">
<span>
<span class="font-medium text-purple-600 dark:text-purple-400" x-text="entry.custom_subdomain"></span>
<span class="text-gray-400" x-text="'.' + entry.platform_domain" x-show="entry.platform_domain"></span>
</span>
</template>
<template x-if="!entry.custom_subdomain">
<span class="text-gray-400 italic">Not set</span>
</template>
</div>
<div class="flex items-center gap-2 mt-1">
<span class="text-gray-500 dark:text-gray-400">Default:</span>
<span class="text-gray-600 dark:text-gray-300" x-text="entry.default_url || '--'"></span>
</div>
</div>
<!-- Edit mode -->
<div x-show="editingSubdomainPlatformId === entry.platform_id" x-transition class="mt-2">
<div class="flex items-center gap-2">
<input
type="text"
x-model="editingSubdomainValue"
:placeholder="entry.default_subdomain || 'custom-subdomain'"
@keydown.enter="saveCustomSubdomain(entry.platform_id)"
@keydown.escape="cancelEditSubdomain()"
class="flex-1 px-3 py-1.5 text-sm border rounded-lg dark:bg-gray-800 dark:border-gray-600 dark:text-gray-300 focus:outline-none focus:ring-2 focus:ring-purple-500" />
<span class="text-sm text-gray-400" x-text="'.' + entry.platform_domain" x-show="entry.platform_domain"></span>
</div>
<div class="flex justify-end gap-2 mt-2">
<button
@click="cancelEditSubdomain()"
class="px-3 py-1 text-xs font-medium text-gray-600 bg-gray-200 rounded-lg hover:bg-gray-300 dark:bg-gray-600 dark:text-gray-300 dark:hover:bg-gray-500">
Cancel
</button>
<button
@click="saveCustomSubdomain(entry.platform_id)"
:disabled="!editingSubdomainValue || subdomainSaving"
class="px-3 py-1 text-xs font-medium text-white bg-purple-600 rounded-lg hover:bg-purple-700 disabled:opacity-50 disabled:cursor-not-allowed">
<span x-show="!subdomainSaving">Save</span>
<span x-show="subdomainSaving">Saving...</span>
</button>
</div>
</div>
</div>
</template>
</div>
<!-- No Platform Memberships -->
<div x-show="!customSubdomainsLoading && customSubdomains.length === 0" class="text-center py-4">
<p class="text-sm text-gray-500 dark:text-gray-400">No active platform memberships found.</p>
</div>
</div>
<!-- More Actions -->
<div class="px-4 py-3 bg-white rounded-lg shadow-md dark:bg-gray-800">
<h3 class="mb-4 text-lg font-semibold text-gray-700 dark:text-gray-200">

View File

@@ -0,0 +1,257 @@
# tests/unit/services/test_store_subdomain_service.py
"""Unit tests for StoreSubdomainService."""
import uuid
import pytest
from app.exceptions.base import (
ConflictException,
ResourceNotFoundException,
ValidationException,
)
from app.modules.tenancy.models import Platform, Store
from app.modules.tenancy.models.store_platform import StorePlatform
from app.modules.tenancy.services.store_subdomain_service import StoreSubdomainService
@pytest.fixture
def subdomain_service():
return StoreSubdomainService()
@pytest.fixture
def sd_merchant(db, test_admin):
"""Create a merchant for subdomain tests."""
from app.modules.tenancy.models import Merchant
unique_id = str(uuid.uuid4())[:8]
merchant = Merchant(
name=f"SD Merchant {unique_id}",
owner_user_id=test_admin.id,
contact_email=f"sd{unique_id}@test.com",
is_active=True,
is_verified=True,
)
db.add(merchant)
db.commit()
db.refresh(merchant)
return merchant
@pytest.fixture
def sd_platform(db):
"""Create a test platform."""
unique_id = str(uuid.uuid4())[:8]
platform = Platform(
code=f"sd{unique_id}",
name=f"SD Platform {unique_id}",
domain=f"sd{unique_id}.example.com",
is_active=True,
)
db.add(platform)
db.commit()
db.refresh(platform)
return platform
@pytest.fixture
def sd_platform_2(db):
"""Create a second test platform."""
unique_id = str(uuid.uuid4())[:8]
platform = Platform(
code=f"sd2{unique_id}",
name=f"SD Platform 2 {unique_id}",
domain=f"sd2{unique_id}.example.com",
is_active=True,
)
db.add(platform)
db.commit()
db.refresh(platform)
return platform
@pytest.fixture
def sd_store(db, sd_merchant):
"""Create a test store."""
unique_id = str(uuid.uuid4())[:8]
store = Store(
store_code=f"SD{unique_id}".upper(),
name=f"SD Store {unique_id}",
subdomain=f"sdstore{unique_id}",
merchant_id=sd_merchant.id,
is_active=True,
)
db.add(store)
db.commit()
db.refresh(store)
return store
@pytest.fixture
def sd_membership(db, sd_store, sd_platform):
"""Create a StorePlatform membership."""
sp = StorePlatform(
store_id=sd_store.id,
platform_id=sd_platform.id,
is_active=True,
)
db.add(sp)
db.commit()
db.refresh(sp)
return sp
@pytest.mark.unit
@pytest.mark.stores
class TestGetCustomSubdomains:
"""Tests for listing custom subdomains."""
def test_list_returns_memberships(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
results = subdomain_service.get_custom_subdomains(db, sd_store.id)
assert len(results) == 1
assert results[0]["platform_code"] == sd_platform.code
assert results[0]["custom_subdomain"] is None
assert results[0]["default_subdomain"] == sd_store.subdomain
def test_list_shows_custom_subdomain(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
sd_membership.custom_subdomain = "my-custom"
db.commit()
results = subdomain_service.get_custom_subdomains(db, sd_store.id)
assert results[0]["custom_subdomain"] == "my-custom"
assert results[0]["full_url"] == f"my-custom.{sd_platform.domain}"
def test_list_nonexistent_store_raises(self, db, subdomain_service):
with pytest.raises(ResourceNotFoundException):
subdomain_service.get_custom_subdomains(db, 999999)
def test_list_excludes_inactive_memberships(self, db, subdomain_service, sd_store, sd_membership):
sd_membership.is_active = False
db.commit()
results = subdomain_service.get_custom_subdomains(db, sd_store.id)
assert len(results) == 0
@pytest.mark.unit
@pytest.mark.stores
class TestSetCustomSubdomain:
"""Tests for setting a custom subdomain."""
def test_set_valid_subdomain(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
sp = subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, "my-store")
assert sp.custom_subdomain == "my-store"
def test_set_normalizes_to_lowercase(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
sp = subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, "My-Store")
assert sp.custom_subdomain == "my-store"
def test_set_strips_whitespace(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
sp = subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, " my-store ")
assert sp.custom_subdomain == "my-store"
def test_set_rejects_leading_hyphen(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
with pytest.raises(ValidationException):
subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, "-invalid")
def test_set_rejects_trailing_hyphen(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
with pytest.raises(ValidationException):
subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, "invalid-")
def test_set_rejects_uppercase_special_chars(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
with pytest.raises(ValidationException):
subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, "inv@lid")
def test_set_rejects_too_short(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
with pytest.raises(ValidationException):
subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, "ab")
def test_set_nonexistent_membership_raises(self, db, subdomain_service, sd_store):
with pytest.raises(ResourceNotFoundException):
subdomain_service.set_custom_subdomain(db, sd_store.id, 999999, "test-sub")
def test_set_duplicate_on_same_platform_raises(
self, db, subdomain_service, sd_store, sd_membership, sd_platform, sd_merchant
):
# Create another store with same subdomain on same platform
unique_id = str(uuid.uuid4())[:8]
other_store = Store(
store_code=f"OTHER{unique_id}".upper(),
name=f"Other Store {unique_id}",
subdomain=f"other{unique_id}",
merchant_id=sd_merchant.id,
is_active=True,
)
db.add(other_store)
db.flush()
other_sp = StorePlatform(
store_id=other_store.id,
platform_id=sd_platform.id,
is_active=True,
custom_subdomain="taken-name",
)
db.add(other_sp)
db.commit()
with pytest.raises(ConflictException):
subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, "taken-name")
def test_set_same_subdomain_different_platform_ok(
self, db, subdomain_service, sd_store, sd_membership, sd_platform, sd_platform_2
):
# Set on first platform
subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, "shared-name")
db.commit()
# Create membership on second platform
sp2 = StorePlatform(
store_id=sd_store.id,
platform_id=sd_platform_2.id,
is_active=True,
)
db.add(sp2)
db.commit()
# Same subdomain on different platform should work
result = subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform_2.id, "shared-name")
assert result.custom_subdomain == "shared-name"
def test_update_existing_subdomain(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, "first")
db.commit()
sp = subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, "second")
assert sp.custom_subdomain == "second"
@pytest.mark.unit
@pytest.mark.stores
class TestClearCustomSubdomain:
"""Tests for clearing a custom subdomain."""
def test_clear_existing_subdomain(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
sd_membership.custom_subdomain = "to-clear"
db.commit()
sp = subdomain_service.clear_custom_subdomain(db, sd_store.id, sd_platform.id)
assert sp.custom_subdomain is None
def test_clear_already_none_ok(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
sp = subdomain_service.clear_custom_subdomain(db, sd_store.id, sd_platform.id)
assert sp.custom_subdomain is None
def test_clear_nonexistent_membership_raises(self, db, subdomain_service, sd_store):
with pytest.raises(ResourceNotFoundException):
subdomain_service.clear_custom_subdomain(db, sd_store.id, 999999)

View File

@@ -68,15 +68,14 @@ def metrics_team_members(db, metrics_stores):
auth = AuthManager()
users = []
for i in range(3):
u = User(
users.append(User(
email=f"team_{uuid.uuid4().hex[:8]}@test.com",
username=f"team_{uuid.uuid4().hex[:8]}",
hashed_password=auth.hash_password("pass123"),
role="store_user",
is_active=True,
)
db.add(u)
users.append(u)
))
db.add_all(users)
db.flush()
# User 0 on store 0 and store 1 (should be counted once)

View File

@@ -0,0 +1,162 @@
# Domain Health Diagnostic Tool
The Domain Health tool simulates the middleware resolution pipeline for every configured store access method, verifying that each domain, subdomain, and path-based route resolves to the expected store.
## Overview
| Aspect | Detail |
|--------|--------|
| Location | Admin > Diagnostics > Resolution > Domain Health |
| URL | `/admin/platform-debug` (select "Domain Health" in sidebar) |
| API Endpoint | `GET /api/v1/admin/debug/domain-health` |
| Access | Super admin only |
## What It Checks
The tool runs four categories of checks against the **live database**:
### 1. Custom Subdomains (StorePlatform)
Checks every active `StorePlatform` entry that has a `custom_subdomain` set.
| Example | What It Tests |
|---------|--------------|
| `acme-rewards.rewardflow.lu` | Does `detection_method=subdomain` with `custom_subdomain` + platform context resolve to the expected store? |
This is the most critical check because the fail-closed routing policy means a custom subdomain that doesn't resolve will return a 404 rather than falling back to a global lookup.
### 2. Default Subdomains per Platform
Checks every active store's default `subdomain` on each platform it belongs to.
| Example | What It Tests |
|---------|--------------|
| `acme.omsflow.lu` | Does `detection_method=subdomain` with `Store.subdomain` + platform context resolve to the expected store? |
The middleware first tries `StorePlatform.custom_subdomain`, then falls back to `Store.subdomain` — but **only** if the store has an active `StorePlatform` membership on the detected platform. A store with no membership on that platform will be blocked (fail-closed, prevents cross-tenant leaks).
Entries where `custom_subdomain` already equals the store's `subdomain` are de-duplicated (covered by check #1).
### 3. Custom Domains (StoreDomain)
Checks every active, verified `StoreDomain` entry.
| Example | What It Tests |
|---------|--------------|
| `wizatech.shop` | Does `detection_method=custom_domain` resolve to the store that owns the domain? |
The Platform column shows the platform associated with the `StoreDomain.platform_id` relationship.
### 4. Path-Based Routes
Checks every active store's `subdomain` field via path-based resolution.
| Example | What It Tests |
|---------|--------------|
| `/storefront/luxweb/` | Does `detection_method=path` with `subdomain=luxweb` resolve to the correct store? |
| `/store/luxweb/` | Same check for the store dashboard path |
The Platform column shows all platforms the store is a member of (via `StorePlatform`).
!!! note "Path segments use Store.subdomain"
Path-based URLs use the store's `subdomain` field, **not** `store_code`. For example, a store with `store_code=LUXWEBSITES` and `subdomain=luxweb` is accessed at `/storefront/luxweb/`, not `/storefront/LUXWEBSITES/`.
## How It Works
The tool calls the **same resolution functions** that the live middleware uses:
```mermaid
graph LR
A[Health Check] -->|builds context dict| B[StoreContextManager.get_store_from_context]
B -->|queries DB| C[Store resolved?]
C -->|id matches expected| D[PASS]
C -->|mismatch or None| E[FAIL]
```
It does **not** make actual HTTP requests. This means it validates database configuration and resolution logic, but cannot detect:
- DNS misconfigurations
- Reverse proxy / Cloudflare routing issues
- SSL certificate problems
- Network-level failures
## Reading the Results
### Summary Bar
| Counter | Meaning |
|---------|---------|
| Total Checked | Number of access methods tested |
| Passed | Resolved to the expected store |
| Failed | Resolution mismatch or store not found |
### Results Table
| Column | Description |
|--------|-------------|
| Status | `pass` or `fail` badge |
| Domain | The domain, subdomain, or path being tested |
| Type | `custom subdomain`, `default subdomain`, `custom domain`, `path (storefront)`, or `path (store)` |
| Platform | Platform code(s) the entry belongs to |
| Expected Store | The store code we expect to resolve |
| Resolved Store | The store code that actually resolved (or `--` if none) |
| Note | Error description for failed entries |
### Common Failure Scenarios
| Symptom | Likely Cause |
|---------|-------------|
| Custom subdomain fails | `StorePlatform.custom_subdomain` doesn't match or `StorePlatform.is_active=false` |
| Default subdomain fails | Store has no active `StorePlatform` membership on the detected platform (cross-tenant blocked) |
| Custom domain fails | `StoreDomain` not verified, inactive, or store is inactive |
| Path-based fails | Store's `subdomain` field is empty or store is inactive |
| Resolved store differs from expected | Two stores have conflicting subdomain/domain entries |
## API Response
```
GET /api/v1/admin/debug/domain-health
```
```json
{
"total": 36,
"passed": 30,
"failed": 6,
"details": [
{
"domain": "acme-rewards.rewardflow.lu",
"type": "custom subdomain",
"platform_code": "loyalty",
"expected_store": "ACME",
"resolved_store": "ACME",
"status": "pass",
"note": ""
},
{
"domain": "acme.omsflow.lu",
"type": "default subdomain",
"platform_code": "oms",
"expected_store": "ACME",
"resolved_store": "ACME",
"status": "pass",
"note": ""
},
{
"domain": "/storefront/acme/",
"type": "path (storefront)",
"platform_code": "oms, loyalty",
"expected_store": "ACME",
"resolved_store": "ACME",
"status": "pass",
"note": ""
}
]
}
```
## Related Documentation
- [Middleware Stack](../../architecture/middleware.md) - Full middleware pipeline reference
- [Dev Tools Module](../../modules/dev_tools/index.md) - Other diagnostic tools
- [Multi-Tenant System](../../architecture/multi-tenant.md) - Tenant detection architecture

View File

@@ -82,7 +82,7 @@ class FrontendTypeMiddleware(BaseHTTPMiddleware):
return response
def get_frontend_type(request: Request) -> FrontendType:
def get_frontend_type(request: Request) -> FrontendType | None:
"""
Helper function to get current frontend type from request.
@@ -90,6 +90,7 @@ def get_frontend_type(request: Request) -> FrontendType:
request: FastAPI request object
Returns:
FrontendType enum value (defaults to PLATFORM if not set)
FrontendType enum value, or None if the middleware hasn't run yet.
Callers should handle None explicitly where context is clear.
"""
return getattr(request.state, "frontend_type", FrontendType.PLATFORM)
return getattr(request.state, "frontend_type", None)

View File

@@ -36,6 +36,31 @@ MAIN_PLATFORM_CODE = "main"
_LOCAL_HOSTS = {"localhost", "127.0.0.1", "testserver"}
def _strip_port(host: str) -> str:
"""
Remove port from host, handling IPv6 bracket notation.
Examples:
"localhost:8000""localhost"
"[::1]:8000""::1"
"[::1]""::1"
"example.com""example.com"
"192.168.1.1:80""192.168.1.1"
"""
if not host:
return host
# IPv6 with brackets: [::1]:8000 or [::1]
if host.startswith("["):
bracket_end = host.find("]")
if bracket_end != -1:
return host[1:bracket_end]
return host # malformed, return as-is
# Regular host:port
if ":" in host:
return host.split(":")[0]
return host
class PlatformContextManager:
"""Manages platform context detection for multi-platform routing."""
@@ -62,7 +87,7 @@ class PlatformContextManager:
path = request.url.path
# Remove port from host if present (e.g., localhost:9999 -> localhost)
host_without_port = host.split(":")[0] if ":" in host else host
host_without_port = _strip_port(host)
# Skip platform detection for admin routes - admin is global
if FrontendDetector.is_admin(host, path):
@@ -359,7 +384,7 @@ class PlatformContextMiddleware:
# For storefront API requests on localhost, the path doesn't contain
# /platforms/{code}/, so extract platform from the Referer header instead.
# e.g., Referer: http://localhost:8000/platforms/loyalty/storefront/FASHIONHUB/...
host_without_port = host.split(":")[0] if ":" in host else host
host_without_port = _strip_port(host)
if (
host_without_port in _LOCAL_HOSTS
and path.startswith("/api/v1/storefront/")
@@ -450,7 +475,7 @@ class PlatformContextMiddleware:
- /platforms/oms/pricing → OMS platform pricing
- /platforms/loyalty/ → Loyalty platform homepage
"""
host_without_port = host.split(":")[0] if ":" in host else host
host_without_port = _strip_port(host)
# Method 1: Domain-based (production)
if host_without_port and host_without_port not in _LOCAL_HOSTS:

View File

@@ -245,7 +245,7 @@ class StoreContextManager:
)
return store
# 2b. Fallback to Store.subdomain (global default)
# 2b. Fallback to Store.subdomain with platform membership check
store = (
db.query(Store)
.filter(func.lower(Store.subdomain) == subdomain.lower())
@@ -254,6 +254,33 @@ class StoreContextManager:
)
if store:
# When a platform context exists and detection is "subdomain",
# verify the store actually has an active membership on this
# platform. Without this check, a subdomain like
# "other-tenant.omsflow.lu" could resolve a store that only
# belongs to the loyalty platform — a cross-tenant leak.
if platform and context.get("detection_method") == "subdomain":
from app.modules.tenancy.models.store_platform import (
StorePlatform as SP,
)
has_membership = (
db.query(SP)
.filter(
SP.store_id == store.id,
SP.platform_id == platform.id,
SP.is_active.is_(True),
)
.first()
)
if not has_membership:
logger.warning(
f"[FAIL-CLOSED] Store '{subdomain}' exists but has no "
f"active membership on platform {platform.code}"
f"blocking cross-tenant resolution"
)
return None
method = context.get("detection_method", "unknown")
logger.info(
f"[OK] Store found via {method}: {subdomain}{store.name}"

View File

@@ -83,6 +83,11 @@ def _is_static_request(path: str) -> bool:
return "favicon.ico" in lower
def _looks_like_storefront(path: str) -> bool:
"""Return True if path belongs to the storefront surface."""
return path.startswith(("/storefront/", "/api/v1/storefront/"))
class StorefrontAccessMiddleware(BaseHTTPMiddleware):
"""
Gate storefront requests behind an active subscription.
@@ -94,6 +99,25 @@ class StorefrontAccessMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next) -> Response:
frontend_type = getattr(request.state, "frontend_type", None)
# Safety net: if frontend_type is None (upstream middleware failed) but
# the path looks like a storefront path, block instead of letting it
# through — a None frontend_type must never bypass the gate.
if frontend_type is None and _looks_like_storefront(request.url.path):
logger.error(
"[STOREFRONT_ACCESS] frontend_type is None on storefront path "
f"'{request.url.path}' — blocking (fail-closed). "
"Check middleware chain ordering."
)
if request.url.path.startswith("/api/"):
return JSONResponse(
status_code=403,
content={
"error": "storefront_not_available",
"reason": "middleware_misconfigured",
},
)
return self._render_unavailable(request, "not_found")
# Only gate storefront requests
if frontend_type != FrontendType.STOREFRONT:
return await call_next(request)

View File

@@ -147,6 +147,8 @@ nav:
- Developer Documentation: development/error-rendering/error-rendering-developer-documentation.md
- Flow Diagram: development/error-rendering/html-error-rendering-flow-diagram.md
- Environment Detection: development/environment-detection.md
- Diagnostics:
- Domain Health: development/diagnostics/domain-health.md
- Database Seeder:
- Documentation: development/database-seeder/database-seeder-documentation.md
- Makefile Guide: development/database-seeder/makefile-database-seeder.md

View File

@@ -955,6 +955,7 @@ def create_demo_stores(
platform_rows = db.execute(select(Platform.id, Platform.code)).all()
platform_code_map = {code: pid for pid, code in platform_rows}
store_platforms = []
for i, (platform_id,) in enumerate(merchant_subs):
# Per-platform subdomain override for multi-platform stores
# Config uses platform codes; resolve to IDs
@@ -963,13 +964,14 @@ def create_demo_stores(
if platform_code_map.get(pcode) == platform_id:
custom_sub = subdomain_val
break
sp = StorePlatform(
store_platforms.append(StorePlatform(
store_id=store.id,
platform_id=platform_id,
is_active=True,
custom_subdomain=custom_sub,
)
db.add(sp)
))
if store_platforms:
db.add_all(store_platforms)
if merchant_subs:
db.flush()

View File

@@ -0,0 +1,215 @@
# tests/unit/middleware/test_fail_closed_store_context.py
"""
Unit tests for fail-closed store context resolution.
Verifies that subdomain detection with a platform context:
1. Tries StorePlatform.custom_subdomain first (platform-specific override)
2. Falls back to Store.subdomain but ONLY if the store has an active
membership on the detected platform (prevents cross-tenant leaks)
3. Returns None if the store has no membership on the platform
"""
from unittest.mock import MagicMock, Mock
import pytest
from sqlalchemy.orm import Session
from middleware.store_context import StoreContextManager
@pytest.mark.unit
@pytest.mark.middleware
class TestFailClosedStoreContext:
"""Ensure subdomain+platform blocks cross-tenant resolution."""
def test_subdomain_not_found_at_all_returns_none(self):
"""
Platform + subdomain + no custom_subdomain match + no Store.subdomain match → None.
"""
mock_db = Mock(spec=Session)
mock_platform = Mock()
mock_platform.id = 2
mock_platform.code = "loyalty"
# All queries return None
mock_db.query.return_value.filter.return_value.first.return_value = None
mock_db.query.return_value.filter.return_value.filter.return_value.first.return_value = None
context = {
"detection_method": "subdomain",
"subdomain": "nonexistent",
"_platform": mock_platform,
}
store = StoreContextManager.get_store_from_context(mock_db, context)
assert store is None
def test_subdomain_found_with_platform_membership_returns_store(self):
"""
Platform + subdomain + Store.subdomain found + store HAS membership
on this platform → return store.
e.g. wizatech.omsflow.lu where WIZATECH is on OMS platform.
"""
mock_db = Mock(spec=Session)
mock_platform = Mock()
mock_platform.id = 1
mock_platform.code = "oms"
mock_store = Mock()
mock_store.id = 10
mock_store.is_active = True
mock_store.name = "WizaTech"
mock_membership = Mock() # Active StorePlatform entry
call_count = [0]
def side_effect_query(model):
result = MagicMock()
call_count[0] += 1
if call_count[0] == 1:
# StorePlatform.custom_subdomain lookup → no match
result.filter.return_value.first.return_value = None
elif call_count[0] == 2:
# Store.subdomain lookup → found
result.filter.return_value.filter.return_value.first.return_value = mock_store
else:
# StorePlatform membership check → exists
result.filter.return_value.first.return_value = mock_membership
return result
mock_db.query.side_effect = side_effect_query
context = {
"detection_method": "subdomain",
"subdomain": "wizatech",
"_platform": mock_platform,
}
store = StoreContextManager.get_store_from_context(mock_db, context)
assert store is mock_store
def test_subdomain_found_without_platform_membership_returns_none(self):
"""
Platform + subdomain + Store.subdomain found but store has NO
membership on this platform → None (cross-tenant blocked).
e.g. loyalty-only-store.omsflow.lu where the store is only on loyalty.
"""
mock_db = Mock(spec=Session)
mock_platform = Mock()
mock_platform.id = 1
mock_platform.code = "oms"
mock_store = Mock()
mock_store.id = 10
mock_store.is_active = True
mock_store.name = "LoyaltyOnlyStore"
call_count = [0]
def side_effect_query(model):
result = MagicMock()
call_count[0] += 1
if call_count[0] == 1:
# StorePlatform.custom_subdomain lookup → no match
result.filter.return_value.first.return_value = None
elif call_count[0] == 2:
# Store.subdomain lookup → found
result.filter.return_value.filter.return_value.first.return_value = mock_store
else:
# StorePlatform membership check → NOT found (no membership)
result.filter.return_value.first.return_value = None
return result
mock_db.query.side_effect = side_effect_query
context = {
"detection_method": "subdomain",
"subdomain": "loyalty-only",
"_platform": mock_platform,
}
store = StoreContextManager.get_store_from_context(mock_db, context)
assert store is None
def test_custom_subdomain_found_returns_store(self):
"""Platform + subdomain + custom_subdomain found → correct store."""
mock_db = Mock(spec=Session)
mock_platform = Mock()
mock_platform.id = 2
mock_platform.code = "loyalty"
mock_store_platform = Mock()
mock_store_platform.store_id = 42
mock_store = Mock()
mock_store.id = 42
mock_store.is_active = True
mock_store.name = "Acme Rewards"
call_count = [0]
def side_effect_query(model):
result = MagicMock()
call_count[0] += 1
if call_count[0] == 1:
result.filter.return_value.first.return_value = mock_store_platform
else:
result.filter.return_value.first.return_value = mock_store
return result
mock_db.query.side_effect = side_effect_query
context = {
"detection_method": "subdomain",
"subdomain": "acme-rewards",
"_platform": mock_platform,
}
store = StoreContextManager.get_store_from_context(mock_db, context)
assert store is mock_store
def test_no_platform_subdomain_uses_global_store(self):
"""No platform + subdomain → uses Store.subdomain (dev mode)."""
mock_db = Mock(spec=Session)
mock_store = Mock()
mock_store.is_active = True
mock_store.name = "Acme"
mock_db.query.return_value.filter.return_value.filter.return_value.first.return_value = mock_store
context = {
"detection_method": "subdomain",
"subdomain": "acme",
# No "_platform" key — dev mode
}
store = StoreContextManager.get_store_from_context(mock_db, context)
assert store is mock_store
def test_path_based_uses_global_store(self):
"""Path-based detection always uses Store.subdomain (unchanged)."""
mock_db = Mock(spec=Session)
mock_platform = Mock()
mock_platform.id = 1
mock_store = Mock()
mock_store.is_active = True
mock_store.name = "Acme"
mock_db.query.return_value.filter.return_value.filter.return_value.first.return_value = mock_store
context = {
"detection_method": "path",
"subdomain": "ACME",
"_platform": mock_platform,
}
store = StoreContextManager.get_store_from_context(mock_db, context)
assert store is mock_store

View File

@@ -188,13 +188,13 @@ class TestGetFrontendTypeHelper:
assert result == FrontendType.ADMIN
def test_get_frontend_type_default(self):
"""Test getting frontend type returns PLATFORM as default."""
"""Test getting frontend type returns None when not set (fail-aware)."""
request = Mock(spec=Request)
request.state = Mock(spec=[]) # No frontend_type attribute
result = get_frontend_type(request)
assert result == FrontendType.PLATFORM
assert result is None
def test_get_frontend_type_for_all_types(self):
"""Test getting all frontend types."""

View File

@@ -0,0 +1,63 @@
# tests/unit/middleware/test_ipv6_host_parsing.py
"""
Unit tests for _strip_port() IPv6-safe host parsing utility.
Ensures the middleware correctly strips ports from:
- IPv4 hosts (localhost:8000)
- IPv6 hosts ([::1]:8000)
- Bare hostnames (example.com)
- Edge cases (empty, malformed brackets)
"""
import pytest
from middleware.platform_context import _strip_port
@pytest.mark.unit
@pytest.mark.middleware
class TestStripPort:
"""Test _strip_port() handles all host formats correctly."""
def test_ipv4_with_port(self):
assert _strip_port("127.0.0.1:8000") == "127.0.0.1"
def test_ipv4_without_port(self):
assert _strip_port("127.0.0.1") == "127.0.0.1"
def test_localhost_with_port(self):
assert _strip_port("localhost:9999") == "localhost"
def test_localhost_without_port(self):
assert _strip_port("localhost") == "localhost"
def test_domain_with_port(self):
assert _strip_port("example.com:443") == "example.com"
def test_domain_without_port(self):
assert _strip_port("example.com") == "example.com"
def test_ipv6_with_brackets_and_port(self):
assert _strip_port("[::1]:8000") == "::1"
def test_ipv6_with_brackets_no_port(self):
assert _strip_port("[::1]") == "::1"
def test_ipv6_full_address_with_port(self):
assert _strip_port("[2001:db8::1]:443") == "2001:db8::1"
def test_ipv6_full_address_no_port(self):
assert _strip_port("[2001:db8::1]") == "2001:db8::1"
def test_empty_string(self):
assert _strip_port("") == ""
def test_bare_hostname(self):
assert _strip_port("myhost") == "myhost"
def test_subdomain_with_port(self):
assert _strip_port("store.omsflow.lu:8080") == "store.omsflow.lu"
def test_malformed_brackets_no_closing(self):
"""Malformed bracket with no closing ] returns as-is."""
assert _strip_port("[::1") == "[::1"

View File

@@ -63,34 +63,15 @@ class TestCustomSubdomainResolution:
assert store is mock_store
def test_custom_subdomain_not_found_falls_back_to_store_subdomain(self):
"""When custom_subdomain doesn't match, fall back to Store.subdomain."""
def test_custom_subdomain_not_found_returns_none_on_platform(self):
"""When custom_subdomain doesn't match on a platform, return None (fail-closed)."""
mock_db = Mock(spec=Session)
mock_platform = Mock()
mock_platform.id = 2
mock_platform.code = "loyalty"
mock_store = Mock()
mock_store.is_active = True
mock_store.name = "Acme Corp"
# Query sequence:
# 1. StorePlatform query → None (no custom_subdomain match)
# 2. Store query → mock_store (subdomain match)
call_count = [0]
def side_effect_query(model):
result = MagicMock()
call_count[0] += 1
if call_count[0] == 1:
# StorePlatform query → no match
result.filter.return_value.first.return_value = None
else:
# Store.subdomain fallback
result.filter.return_value.filter.return_value.first.return_value = mock_store
return result
mock_db.query.side_effect = side_effect_query
# StorePlatform query → no match
mock_db.query.return_value.filter.return_value.first.return_value = None
context = {
"detection_method": "subdomain",
@@ -100,6 +81,25 @@ class TestCustomSubdomainResolution:
store = StoreContextManager.get_store_from_context(mock_db, context)
assert store is None
def test_no_platform_subdomain_uses_global_store_subdomain(self):
"""When no platform context, subdomain detection uses global Store.subdomain."""
mock_db = Mock(spec=Session)
mock_store = Mock()
mock_store.is_active = True
mock_store.name = "Acme Corp"
mock_db.query.return_value.filter.return_value.filter.return_value.first.return_value = mock_store
context = {
"detection_method": "subdomain",
"subdomain": "acme",
# No "_platform" key — dev mode
}
store = StoreContextManager.get_store_from_context(mock_db, context)
assert store is mock_store
def test_no_platform_skips_custom_subdomain_lookup(self):

View File

@@ -172,10 +172,51 @@ class TestStorefrontAccessMiddlewarePassthrough:
call_next.assert_called_once_with(request)
@pytest.mark.asyncio
async def test_no_frontend_type_passes_through(self):
"""Test request with no frontend_type set passes through."""
async def test_no_frontend_type_non_storefront_passes_through(self):
"""Test request with no frontend_type on non-storefront path passes through."""
middleware = StorefrontAccessMiddleware(app=None)
request = _make_request()
request = _make_request(path="/admin/dashboard")
request.state.frontend_type = None
call_next = AsyncMock(return_value=Mock())
await middleware.dispatch(request, call_next)
call_next.assert_called_once_with(request)
@pytest.mark.asyncio
async def test_no_frontend_type_storefront_path_blocked(self):
"""Test request with no frontend_type on storefront path is blocked (fail-closed)."""
middleware = StorefrontAccessMiddleware(app=None)
request = _make_request(path="/storefront/products")
request.state.frontend_type = None
request.state.store = None
call_next = AsyncMock()
with patch("app.templates_config.templates") as mock_templates:
mock_templates.TemplateResponse.return_value = Mock(status_code=403)
await middleware.dispatch(request, call_next)
call_next.assert_not_called()
@pytest.mark.asyncio
async def test_no_frontend_type_storefront_api_blocked_json(self):
"""Test request with no frontend_type on storefront API returns JSON 403."""
middleware = StorefrontAccessMiddleware(app=None)
request = _make_request(path="/api/v1/storefront/cart")
request.state.frontend_type = None
call_next = AsyncMock()
response = await middleware.dispatch(request, call_next)
call_next.assert_not_called()
assert isinstance(response, JSONResponse)
assert response.status_code == 403
@pytest.mark.asyncio
async def test_no_frontend_type_static_passes_through(self):
"""Test request with no frontend_type on static path passes through."""
middleware = StorefrontAccessMiddleware(app=None)
request = _make_request(path="/static/css/style.css")
request.state.frontend_type = None
call_next = AsyncMock(return_value=Mock())

View File

@@ -0,0 +1,126 @@
# tests/unit/middleware/test_storefront_gate_bypass.py
"""
Unit tests for the storefront gate bypass safety net.
Ensures that when frontend_type is None (upstream middleware failed),
storefront paths are BLOCKED rather than passed through. Non-storefront
paths with None frontend_type should pass through unchanged.
"""
from unittest.mock import AsyncMock, Mock, patch
import pytest
from fastapi import Request
from starlette.responses import JSONResponse
from middleware.storefront_access import (
StorefrontAccessMiddleware,
_looks_like_storefront,
)
@pytest.mark.unit
@pytest.mark.middleware
class TestLooksLikeStorefront:
"""Test the _looks_like_storefront helper."""
def test_storefront_page_path(self):
assert _looks_like_storefront("/storefront/products") is True
def test_storefront_api_path(self):
assert _looks_like_storefront("/api/v1/storefront/cart") is True
def test_admin_path(self):
assert _looks_like_storefront("/admin/dashboard") is False
def test_static_path(self):
assert _looks_like_storefront("/static/css/style.css") is False
def test_store_path(self):
assert _looks_like_storefront("/store/ACME/login") is False
def test_root_path(self):
assert _looks_like_storefront("/") is False
@pytest.mark.unit
@pytest.mark.middleware
class TestStorefrontGateBypass:
"""Test that None frontend_type can't bypass the storefront gate."""
@pytest.mark.asyncio
async def test_none_frontend_type_storefront_page_blocked(self):
"""frontend_type=None + /storefront/ path → blocked with HTML 403."""
middleware = StorefrontAccessMiddleware(app=None)
request = Mock(spec=Request)
request.url = Mock(path="/storefront/products")
request.state = Mock()
request.state.frontend_type = None
request.state.store = None
request.state.language = "en"
request.state.theme = None
call_next = AsyncMock()
with patch("app.templates_config.templates") as mock_templates:
mock_templates.TemplateResponse.return_value = Mock(status_code=403)
await middleware.dispatch(request, call_next)
call_next.assert_not_called()
@pytest.mark.asyncio
async def test_none_frontend_type_storefront_api_blocked(self):
"""frontend_type=None + /api/v1/storefront/ path → JSON 403."""
middleware = StorefrontAccessMiddleware(app=None)
request = Mock(spec=Request)
request.url = Mock(path="/api/v1/storefront/cart")
request.state = Mock()
request.state.frontend_type = None
call_next = AsyncMock()
response = await middleware.dispatch(request, call_next)
call_next.assert_not_called()
assert isinstance(response, JSONResponse)
assert response.status_code == 403
@pytest.mark.asyncio
async def test_none_frontend_type_admin_passes_through(self):
"""frontend_type=None + /admin/ path → passes through (not storefront)."""
middleware = StorefrontAccessMiddleware(app=None)
request = Mock(spec=Request)
request.url = Mock(path="/admin/dashboard")
request.state = Mock()
request.state.frontend_type = None
call_next = AsyncMock(return_value=Mock())
await middleware.dispatch(request, call_next)
call_next.assert_called_once_with(request)
@pytest.mark.asyncio
async def test_none_frontend_type_static_passes_through(self):
"""frontend_type=None + /static/ path → passes through."""
middleware = StorefrontAccessMiddleware(app=None)
request = Mock(spec=Request)
request.url = Mock(path="/static/css/style.css")
request.state = Mock()
request.state.frontend_type = None
call_next = AsyncMock(return_value=Mock())
await middleware.dispatch(request, call_next)
call_next.assert_called_once_with(request)
@pytest.mark.asyncio
async def test_none_frontend_type_root_passes_through(self):
"""frontend_type=None + / path → passes through."""
middleware = StorefrontAccessMiddleware(app=None)
request = Mock(spec=Request)
request.url = Mock(path="/")
request.state = Mock()
request.state.frontend_type = None
call_next = AsyncMock(return_value=Mock())
await middleware.dispatch(request, call_next)
call_next.assert_called_once_with(request)