diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
new file mode 100644
index 00000000..87151fc8
--- /dev/null
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -0,0 +1,19 @@
+## Summary
+
+
+
+## Changes
+
+-
+
+## Test plan
+
+- [ ] Unit tests pass (`python -m pytest tests/unit/`)
+- [ ] Integration tests pass (`python -m pytest tests/integration/`)
+- [ ] Architecture validation passes (`python scripts/validate/validate_all.py`)
+
+## Checklist
+
+- [ ] Code follows project conventions
+- [ ] No new warnings introduced
+- [ ] Database migrations included (if applicable)
diff --git a/.github/dependabot.yml b/.github/dependabot.yml
new file mode 100644
index 00000000..9f837b6d
--- /dev/null
+++ b/.github/dependabot.yml
@@ -0,0 +1,9 @@
+version: 2
+updates:
+ - package-ecosystem: "pip"
+ directory: "/"
+ schedule:
+ interval: "weekly"
+ open-pull-requests-limit: 10
+ labels:
+ - "dependencies"
diff --git a/app/exceptions/error_renderer.py b/app/exceptions/error_renderer.py
index fac59c16..790cf2c1 100644
--- a/app/exceptions/error_renderer.py
+++ b/app/exceptions/error_renderer.py
@@ -85,8 +85,9 @@ class ErrorPageRenderer:
Returns:
HTMLResponse with rendered error page
"""
- # Get frontend type
- frontend_type = get_frontend_type(request)
+ # Get frontend type — default to PLATFORM in error rendering context
+ # (errors can occur before FrontendTypeMiddleware runs)
+ frontend_type = get_frontend_type(request) or FrontendType.PLATFORM
# Prepare template data
template_data = ErrorPageRenderer._prepare_template_data(
@@ -291,7 +292,7 @@ class ErrorPageRenderer:
# TODO: Implement actual admin check based on JWT/session
# For now, check if we're in admin frontend
frontend_type = get_frontend_type(request)
- return frontend_type == FrontendType.ADMIN
+ return frontend_type is not None and frontend_type == FrontendType.ADMIN
@staticmethod
def _render_basic_html_fallback(
diff --git a/app/exceptions/handler.py b/app/exceptions/handler.py
index 3ef58ac0..5670fbb8 100644
--- a/app/exceptions/handler.py
+++ b/app/exceptions/handler.py
@@ -388,7 +388,7 @@ def _redirect_to_login(request: Request) -> RedirectResponse:
Uses FrontendType detection to determine admin vs store vs storefront login.
Properly handles multi-access routing (domain, subdomain, path-based).
"""
- frontend_type = get_frontend_type(request)
+ frontend_type = get_frontend_type(request) or FrontendType.PLATFORM
if frontend_type == FrontendType.ADMIN:
logger.debug("Redirecting to /admin/login")
diff --git a/app/modules/billing/tests/integration/test_admin_features_routes.py b/app/modules/billing/tests/integration/test_admin_features_routes.py
index ab75b0ec..28dcdd37 100644
--- a/app/modules/billing/tests/integration/test_admin_features_routes.py
+++ b/app/modules/billing/tests/integration/test_admin_features_routes.py
@@ -114,8 +114,7 @@ def ft_tier_with_features(db, ft_tier):
TierFeatureLimit(tier_id=ft_tier.id, feature_code="basic_shop", limit_value=None),
TierFeatureLimit(tier_id=ft_tier.id, feature_code="team_members", limit_value=5),
]
- for f in features:
- db.add(f)
+ db.add_all(features)
db.commit()
# Refresh so the tier's selectin-loaded feature_limits relationship is up to date
db.refresh(ft_tier)
diff --git a/app/modules/billing/tests/unit/test_billing_metrics.py b/app/modules/billing/tests/unit/test_billing_metrics.py
index c6ff9263..655cd4b9 100644
--- a/app/modules/billing/tests/unit/test_billing_metrics.py
+++ b/app/modules/billing/tests/unit/test_billing_metrics.py
@@ -83,13 +83,12 @@ def billing_extra_platforms(db):
"""Create additional platforms for multiple subscriptions (unique constraint: merchant+platform)."""
platforms = []
for i in range(2):
- p = Platform(
+ platforms.append(Platform(
code=f"bm_extra_{uuid.uuid4().hex[:8]}",
name=f"Extra Platform {i}",
is_active=True,
- )
- db.add(p)
- platforms.append(p)
+ ))
+ db.add_all(platforms)
db.commit()
for p in platforms:
db.refresh(p)
diff --git a/app/modules/billing/tests/unit/test_feature_service.py b/app/modules/billing/tests/unit/test_feature_service.py
index cdb466de..160308d6 100644
--- a/app/modules/billing/tests/unit/test_feature_service.py
+++ b/app/modules/billing/tests/unit/test_feature_service.py
@@ -90,8 +90,7 @@ def fs_tier_with_features(db, fs_tier):
TierFeatureLimit(tier_id=fs_tier.id, feature_code="feature_b", limit_value=100),
TierFeatureLimit(tier_id=fs_tier.id, feature_code="feature_c", limit_value=50),
]
- for f in features:
- db.add(f)
+ db.add_all(features)
db.commit()
return features
diff --git a/app/modules/core/tests/integration/test_merchant_dashboard_routes.py b/app/modules/core/tests/integration/test_merchant_dashboard_routes.py
index e1adf833..8a01190f 100644
--- a/app/modules/core/tests/integration/test_merchant_dashboard_routes.py
+++ b/app/modules/core/tests/integration/test_merchant_dashboard_routes.py
@@ -112,15 +112,14 @@ def dash_team_members(db, dash_stores, dash_owner):
auth = AuthManager()
users = []
for _ in range(2):
- u = User(
+ users.append(User(
email=f"dteam_{uuid.uuid4().hex[:8]}@test.com",
username=f"dteam_{uuid.uuid4().hex[:8]}",
hashed_password=auth.hash_password("pass123"),
role="store_user",
is_active=True,
- )
- db.add(u)
- users.append(u)
+ ))
+ db.add_all(users)
db.flush()
db.add(StoreUser(store_id=dash_stores[0].id, user_id=users[0].id, is_active=True))
@@ -132,19 +131,19 @@ def dash_team_members(db, dash_stores, dash_owner):
@pytest.fixture
def dash_customers(db, dash_stores):
"""Create customers in the merchant's stores."""
+ customers = []
for i in range(4):
uid = uuid.uuid4().hex[:8]
- db.add(
- Customer(
- store_id=dash_stores[0].id,
- email=f"dc_{uid}@test.com",
- hashed_password="hashed", # noqa: SEC001
- first_name=f"F{i}",
- last_name=f"L{i}",
- customer_number=f"DC{uid}",
- is_active=True,
- )
- )
+ customers.append(Customer(
+ store_id=dash_stores[0].id,
+ email=f"dc_{uid}@test.com",
+ hashed_password="hashed", # noqa: SEC001
+ first_name=f"F{i}",
+ last_name=f"L{i}",
+ customer_number=f"DC{uid}",
+ is_active=True,
+ ))
+ db.add_all(customers)
db.commit()
diff --git a/app/modules/customers/tests/unit/test_customer_metrics.py b/app/modules/customers/tests/unit/test_customer_metrics.py
index f75da6b7..54cfacff 100644
--- a/app/modules/customers/tests/unit/test_customer_metrics.py
+++ b/app/modules/customers/tests/unit/test_customer_metrics.py
@@ -45,16 +45,15 @@ def cust_stores(db, cust_merchant):
stores = []
for i in range(2):
uid = uuid.uuid4().hex[:8].upper()
- store = Store(
+ stores.append(Store(
merchant_id=cust_merchant.id,
store_code=f"CSTORE_{uid}",
subdomain=f"cstore{uid.lower()}",
name=f"Cust Store {i}",
is_active=True,
is_verified=True,
- )
- db.add(store)
- stores.append(store)
+ ))
+ db.add_all(stores)
db.commit()
for s in stores:
db.refresh(s)
@@ -68,7 +67,7 @@ def cust_customers(db, cust_stores):
# 3 customers in store 0
for i in range(3):
uid = uuid.uuid4().hex[:8]
- c = Customer(
+ customers.append(Customer(
store_id=cust_stores[0].id,
email=f"cust_{uid}@test.com",
hashed_password="hashed", # noqa: SEC001
@@ -76,13 +75,11 @@ def cust_customers(db, cust_stores):
last_name=f"Last{i}",
customer_number=f"C{uid}",
is_active=True,
- )
- db.add(c)
- customers.append(c)
+ ))
# 2 customers in store 1
for i in range(2):
uid = uuid.uuid4().hex[:8]
- c = Customer(
+ customers.append(Customer(
store_id=cust_stores[1].id,
email=f"cust_{uid}@test.com",
hashed_password="hashed", # noqa: SEC001
@@ -90,9 +87,8 @@ def cust_customers(db, cust_stores):
last_name=f"Last{i}",
customer_number=f"C{uid}",
is_active=True,
- )
- db.add(c)
- customers.append(c)
+ ))
+ db.add_all(customers)
db.commit()
return customers
diff --git a/app/modules/dev_tools/services/domain_health_service.py b/app/modules/dev_tools/services/domain_health_service.py
new file mode 100644
index 00000000..58dcd53a
--- /dev/null
+++ b/app/modules/dev_tools/services/domain_health_service.py
@@ -0,0 +1,232 @@
+# app/modules/dev_tools/services/domain_health_service.py
+"""
+Domain Health Check Service
+
+Simulates the middleware resolution pipeline for every active access method
+(custom subdomain, default subdomain, custom domain, path-based) to verify
+they resolve to the expected store.
+"""
+
+import logging
+
+from sqlalchemy.orm import Session
+
+logger = logging.getLogger(__name__)
+
+
+def run_domain_health_check(db: Session) -> dict:
+ """
+ Check all active store access methods by simulating middleware resolution:
+ 1. StorePlatform custom subdomains (acme-rewards.rewardflow.lu)
+ 2. Default subdomains per platform (acme.omsflow.lu)
+ 3. StoreDomain custom domains (wizatech.shop)
+ 4. Path-based store routes (/storefront/{subdomain}/, /store/{subdomain}/)
+
+ Returns:
+ dict with keys: total, passed, failed, details (list of entry dicts)
+ """
+ from app.modules.tenancy.models import Platform, Store, StoreDomain
+ from app.modules.tenancy.models.store_platform import StorePlatform
+ from middleware.store_context import StoreContextManager
+
+ details: list[dict] = []
+
+ # Pre-fetch all active stores, platforms, and memberships
+ active_stores = (
+ db.query(Store).filter(Store.is_active.is_(True)).all()
+ )
+ store_by_id: dict[int, Store] = {s.id: s for s in active_stores}
+
+ all_platforms = (
+ db.query(Platform).filter(Platform.is_active.is_(True)).all()
+ )
+ platform_by_id: dict[int, Platform] = {p.id: p for p in all_platforms}
+
+ all_store_platforms = (
+ db.query(StorePlatform)
+ .filter(StorePlatform.is_active.is_(True))
+ .all()
+ )
+
+ # Build store_id → list of (StorePlatform, Platform) for reuse
+ store_memberships: dict[int, list[tuple]] = {}
+ for sp in all_store_platforms:
+ platform = platform_by_id.get(sp.platform_id)
+ if platform:
+ store_memberships.setdefault(sp.store_id, []).append((sp, platform))
+
+ # ── 1. Custom subdomains (StorePlatform.custom_subdomain) ──
+ # e.g. acme-rewards.rewardflow.lu
+ for sp in all_store_platforms:
+ if not sp.custom_subdomain:
+ continue
+ expected_store = store_by_id.get(sp.store_id)
+ platform = platform_by_id.get(sp.platform_id)
+ if not expected_store or not platform:
+ continue
+
+ context = {
+ "detection_method": "subdomain",
+ "subdomain": sp.custom_subdomain,
+ "_platform": platform,
+ }
+ resolved = StoreContextManager.get_store_from_context(db, context)
+
+ passed = resolved is not None and resolved.id == expected_store.id
+ details.append(_entry(
+ domain=f"{sp.custom_subdomain}.{platform.domain or platform.code}",
+ entry_type="custom subdomain",
+ platform_code=platform.code,
+ expected_store=expected_store,
+ resolved_store=resolved,
+ passed=passed,
+ note="" if passed else "custom_subdomain lookup failed",
+ ))
+
+ # ── 2. Default subdomains per platform ──
+ # e.g. acme.omsflow.lu — uses Store.subdomain on the platform domain.
+ # With fail-closed routing, this only works if StorePlatform.custom_subdomain
+ # matches Store.subdomain, so this check surfaces missing entries.
+ for store in active_stores:
+ if not store.subdomain:
+ continue
+ memberships = store_memberships.get(store.id, [])
+ for sp, platform in memberships:
+ if not platform.domain:
+ continue
+ # If custom_subdomain is already set to this value, it's covered
+ # in check #1 above — but we still check default subdomain access
+ # to confirm it resolves (it exercises the same code path).
+ # Skip if custom_subdomain == subdomain to avoid duplicate entries.
+ if sp.custom_subdomain and sp.custom_subdomain.lower() == store.subdomain.lower():
+ continue
+
+ context = {
+ "detection_method": "subdomain",
+ "subdomain": store.subdomain,
+ "_platform": platform,
+ }
+ resolved = StoreContextManager.get_store_from_context(db, context)
+
+ passed = resolved is not None and resolved.id == store.id
+ note = ""
+ if not passed:
+ note = (
+ f"Store '{store.subdomain}' not found or has no active "
+ f"membership on platform {platform.code}"
+ )
+ details.append(_entry(
+ domain=f"{store.subdomain}.{platform.domain}",
+ entry_type="default subdomain",
+ platform_code=platform.code,
+ expected_store=store,
+ resolved_store=resolved,
+ passed=passed,
+ note=note,
+ ))
+
+ # ── 3. Custom domains (StoreDomain) ──
+ # e.g. wizatech.shop
+ store_domains = (
+ db.query(StoreDomain)
+ .filter(
+ StoreDomain.is_active.is_(True),
+ StoreDomain.is_verified.is_(True),
+ )
+ .all()
+ )
+ for sd in store_domains:
+ expected_store = sd.store
+ if not expected_store:
+ continue
+
+ platform = sd.platform
+ platform_code = platform.code if platform else None
+
+ context = {
+ "detection_method": "custom_domain",
+ "domain": sd.domain,
+ }
+ resolved = StoreContextManager.get_store_from_context(db, context)
+
+ passed = resolved is not None and resolved.id == expected_store.id
+ details.append(_entry(
+ domain=sd.domain,
+ entry_type="custom domain",
+ platform_code=platform_code,
+ expected_store=expected_store,
+ resolved_store=resolved,
+ passed=passed,
+ note="" if passed else "custom domain resolution failed",
+ ))
+
+ # ── 4. Path-based routes ──
+ # e.g. /storefront/luxweb/, /store/luxweb/
+ # Path-based URLs use Store.subdomain as the path segment.
+ for store in active_stores:
+ if not store.subdomain:
+ continue
+
+ memberships = store_memberships.get(store.id, [])
+ platform_codes = sorted(p.code for _, p in memberships) if memberships else []
+ platform_label = ", ".join(platform_codes) if platform_codes else None
+
+ for prefix, label in [
+ ("/storefront/", "storefront"),
+ ("/store/", "store"),
+ ]:
+ context = {
+ "detection_method": "path",
+ "subdomain": store.subdomain,
+ "path_prefix": f"{prefix}{store.subdomain}",
+ "full_prefix": prefix,
+ }
+ resolved = StoreContextManager.get_store_from_context(db, context)
+
+ passed = resolved is not None and resolved.id == store.id
+ details.append(_entry(
+ domain=f"{prefix}{store.subdomain}/",
+ entry_type=f"path ({label})",
+ platform_code=platform_label,
+ expected_store=store,
+ resolved_store=resolved,
+ passed=passed,
+ note="" if passed else f"path-based {label} resolution failed",
+ ))
+
+ total = len(details)
+ passed_count = sum(1 for d in details if d["status"] == "pass")
+
+ return {
+ "total": total,
+ "passed": passed_count,
+ "failed": total - passed_count,
+ "details": details,
+ }
+
+
+def _entry(
+ domain: str,
+ entry_type: str,
+ platform_code: str | None,
+ expected_store,
+ resolved_store,
+ passed: bool,
+ note: str,
+) -> dict:
+ """Build a single health-check result entry."""
+ return {
+ "domain": domain,
+ "type": entry_type,
+ "platform_code": platform_code,
+ "expected_store": (
+ getattr(expected_store, "store_code", expected_store.subdomain)
+ if expected_store else None
+ ),
+ "resolved_store": (
+ getattr(resolved_store, "store_code", resolved_store.subdomain)
+ if resolved_store else None
+ ),
+ "status": "pass" if passed else "fail",
+ "note": note,
+ }
diff --git a/app/modules/loyalty/tests/unit/test_card_service.py b/app/modules/loyalty/tests/unit/test_card_service.py
index 05d38299..f8fcac87 100644
--- a/app/modules/loyalty/tests/unit/test_card_service.py
+++ b/app/modules/loyalty/tests/unit/test_card_service.py
@@ -271,8 +271,8 @@ class TestGetStoreTransactions:
from datetime import UTC, datetime
# Create 3 transactions
- for i in range(3):
- t = LoyaltyTransaction(
+ txns = [
+ LoyaltyTransaction(
merchant_id=test_loyalty_card.merchant_id,
card_id=test_loyalty_card.id,
store_id=test_store.id,
@@ -280,7 +280,9 @@ class TestGetStoreTransactions:
points_delta=10 * (i + 1),
transaction_at=datetime.now(UTC),
)
- db.add(t)
+ for i in range(3)
+ ]
+ db.add_all(txns)
db.commit()
transactions, total = self.service.get_store_transactions(
diff --git a/app/modules/loyalty/tests/unit/test_program_service.py b/app/modules/loyalty/tests/unit/test_program_service.py
index 268dc8ef..8ffcfcee 100644
--- a/app/modules/loyalty/tests/unit/test_program_service.py
+++ b/app/modules/loyalty/tests/unit/test_program_service.py
@@ -414,9 +414,10 @@ class TestGetProgramStats:
db.flush()
# Create cards with customers
+ customers = []
for i in range(3):
uid = uuid.uuid4().hex[:8]
- customer = Customer(
+ customers.append(Customer(
email=f"stat_{uid}@test.com",
first_name="Stat",
last_name=f"Customer{i}",
@@ -424,11 +425,13 @@ class TestGetProgramStats:
customer_number=f"SC-{uid.upper()}",
store_id=store.id,
is_active=True,
- )
- db.add(customer)
- db.flush()
+ ))
+ db.add_all(customers)
+ db.flush()
- card = LoyaltyCard(
+ cards = []
+ for i, customer in enumerate(customers):
+ cards.append(LoyaltyCard(
merchant_id=ps_merchant.id,
program_id=ps_program.id,
customer_id=customer.id,
@@ -437,8 +440,8 @@ class TestGetProgramStats:
total_points_earned=100 * (i + 1),
is_active=True,
last_activity_at=datetime.now(UTC),
- )
- db.add(card)
+ ))
+ db.add_all(cards)
db.commit()
stats = self.service.get_program_stats(db, ps_program.id)
@@ -560,9 +563,10 @@ class TestGetMerchantStats:
db.add(store)
db.flush()
+ customers = []
for i in range(2):
uid = uuid.uuid4().hex[:8]
- customer = Customer(
+ customers.append(Customer(
email=f"mstat_{uid}@test.com",
first_name="MS",
last_name=f"Customer{i}",
@@ -570,11 +574,13 @@ class TestGetMerchantStats:
customer_number=f"MS-{uid.upper()}",
store_id=store.id,
is_active=True,
- )
- db.add(customer)
- db.flush()
+ ))
+ db.add_all(customers)
+ db.flush()
- card = LoyaltyCard(
+ cards = []
+ for i, customer in enumerate(customers):
+ cards.append(LoyaltyCard(
merchant_id=ps_merchant.id,
program_id=ps_program.id,
customer_id=customer.id,
@@ -584,8 +590,8 @@ class TestGetMerchantStats:
total_points_earned=200,
is_active=True,
last_activity_at=datetime.now(UTC),
- )
- db.add(card)
+ ))
+ db.add_all(cards)
db.commit()
stats = self.service.get_merchant_stats(db, ps_merchant.id)
diff --git a/app/modules/messaging/tests/integration/test_email_logs_api.py b/app/modules/messaging/tests/integration/test_email_logs_api.py
index 3e25aaea..037fc0f7 100644
--- a/app/modules/messaging/tests/integration/test_email_logs_api.py
+++ b/app/modules/messaging/tests/integration/test_email_logs_api.py
@@ -33,7 +33,7 @@ def audit_logs(db):
# Sent logs
for i in range(3):
uid = uuid.uuid4().hex[:8]
- log = EmailLog(
+ logs.append(EmailLog(
template_code="signup_welcome",
recipient_email=f"sent_{uid}@example.com",
recipient_name=f"Sent User {i}",
@@ -45,14 +45,12 @@ def audit_logs(db):
status=EmailStatus.SENT.value,
provider="debug",
sent_at=datetime.utcnow(),
- )
- db.add(log)
- logs.append(log)
+ ))
# Failed logs
for i in range(2):
uid = uuid.uuid4().hex[:8]
- log = EmailLog(
+ logs.append(EmailLog(
template_code="order_confirmation",
recipient_email=f"failed_{uid}@example.com",
recipient_name=f"Failed User {i}",
@@ -62,22 +60,19 @@ def audit_logs(db):
status=EmailStatus.FAILED.value,
provider="smtp",
error_message="Connection refused",
- )
- db.add(log)
- logs.append(log)
+ ))
# Pending log
uid = uuid.uuid4().hex[:8]
- log = EmailLog(
+ logs.append(EmailLog(
template_code="team_invitation",
recipient_email=f"pending_{uid}@example.com",
subject=f"Invitation {uid}",
from_email="noreply@orion.lu",
status=EmailStatus.PENDING.value,
- )
- db.add(log)
- logs.append(log)
+ ))
+ db.add_all(logs)
db.commit()
for log in logs:
db.refresh(log)
diff --git a/app/modules/messaging/tests/unit/test_email_log_service.py b/app/modules/messaging/tests/unit/test_email_log_service.py
index e070a317..ef020b6b 100644
--- a/app/modules/messaging/tests/unit/test_email_log_service.py
+++ b/app/modules/messaging/tests/unit/test_email_log_service.py
@@ -55,7 +55,7 @@ def email_logs(db, email_template):
for status, count in statuses:
for i in range(count):
uid = uuid.uuid4().hex[:8]
- log = EmailLog(
+ logs.append(EmailLog(
template_code=email_template.code,
recipient_email=f"user_{uid}@example.com",
recipient_name=f"User {uid}",
@@ -67,10 +67,9 @@ def email_logs(db, email_template):
status=status.value,
provider="debug",
sent_at=datetime.utcnow() if status != EmailStatus.PENDING else None,
- )
- db.add(log)
- logs.append(log)
+ ))
+ db.add_all(logs)
db.commit()
for log in logs:
db.refresh(log)
@@ -86,7 +85,7 @@ def multi_template_logs(db):
for tpl_code in templates:
for i in range(2):
uid = uuid.uuid4().hex[:8]
- log = EmailLog(
+ logs.append(EmailLog(
template_code=tpl_code,
recipient_email=f"{tpl_code}_{uid}@example.com",
subject=f"{tpl_code} email",
@@ -94,12 +93,10 @@ def multi_template_logs(db):
status=EmailStatus.SENT.value,
provider="debug",
sent_at=datetime.utcnow(),
- )
- db.add(log)
- logs.append(log)
+ ))
# Add one failed log
- log = EmailLog(
+ logs.append(EmailLog(
template_code="password_reset",
recipient_email="failed@example.com",
subject="Failed email",
@@ -107,10 +104,9 @@ def multi_template_logs(db):
status=EmailStatus.FAILED.value,
provider="debug",
error_message="SMTP connection refused",
- )
- db.add(log)
- logs.append(log)
+ ))
+ db.add_all(logs)
db.commit()
for log in logs:
db.refresh(log)
diff --git a/app/modules/prospecting/services/campaign_service.py b/app/modules/prospecting/services/campaign_service.py
index eefa5e32..ffa62512 100644
--- a/app/modules/prospecting/services/campaign_service.py
+++ b/app/modules/prospecting/services/campaign_service.py
@@ -138,9 +138,9 @@ class CampaignService:
sent_at=datetime.now(UTC),
sent_by_user_id=sent_by_user_id,
)
- db.add(send)
sends.append(send)
+ db.add_all(sends)
db.flush()
logger.info("Sent campaign %d to %d prospects", template_id, len(prospect_ids))
return sends
diff --git a/app/modules/prospecting/services/enrichment_service.py b/app/modules/prospecting/services/enrichment_service.py
index 820879e2..b7f4a42a 100644
--- a/app/modules/prospecting/services/enrichment_service.py
+++ b/app/modules/prospecting/services/enrichment_service.py
@@ -324,8 +324,7 @@ class EnrichmentService:
ProspectContact.source_element == "regex",
).delete()
- for contact in contacts:
- db.add(contact)
+ db.add_all(contacts)
# Mark first email and phone as primary
if contacts:
diff --git a/app/modules/prospecting/services/prospect_service.py b/app/modules/prospecting/services/prospect_service.py
index 8fb6bacf..cf028893 100644
--- a/app/modules/prospecting/services/prospect_service.py
+++ b/app/modules/prospecting/services/prospect_service.py
@@ -127,15 +127,17 @@ class ProspectService:
# Create inline contacts if provided
contacts = data.get("contacts", [])
+ contact_objects = []
for c in contacts:
- contact = ProspectContact(
+ contact_objects.append(ProspectContact(
prospect_id=prospect.id,
contact_type=c["contact_type"],
value=c["value"],
label=c.get("label"),
is_primary=c.get("is_primary", False),
- )
- db.add(contact)
+ ))
+ if contact_objects:
+ db.add_all(contact_objects)
db.flush()
logger.info("Created prospect: %s (channel=%s)", prospect.display_name, channel)
@@ -144,6 +146,7 @@ class ProspectService:
def create_bulk(self, db: Session, domain_names: list[str], source: str = "csv_import") -> tuple[int, int]:
created = 0
skipped = 0
+ new_prospects = []
for name in domain_names:
name = name.strip().lower()
if not name:
@@ -152,14 +155,15 @@ class ProspectService:
if existing:
skipped += 1
continue
- prospect = Prospect(
+ new_prospects.append(Prospect(
channel=ProspectChannel.DIGITAL,
domain_name=name,
source=source,
- )
- db.add(prospect)
+ ))
created += 1
+ if new_prospects:
+ db.add_all(new_prospects)
db.flush()
logger.info("Bulk import: %d created, %d skipped", created, skipped)
return created, skipped
diff --git a/app/modules/tenancy/routes/api/admin.py b/app/modules/tenancy/routes/api/admin.py
index 5097d67f..d4fc1244 100644
--- a/app/modules/tenancy/routes/api/admin.py
+++ b/app/modules/tenancy/routes/api/admin.py
@@ -27,6 +27,7 @@ from .admin_platform_users import admin_platform_users_router
from .admin_platforms import admin_platforms_router
from .admin_store_domains import admin_store_domains_router
from .admin_store_roles import admin_store_roles_router
+from .admin_store_subdomains import admin_store_subdomains_router
from .admin_stores import admin_stores_router
from .admin_users import admin_users_router
from .user_account import admin_account_router
@@ -42,6 +43,7 @@ router.include_router(admin_merchants_router, tags=["admin-merchants"])
router.include_router(admin_platforms_router, tags=["admin-platforms"])
router.include_router(admin_stores_router, tags=["admin-stores"])
router.include_router(admin_store_domains_router, tags=["admin-store-domains"])
+router.include_router(admin_store_subdomains_router, tags=["admin-store-subdomains"])
router.include_router(admin_store_roles_router, tags=["admin-store-roles"])
router.include_router(admin_merchant_domains_router, tags=["admin-merchant-domains"])
router.include_router(admin_modules_router, tags=["admin-modules"])
diff --git a/app/modules/tenancy/routes/api/admin_store_subdomains.py b/app/modules/tenancy/routes/api/admin_store_subdomains.py
new file mode 100644
index 00000000..bdd35118
--- /dev/null
+++ b/app/modules/tenancy/routes/api/admin_store_subdomains.py
@@ -0,0 +1,137 @@
+# app/modules/tenancy/routes/api/admin_store_subdomains.py
+"""
+Admin endpoints for managing store custom subdomains (per-platform).
+
+Each store can have a custom subdomain on each platform it belongs to.
+For example, store "WizaTech" on the loyalty platform could have
+custom_subdomain="wizatech-rewards" → wizatech-rewards.rewardflow.lu
+"""
+
+import logging
+
+from fastapi import APIRouter, Body, Depends, Path
+from pydantic import BaseModel, Field
+from sqlalchemy.orm import Session
+
+from app.api.deps import get_current_admin_api
+from app.core.database import get_db
+from app.modules.tenancy.schemas.auth import UserContext
+from app.modules.tenancy.services.store_subdomain_service import store_subdomain_service
+
+admin_store_subdomains_router = APIRouter(prefix="/stores")
+logger = logging.getLogger(__name__)
+
+
+# ── Schemas ──────────────────────────────────────────────────────────────────
+
+
+class CustomSubdomainEntry(BaseModel):
+ store_platform_id: int
+ platform_id: int
+ platform_code: str
+ platform_name: str
+ platform_domain: str | None
+ custom_subdomain: str | None
+ default_subdomain: str | None
+ full_url: str | None
+ default_url: str | None
+
+
+class CustomSubdomainListResponse(BaseModel):
+ subdomains: list[CustomSubdomainEntry]
+ total: int
+
+
+class SetCustomSubdomainRequest(BaseModel):
+ subdomain: str = Field(
+ ...,
+ min_length=3,
+ max_length=63,
+ description="Custom subdomain (lowercase, alphanumeric + hyphens)",
+ )
+
+
+class CustomSubdomainUpdateResponse(BaseModel):
+ message: str
+ platform_id: int
+ custom_subdomain: str | None
+
+
+# ── Endpoints ────────────────────────────────────────────────────────────────
+
+
+@admin_store_subdomains_router.get(
+ "/{store_id}/custom-subdomains",
+ response_model=CustomSubdomainListResponse,
+)
+def list_custom_subdomains(
+ store_id: int = Path(..., description="Store ID", gt=0),
+ db: Session = Depends(get_db),
+ current_admin: UserContext = Depends(get_current_admin_api),
+):
+ """
+ List all platform memberships with their custom subdomains.
+
+ Returns one entry per active platform the store belongs to,
+ showing the custom_subdomain (if set) and the default subdomain.
+ """
+ entries = store_subdomain_service.get_custom_subdomains(db, store_id)
+ return CustomSubdomainListResponse(
+ subdomains=[CustomSubdomainEntry(**e) for e in entries],
+ total=len(entries),
+ )
+
+
+@admin_store_subdomains_router.put(
+ "/{store_id}/custom-subdomains/{platform_id}",
+ response_model=CustomSubdomainUpdateResponse,
+)
+def set_custom_subdomain(
+ store_id: int = Path(..., description="Store ID", gt=0),
+ platform_id: int = Path(..., description="Platform ID", gt=0),
+ payload: SetCustomSubdomainRequest = Body(...),
+ db: Session = Depends(get_db),
+ current_admin: UserContext = Depends(get_current_admin_api),
+):
+ """
+ Set or update the custom subdomain for a store on a specific platform.
+
+ The subdomain must be unique on the platform (no other store can claim it).
+ Format: lowercase alphanumeric + hyphens, 3-63 characters.
+ """
+ sp = store_subdomain_service.set_custom_subdomain(
+ db, store_id, platform_id, payload.subdomain
+ )
+ db.commit()
+
+ return CustomSubdomainUpdateResponse(
+ message=f"Custom subdomain set to '{sp.custom_subdomain}'",
+ platform_id=platform_id,
+ custom_subdomain=sp.custom_subdomain,
+ )
+
+
+@admin_store_subdomains_router.delete(
+ "/{store_id}/custom-subdomains/{platform_id}",
+ response_model=CustomSubdomainUpdateResponse,
+)
+def clear_custom_subdomain(
+ store_id: int = Path(..., description="Store ID", gt=0),
+ platform_id: int = Path(..., description="Platform ID", gt=0),
+ db: Session = Depends(get_db),
+ current_admin: UserContext = Depends(get_current_admin_api),
+):
+ """
+ Clear the custom subdomain for a store on a specific platform.
+
+ The store will still be accessible via its default subdomain
+ (Store.subdomain + platform domain).
+ """
+ store_subdomain_service.clear_custom_subdomain(db, store_id, platform_id)
+ db.commit()
+
+ return CustomSubdomainUpdateResponse(
+ message="Custom subdomain cleared",
+ platform_id=platform_id,
+ custom_subdomain=None,
+ )
diff --git a/app/modules/tenancy/services/merchant_store_service.py b/app/modules/tenancy/services/merchant_store_service.py
index d0e14b77..93f6bb71 100644
--- a/app/modules/tenancy/services/merchant_store_service.py
+++ b/app/modules/tenancy/services/merchant_store_service.py
@@ -231,15 +231,17 @@ class MerchantStoreService:
# Assign to platforms if provided
platform_ids = store_data.get("platform_ids", [])
+ store_platforms = []
for pid in platform_ids:
platform = db.query(Platform).filter(Platform.id == pid).first()
if platform:
- sp = StorePlatform(
+ store_platforms.append(StorePlatform(
store_id=store.id,
platform_id=pid,
is_active=True,
- )
- db.add(sp)
+ ))
+ if store_platforms:
+ db.add_all(store_platforms)
db.flush()
db.refresh(store)
diff --git a/app/modules/tenancy/services/store_subdomain_service.py b/app/modules/tenancy/services/store_subdomain_service.py
new file mode 100644
index 00000000..7d5d2634
--- /dev/null
+++ b/app/modules/tenancy/services/store_subdomain_service.py
@@ -0,0 +1,170 @@
+# app/modules/tenancy/services/store_subdomain_service.py
+"""
+Service for managing StorePlatform custom subdomains.
+
+Handles validation (format, uniqueness) and CRUD operations on
+the custom_subdomain field of StorePlatform entries.
+"""
+
+import logging
+import re
+
+from sqlalchemy import func
+from sqlalchemy.orm import Session
+
+from app.exceptions.base import (
+ ConflictException,
+ ResourceNotFoundException,
+ ValidationException,
+)
+from app.modules.tenancy.models import Platform, Store
+from app.modules.tenancy.models.store_platform import StorePlatform
+
+logger = logging.getLogger(__name__)
+
+# Subdomain rules: lowercase alphanumeric + hyphens, 3-63 chars, no leading/trailing hyphen
+_SUBDOMAIN_RE = re.compile(r"^[a-z0-9]([a-z0-9-]{1,61}[a-z0-9])?$")
+
+
+class StoreSubdomainService:
+ """Manage custom subdomains on StorePlatform entries."""
+
+ def get_custom_subdomains(self, db: Session, store_id: int) -> list[dict]:
+ """
+ List all platform memberships for a store with their custom subdomains.
+
+ Returns a list of dicts with platform info and custom_subdomain (may be None).
+ """
+ store = db.query(Store).filter(Store.id == store_id).first()
+ if not store:
+ raise ResourceNotFoundException("Store", str(store_id))
+
+ memberships = (
+ db.query(StorePlatform)
+ .filter(
+ StorePlatform.store_id == store_id,
+ StorePlatform.is_active.is_(True),
+ )
+ .all()
+ )
+
+ results = []
+ for sp in memberships:
+ platform = db.query(Platform).filter(Platform.id == sp.platform_id).first()
+ if not platform:
+ continue
+ results.append({
+ "store_platform_id": sp.id,
+ "platform_id": sp.platform_id,
+ "platform_code": platform.code,
+ "platform_name": platform.name,
+ "platform_domain": platform.domain,
+ "custom_subdomain": sp.custom_subdomain,
+ "default_subdomain": store.subdomain,
+ "full_url": (
+ f"{sp.custom_subdomain}.{platform.domain}"
+ if sp.custom_subdomain and platform.domain
+ else None
+ ),
+ "default_url": (
+ f"{store.subdomain}.{platform.domain}"
+ if store.subdomain and platform.domain
+ else None
+ ),
+ })
+
+ return results
+
+ def set_custom_subdomain(
+ self, db: Session, store_id: int, platform_id: int, subdomain: str
+ ) -> StorePlatform:
+ """
+ Set or update the custom_subdomain for a store on a specific platform.
+
+ Validates:
+ - Subdomain format (lowercase, alphanumeric + hyphens)
+ - Uniqueness on the platform (no other store claims it)
+ - StorePlatform membership exists and is active
+ """
+ subdomain = subdomain.strip().lower()
+
+ # Validate format
+ if not _SUBDOMAIN_RE.match(subdomain):
+ raise ValidationException(
+ "Must be 3-63 characters, lowercase alphanumeric and hyphens, "
+ "cannot start or end with a hyphen.",
+ field="custom_subdomain",
+ )
+
+ # Find the membership
+ sp = (
+ db.query(StorePlatform)
+ .filter(
+ StorePlatform.store_id == store_id,
+ StorePlatform.platform_id == platform_id,
+ StorePlatform.is_active.is_(True),
+ )
+ .first()
+ )
+ if not sp:
+ raise ResourceNotFoundException(
+ "StorePlatform",
+ f"store_id={store_id}, platform_id={platform_id}",
+ )
+
+ # Check uniqueness on this platform (exclude current entry)
+ existing = (
+ db.query(StorePlatform)
+ .filter(
+ func.lower(StorePlatform.custom_subdomain) == subdomain,
+ StorePlatform.platform_id == platform_id,
+ StorePlatform.id != sp.id,
+ )
+ .first()
+ )
+ if existing:
+ raise ConflictException(
+ f"Subdomain '{subdomain}' is already claimed by another store "
+ f"on this platform."
+ )
+
+ sp.custom_subdomain = subdomain
+ db.flush()
+
+ logger.info(
+ f"Set custom_subdomain='{subdomain}' for store_id={store_id} "
+ f"on platform_id={platform_id}"
+ )
+ return sp
+
+ def clear_custom_subdomain(
+ self, db: Session, store_id: int, platform_id: int
+ ) -> StorePlatform:
+ """Clear the custom_subdomain for a store on a specific platform."""
+ sp = (
+ db.query(StorePlatform)
+ .filter(
+ StorePlatform.store_id == store_id,
+ StorePlatform.platform_id == platform_id,
+ StorePlatform.is_active.is_(True),
+ )
+ .first()
+ )
+ if not sp:
+ raise ResourceNotFoundException(
+ "StorePlatform",
+ f"store_id={store_id}, platform_id={platform_id}",
+ )
+
+ old_value = sp.custom_subdomain
+ sp.custom_subdomain = None
+ db.flush()
+
+ logger.info(
+ f"Cleared custom_subdomain (was '{old_value}') for store_id={store_id} "
+ f"on platform_id={platform_id}"
+ )
+ return sp
+
+
+store_subdomain_service = StoreSubdomainService()
diff --git a/app/modules/tenancy/static/admin/js/store-detail.js b/app/modules/tenancy/static/admin/js/store-detail.js
index 9b49fdff..ae85eefb 100644
--- a/app/modules/tenancy/static/admin/js/store-detail.js
+++ b/app/modules/tenancy/static/admin/js/store-detail.js
@@ -27,6 +27,13 @@ function adminStoreDetail() {
domainSaving: false,
newDomain: { domain: '', platform_id: '' },
+ // Custom subdomain management state
+ customSubdomains: [],
+ customSubdomainsLoading: false,
+ editingSubdomainPlatformId: null,
+ editingSubdomainValue: '',
+ subdomainSaving: false,
+
// Initialize
async init() {
// Load i18n translations
@@ -54,6 +61,7 @@ function adminStoreDetail() {
await Promise.all([
this.loadSubscriptions(),
this.loadDomains(),
+ this.loadCustomSubdomains(),
]);
}
} else {
@@ -274,6 +282,70 @@ function adminStoreDetail() {
}
},
+ // ====================================================================
+ // CUSTOM SUBDOMAIN MANAGEMENT
+ // ====================================================================
+
+ async loadCustomSubdomains() {
+ if (!this.store?.id) return;
+ this.customSubdomainsLoading = true;
+ try {
+ const url = `/admin/stores/${this.store.id}/custom-subdomains`;
+ const response = await apiClient.get(url);
+ this.customSubdomains = response.subdomains || [];
+ detailLog.info('Custom subdomains loaded:', this.customSubdomains.length);
+ } catch (error) {
+ if (error.status === 404) {
+ this.customSubdomains = [];
+ } else {
+ detailLog.warn('Failed to load custom subdomains:', error.message);
+ }
+ } finally {
+ this.customSubdomainsLoading = false;
+ }
+ },
+
+ startEditSubdomain(entry) {
+ this.editingSubdomainPlatformId = entry.platform_id;
+ this.editingSubdomainValue = entry.custom_subdomain || '';
+ },
+
+ cancelEditSubdomain() {
+ this.editingSubdomainPlatformId = null;
+ this.editingSubdomainValue = '';
+ },
+
+ async saveCustomSubdomain(platformId) {
+ if (!this.editingSubdomainValue || this.subdomainSaving) return;
+ this.subdomainSaving = true;
+ try {
+ await apiClient.put(
+ `/admin/stores/${this.store.id}/custom-subdomains/${platformId}`,
+ { subdomain: this.editingSubdomainValue.trim().toLowerCase() }
+ );
+ Utils.showToast('Custom subdomain saved', 'success');
+ this.cancelEditSubdomain();
+ await this.loadCustomSubdomains();
+ } catch (error) {
+ Utils.showToast(error.message || 'Failed to save subdomain', 'error');
+ } finally {
+ this.subdomainSaving = false;
+ }
+ },
+
+ async clearCustomSubdomain(platformId, subdomainName) {
+ if (!confirm(`Clear custom subdomain "${subdomainName}"? The store will use its default subdomain on this platform.`)) return;
+ try {
+ await apiClient.delete(
+ `/admin/stores/${this.store.id}/custom-subdomains/${platformId}`
+ );
+ Utils.showToast('Custom subdomain cleared', 'success');
+ await this.loadCustomSubdomains();
+ } catch (error) {
+ Utils.showToast(error.message || 'Failed to clear subdomain', 'error');
+ }
+ },
+
// Refresh store data
async refresh() {
detailLog.info('=== STORE REFRESH TRIGGERED ===');
diff --git a/app/modules/tenancy/templates/tenancy/admin/store-detail.html b/app/modules/tenancy/templates/tenancy/admin/store-detail.html
index 12f88191..bf4a2af2 100644
--- a/app/modules/tenancy/templates/tenancy/admin/store-detail.html
+++ b/app/modules/tenancy/templates/tenancy/admin/store-detail.html
@@ -492,6 +492,117 @@
+
+
+
+
+ Custom Subdomains
+
+
+
+
+
+ Each platform membership can have a custom subdomain override. If not set, the store's default subdomain
+ () is used on each platform.
+
+
+
+
+
Loading subdomains...
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Custom:
+
+
+
+
+
+
+
+ Not set
+
+
+
+ Default:
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
No active platform memberships found.
+
+
+
diff --git a/app/modules/tenancy/tests/unit/test_store_subdomain_service.py b/app/modules/tenancy/tests/unit/test_store_subdomain_service.py
new file mode 100644
index 00000000..0900bae1
--- /dev/null
+++ b/app/modules/tenancy/tests/unit/test_store_subdomain_service.py
@@ -0,0 +1,257 @@
+# tests/unit/services/test_store_subdomain_service.py
+"""Unit tests for StoreSubdomainService."""
+
+import uuid
+
+import pytest
+
+from app.exceptions.base import (
+ ConflictException,
+ ResourceNotFoundException,
+ ValidationException,
+)
+from app.modules.tenancy.models import Platform, Store
+from app.modules.tenancy.models.store_platform import StorePlatform
+from app.modules.tenancy.services.store_subdomain_service import StoreSubdomainService
+
+
+@pytest.fixture
+def subdomain_service():
+ return StoreSubdomainService()
+
+
+@pytest.fixture
+def sd_merchant(db, test_admin):
+ """Create a merchant for subdomain tests."""
+ from app.modules.tenancy.models import Merchant
+
+ unique_id = str(uuid.uuid4())[:8]
+ merchant = Merchant(
+ name=f"SD Merchant {unique_id}",
+ owner_user_id=test_admin.id,
+ contact_email=f"sd{unique_id}@test.com",
+ is_active=True,
+ is_verified=True,
+ )
+ db.add(merchant)
+ db.commit()
+ db.refresh(merchant)
+ return merchant
+
+
+@pytest.fixture
+def sd_platform(db):
+ """Create a test platform."""
+ unique_id = str(uuid.uuid4())[:8]
+ platform = Platform(
+ code=f"sd{unique_id}",
+ name=f"SD Platform {unique_id}",
+ domain=f"sd{unique_id}.example.com",
+ is_active=True,
+ )
+ db.add(platform)
+ db.commit()
+ db.refresh(platform)
+ return platform
+
+
+@pytest.fixture
+def sd_platform_2(db):
+ """Create a second test platform."""
+ unique_id = str(uuid.uuid4())[:8]
+ platform = Platform(
+ code=f"sd2{unique_id}",
+ name=f"SD Platform 2 {unique_id}",
+ domain=f"sd2{unique_id}.example.com",
+ is_active=True,
+ )
+ db.add(platform)
+ db.commit()
+ db.refresh(platform)
+ return platform
+
+
+@pytest.fixture
+def sd_store(db, sd_merchant):
+ """Create a test store."""
+ unique_id = str(uuid.uuid4())[:8]
+ store = Store(
+ store_code=f"SD{unique_id}".upper(),
+ name=f"SD Store {unique_id}",
+ subdomain=f"sdstore{unique_id}",
+ merchant_id=sd_merchant.id,
+ is_active=True,
+ )
+ db.add(store)
+ db.commit()
+ db.refresh(store)
+ return store
+
+
+@pytest.fixture
+def sd_membership(db, sd_store, sd_platform):
+ """Create a StorePlatform membership."""
+ sp = StorePlatform(
+ store_id=sd_store.id,
+ platform_id=sd_platform.id,
+ is_active=True,
+ )
+ db.add(sp)
+ db.commit()
+ db.refresh(sp)
+ return sp
+
+
+@pytest.mark.unit
+@pytest.mark.stores
+class TestGetCustomSubdomains:
+ """Tests for listing custom subdomains."""
+
+ def test_list_returns_memberships(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
+ results = subdomain_service.get_custom_subdomains(db, sd_store.id)
+
+ assert len(results) == 1
+ assert results[0]["platform_code"] == sd_platform.code
+ assert results[0]["custom_subdomain"] is None
+ assert results[0]["default_subdomain"] == sd_store.subdomain
+
+ def test_list_shows_custom_subdomain(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
+ sd_membership.custom_subdomain = "my-custom"
+ db.commit()
+
+ results = subdomain_service.get_custom_subdomains(db, sd_store.id)
+
+ assert results[0]["custom_subdomain"] == "my-custom"
+ assert results[0]["full_url"] == f"my-custom.{sd_platform.domain}"
+
+ def test_list_nonexistent_store_raises(self, db, subdomain_service):
+ with pytest.raises(ResourceNotFoundException):
+ subdomain_service.get_custom_subdomains(db, 999999)
+
+ def test_list_excludes_inactive_memberships(self, db, subdomain_service, sd_store, sd_membership):
+ sd_membership.is_active = False
+ db.commit()
+
+ results = subdomain_service.get_custom_subdomains(db, sd_store.id)
+
+ assert len(results) == 0
+
+
+@pytest.mark.unit
+@pytest.mark.stores
+class TestSetCustomSubdomain:
+ """Tests for setting a custom subdomain."""
+
+ def test_set_valid_subdomain(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
+ sp = subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, "my-store")
+
+ assert sp.custom_subdomain == "my-store"
+
+ def test_set_normalizes_to_lowercase(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
+ sp = subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, "My-Store")
+
+ assert sp.custom_subdomain == "my-store"
+
+ def test_set_strips_whitespace(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
+ sp = subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, " my-store ")
+
+ assert sp.custom_subdomain == "my-store"
+
+ def test_set_rejects_leading_hyphen(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
+ with pytest.raises(ValidationException):
+ subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, "-invalid")
+
+ def test_set_rejects_trailing_hyphen(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
+ with pytest.raises(ValidationException):
+ subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, "invalid-")
+
+ def test_set_rejects_uppercase_special_chars(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
+ with pytest.raises(ValidationException):
+ subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, "inv@lid")
+
+ def test_set_rejects_too_short(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
+ with pytest.raises(ValidationException):
+ subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, "ab")
+
+ def test_set_nonexistent_membership_raises(self, db, subdomain_service, sd_store):
+ with pytest.raises(ResourceNotFoundException):
+ subdomain_service.set_custom_subdomain(db, sd_store.id, 999999, "test-sub")
+
+ def test_set_duplicate_on_same_platform_raises(
+ self, db, subdomain_service, sd_store, sd_membership, sd_platform, sd_merchant
+ ):
+ # Create another store with same subdomain on same platform
+ unique_id = str(uuid.uuid4())[:8]
+ other_store = Store(
+ store_code=f"OTHER{unique_id}".upper(),
+ name=f"Other Store {unique_id}",
+ subdomain=f"other{unique_id}",
+ merchant_id=sd_merchant.id,
+ is_active=True,
+ )
+ db.add(other_store)
+ db.flush()
+
+ other_sp = StorePlatform(
+ store_id=other_store.id,
+ platform_id=sd_platform.id,
+ is_active=True,
+ custom_subdomain="taken-name",
+ )
+ db.add(other_sp)
+ db.commit()
+
+ with pytest.raises(ConflictException):
+ subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, "taken-name")
+
+ def test_set_same_subdomain_different_platform_ok(
+ self, db, subdomain_service, sd_store, sd_membership, sd_platform, sd_platform_2
+ ):
+ # Set on first platform
+ subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, "shared-name")
+ db.commit()
+
+ # Create membership on second platform
+ sp2 = StorePlatform(
+ store_id=sd_store.id,
+ platform_id=sd_platform_2.id,
+ is_active=True,
+ )
+ db.add(sp2)
+ db.commit()
+
+ # Same subdomain on different platform should work
+ result = subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform_2.id, "shared-name")
+
+ assert result.custom_subdomain == "shared-name"
+
+ def test_update_existing_subdomain(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
+ subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, "first")
+ db.commit()
+
+ sp = subdomain_service.set_custom_subdomain(db, sd_store.id, sd_platform.id, "second")
+
+ assert sp.custom_subdomain == "second"
+
+
+@pytest.mark.unit
+@pytest.mark.stores
+class TestClearCustomSubdomain:
+ """Tests for clearing a custom subdomain."""
+
+ def test_clear_existing_subdomain(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
+ sd_membership.custom_subdomain = "to-clear"
+ db.commit()
+
+ sp = subdomain_service.clear_custom_subdomain(db, sd_store.id, sd_platform.id)
+
+ assert sp.custom_subdomain is None
+
+ def test_clear_already_none_ok(self, db, subdomain_service, sd_store, sd_membership, sd_platform):
+ sp = subdomain_service.clear_custom_subdomain(db, sd_store.id, sd_platform.id)
+
+ assert sp.custom_subdomain is None
+
+ def test_clear_nonexistent_membership_raises(self, db, subdomain_service, sd_store):
+ with pytest.raises(ResourceNotFoundException):
+ subdomain_service.clear_custom_subdomain(db, sd_store.id, 999999)
diff --git a/app/modules/tenancy/tests/unit/test_tenancy_metrics.py b/app/modules/tenancy/tests/unit/test_tenancy_metrics.py
index 1fc9301c..fcd90992 100644
--- a/app/modules/tenancy/tests/unit/test_tenancy_metrics.py
+++ b/app/modules/tenancy/tests/unit/test_tenancy_metrics.py
@@ -68,15 +68,14 @@ def metrics_team_members(db, metrics_stores):
auth = AuthManager()
users = []
for i in range(3):
- u = User(
+ users.append(User(
email=f"team_{uuid.uuid4().hex[:8]}@test.com",
username=f"team_{uuid.uuid4().hex[:8]}",
hashed_password=auth.hash_password("pass123"),
role="store_user",
is_active=True,
- )
- db.add(u)
- users.append(u)
+ ))
+ db.add_all(users)
db.flush()
# User 0 on store 0 and store 1 (should be counted once)
diff --git a/docs/development/diagnostics/domain-health.md b/docs/development/diagnostics/domain-health.md
new file mode 100644
index 00000000..376a1bcd
--- /dev/null
+++ b/docs/development/diagnostics/domain-health.md
@@ -0,0 +1,162 @@
+# Domain Health Diagnostic Tool
+
+The Domain Health tool simulates the middleware resolution pipeline for every configured store access method, verifying that each domain, subdomain, and path-based route resolves to the expected store.
+
+## Overview
+
+| Aspect | Detail |
+|--------|--------|
+| Location | Admin > Diagnostics > Resolution > Domain Health |
+| URL | `/admin/platform-debug` (select "Domain Health" in sidebar) |
+| API Endpoint | `GET /api/v1/admin/debug/domain-health` |
+| Access | Super admin only |
+
+## What It Checks
+
+The tool runs four categories of checks against the **live database**:
+
+### 1. Custom Subdomains (StorePlatform)
+
+Checks every active `StorePlatform` entry that has a `custom_subdomain` set.
+
+| Example | What It Tests |
+|---------|--------------|
+| `acme-rewards.rewardflow.lu` | Does `detection_method=subdomain` with `custom_subdomain` + platform context resolve to the expected store? |
+
+This is the most critical check because the fail-closed routing policy means a custom subdomain that doesn't resolve will return a 404 rather than falling back to a global lookup.
+
+### 2. Default Subdomains per Platform
+
+Checks every active store's default `subdomain` on each platform it belongs to.
+
+| Example | What It Tests |
+|---------|--------------|
+| `acme.omsflow.lu` | Does `detection_method=subdomain` with `Store.subdomain` + platform context resolve to the expected store? |
+
+The middleware first tries `StorePlatform.custom_subdomain`, then falls back to `Store.subdomain` — but **only** if the store has an active `StorePlatform` membership on the detected platform. A store with no membership on that platform will be blocked (fail-closed, prevents cross-tenant leaks).
+
+Entries where `custom_subdomain` already equals the store's `subdomain` are de-duplicated (covered by check #1).
+
+### 3. Custom Domains (StoreDomain)
+
+Checks every active, verified `StoreDomain` entry.
+
+| Example | What It Tests |
+|---------|--------------|
+| `wizatech.shop` | Does `detection_method=custom_domain` resolve to the store that owns the domain? |
+
+The Platform column shows the platform associated with the `StoreDomain.platform_id` relationship.
+
+### 4. Path-Based Routes
+
+Checks every active store's `subdomain` field via path-based resolution.
+
+| Example | What It Tests |
+|---------|--------------|
+| `/storefront/luxweb/` | Does `detection_method=path` with `subdomain=luxweb` resolve to the correct store? |
+| `/store/luxweb/` | Same check for the store dashboard path |
+
+The Platform column shows all platforms the store is a member of (via `StorePlatform`).
+
+!!! note "Path segments use Store.subdomain"
+ Path-based URLs use the store's `subdomain` field, **not** `store_code`. For example, a store with `store_code=LUXWEBSITES` and `subdomain=luxweb` is accessed at `/storefront/luxweb/`, not `/storefront/LUXWEBSITES/`.
+
+## How It Works
+
+The tool calls the **same resolution functions** that the live middleware uses:
+
+```mermaid
+graph LR
+ A[Health Check] -->|builds context dict| B[StoreContextManager.get_store_from_context]
+ B -->|queries DB| C[Store resolved?]
+ C -->|id matches expected| D[PASS]
+ C -->|mismatch or None| E[FAIL]
+```
+
+It does **not** make actual HTTP requests. This means it validates database configuration and resolution logic, but cannot detect:
+
+- DNS misconfigurations
+- Reverse proxy / Cloudflare routing issues
+- SSL certificate problems
+- Network-level failures
+
+## Reading the Results
+
+### Summary Bar
+
+| Counter | Meaning |
+|---------|---------|
+| Total Checked | Number of access methods tested |
+| Passed | Resolved to the expected store |
+| Failed | Resolution mismatch or store not found |
+
+### Results Table
+
+| Column | Description |
+|--------|-------------|
+| Status | `pass` or `fail` badge |
+| Domain | The domain, subdomain, or path being tested |
+| Type | `custom subdomain`, `default subdomain`, `custom domain`, `path (storefront)`, or `path (store)` |
+| Platform | Platform code(s) the entry belongs to |
+| Expected Store | The store code we expect to resolve |
+| Resolved Store | The store code that actually resolved (or `--` if none) |
+| Note | Error description for failed entries |
+
+### Common Failure Scenarios
+
+| Symptom | Likely Cause |
+|---------|-------------|
+| Custom subdomain fails | `StorePlatform.custom_subdomain` doesn't match or `StorePlatform.is_active=false` |
+| Default subdomain fails | Store has no active `StorePlatform` membership on the detected platform (cross-tenant blocked) |
+| Custom domain fails | `StoreDomain` not verified, inactive, or store is inactive |
+| Path-based fails | Store's `subdomain` field is empty or store is inactive |
+| Resolved store differs from expected | Two stores have conflicting subdomain/domain entries |
+
+## API Response
+
+```
+GET /api/v1/admin/debug/domain-health
+```
+
+```json
+{
+ "total": 36,
+ "passed": 30,
+ "failed": 6,
+ "details": [
+ {
+ "domain": "acme-rewards.rewardflow.lu",
+ "type": "custom subdomain",
+ "platform_code": "loyalty",
+ "expected_store": "ACME",
+ "resolved_store": "ACME",
+ "status": "pass",
+ "note": ""
+ },
+ {
+ "domain": "acme.omsflow.lu",
+ "type": "default subdomain",
+ "platform_code": "oms",
+ "expected_store": "ACME",
+ "resolved_store": "ACME",
+ "status": "pass",
+ "note": ""
+ },
+ {
+ "domain": "/storefront/acme/",
+ "type": "path (storefront)",
+ "platform_code": "oms, loyalty",
+ "expected_store": "ACME",
+ "resolved_store": "ACME",
+ "status": "pass",
+ "note": ""
+ }
+ ]
+}
+```
+
+## Related Documentation
+
+- [Middleware Stack](../../architecture/middleware.md) - Full middleware pipeline reference
+- [Dev Tools Module](../../modules/dev_tools/index.md) - Other diagnostic tools
+- [Multi-Tenant System](../../architecture/multi-tenant.md) - Tenant detection architecture
diff --git a/middleware/frontend_type.py b/middleware/frontend_type.py
index 5d6a2f9c..3e2453cf 100644
--- a/middleware/frontend_type.py
+++ b/middleware/frontend_type.py
@@ -82,7 +82,7 @@ class FrontendTypeMiddleware(BaseHTTPMiddleware):
return response
-def get_frontend_type(request: Request) -> FrontendType:
+def get_frontend_type(request: Request) -> FrontendType | None:
"""
Helper function to get current frontend type from request.
@@ -90,6 +90,7 @@ def get_frontend_type(request: Request) -> FrontendType:
request: FastAPI request object
Returns:
- FrontendType enum value (defaults to PLATFORM if not set)
+ FrontendType enum value, or None if the middleware hasn't run yet.
+ Callers should handle None explicitly where context is clear.
"""
- return getattr(request.state, "frontend_type", FrontendType.PLATFORM)
+ return getattr(request.state, "frontend_type", None)
diff --git a/middleware/platform_context.py b/middleware/platform_context.py
index 7e8160ab..61f1a3d7 100644
--- a/middleware/platform_context.py
+++ b/middleware/platform_context.py
@@ -36,6 +36,31 @@ MAIN_PLATFORM_CODE = "main"
_LOCAL_HOSTS = {"localhost", "127.0.0.1", "testserver"}
+def _strip_port(host: str) -> str:
+ """
+ Remove port from host, handling IPv6 bracket notation.
+
+ Examples:
+ "localhost:8000" → "localhost"
+ "[::1]:8000" → "::1"
+ "[::1]" → "::1"
+ "example.com" → "example.com"
+ "192.168.1.1:80" → "192.168.1.1"
+ """
+ if not host:
+ return host
+ # IPv6 with brackets: [::1]:8000 or [::1]
+ if host.startswith("["):
+ bracket_end = host.find("]")
+ if bracket_end != -1:
+ return host[1:bracket_end]
+ return host # malformed, return as-is
+ # Regular host:port
+ if ":" in host:
+ return host.split(":")[0]
+ return host
+
+
class PlatformContextManager:
"""Manages platform context detection for multi-platform routing."""
@@ -62,7 +87,7 @@ class PlatformContextManager:
path = request.url.path
# Remove port from host if present (e.g., localhost:9999 -> localhost)
- host_without_port = host.split(":")[0] if ":" in host else host
+ host_without_port = _strip_port(host)
# Skip platform detection for admin routes - admin is global
if FrontendDetector.is_admin(host, path):
@@ -359,7 +384,7 @@ class PlatformContextMiddleware:
# For storefront API requests on localhost, the path doesn't contain
# /platforms/{code}/, so extract platform from the Referer header instead.
# e.g., Referer: http://localhost:8000/platforms/loyalty/storefront/FASHIONHUB/...
- host_without_port = host.split(":")[0] if ":" in host else host
+ host_without_port = _strip_port(host)
if (
host_without_port in _LOCAL_HOSTS
and path.startswith("/api/v1/storefront/")
@@ -450,7 +475,7 @@ class PlatformContextMiddleware:
- /platforms/oms/pricing → OMS platform pricing
- /platforms/loyalty/ → Loyalty platform homepage
"""
- host_without_port = host.split(":")[0] if ":" in host else host
+ host_without_port = _strip_port(host)
# Method 1: Domain-based (production)
if host_without_port and host_without_port not in _LOCAL_HOSTS:
diff --git a/middleware/store_context.py b/middleware/store_context.py
index c79e9c6b..29bb9136 100644
--- a/middleware/store_context.py
+++ b/middleware/store_context.py
@@ -245,7 +245,7 @@ class StoreContextManager:
)
return store
- # 2b. Fallback to Store.subdomain (global default)
+ # 2b. Fallback to Store.subdomain with platform membership check
store = (
db.query(Store)
.filter(func.lower(Store.subdomain) == subdomain.lower())
@@ -254,6 +254,33 @@ class StoreContextManager:
)
if store:
+ # When a platform context exists and detection is "subdomain",
+ # verify the store actually has an active membership on this
+ # platform. Without this check, a subdomain like
+ # "other-tenant.omsflow.lu" could resolve a store that only
+ # belongs to the loyalty platform — a cross-tenant leak.
+ if platform and context.get("detection_method") == "subdomain":
+ from app.modules.tenancy.models.store_platform import (
+ StorePlatform as SP,
+ )
+
+ has_membership = (
+ db.query(SP)
+ .filter(
+ SP.store_id == store.id,
+ SP.platform_id == platform.id,
+ SP.is_active.is_(True),
+ )
+ .first()
+ )
+ if not has_membership:
+ logger.warning(
+ f"[FAIL-CLOSED] Store '{subdomain}' exists but has no "
+ f"active membership on platform {platform.code} — "
+ f"blocking cross-tenant resolution"
+ )
+ return None
+
method = context.get("detection_method", "unknown")
logger.info(
f"[OK] Store found via {method}: {subdomain} → {store.name}"
diff --git a/middleware/storefront_access.py b/middleware/storefront_access.py
index 0685c963..9c1bfe01 100644
--- a/middleware/storefront_access.py
+++ b/middleware/storefront_access.py
@@ -83,6 +83,11 @@ def _is_static_request(path: str) -> bool:
return "favicon.ico" in lower
+def _looks_like_storefront(path: str) -> bool:
+ """Return True if path belongs to the storefront surface."""
+ return path.startswith(("/storefront/", "/api/v1/storefront/"))
+
+
class StorefrontAccessMiddleware(BaseHTTPMiddleware):
"""
Gate storefront requests behind an active subscription.
@@ -94,6 +99,25 @@ class StorefrontAccessMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next) -> Response:
frontend_type = getattr(request.state, "frontend_type", None)
+ # Safety net: if frontend_type is None (upstream middleware failed) but
+ # the path looks like a storefront path, block instead of letting it
+ # through — a None frontend_type must never bypass the gate.
+ if frontend_type is None and _looks_like_storefront(request.url.path):
+ logger.error(
+ "[STOREFRONT_ACCESS] frontend_type is None on storefront path "
+ f"'{request.url.path}' — blocking (fail-closed). "
+ "Check middleware chain ordering."
+ )
+ if request.url.path.startswith("/api/"):
+ return JSONResponse(
+ status_code=403,
+ content={
+ "error": "storefront_not_available",
+ "reason": "middleware_misconfigured",
+ },
+ )
+ return self._render_unavailable(request, "not_found")
+
# Only gate storefront requests
if frontend_type != FrontendType.STOREFRONT:
return await call_next(request)
diff --git a/mkdocs.yml b/mkdocs.yml
index e5d02438..064d82e1 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -147,6 +147,8 @@ nav:
- Developer Documentation: development/error-rendering/error-rendering-developer-documentation.md
- Flow Diagram: development/error-rendering/html-error-rendering-flow-diagram.md
- Environment Detection: development/environment-detection.md
+ - Diagnostics:
+ - Domain Health: development/diagnostics/domain-health.md
- Database Seeder:
- Documentation: development/database-seeder/database-seeder-documentation.md
- Makefile Guide: development/database-seeder/makefile-database-seeder.md
diff --git a/scripts/seed/seed_demo.py b/scripts/seed/seed_demo.py
index c27bed4a..a7fe33f9 100644
--- a/scripts/seed/seed_demo.py
+++ b/scripts/seed/seed_demo.py
@@ -955,6 +955,7 @@ def create_demo_stores(
platform_rows = db.execute(select(Platform.id, Platform.code)).all()
platform_code_map = {code: pid for pid, code in platform_rows}
+ store_platforms = []
for i, (platform_id,) in enumerate(merchant_subs):
# Per-platform subdomain override for multi-platform stores
# Config uses platform codes; resolve to IDs
@@ -963,13 +964,14 @@ def create_demo_stores(
if platform_code_map.get(pcode) == platform_id:
custom_sub = subdomain_val
break
- sp = StorePlatform(
+ store_platforms.append(StorePlatform(
store_id=store.id,
platform_id=platform_id,
is_active=True,
custom_subdomain=custom_sub,
- )
- db.add(sp)
+ ))
+ if store_platforms:
+ db.add_all(store_platforms)
if merchant_subs:
db.flush()
diff --git a/tests/unit/middleware/test_fail_closed_store_context.py b/tests/unit/middleware/test_fail_closed_store_context.py
new file mode 100644
index 00000000..3602055b
--- /dev/null
+++ b/tests/unit/middleware/test_fail_closed_store_context.py
@@ -0,0 +1,215 @@
+# tests/unit/middleware/test_fail_closed_store_context.py
+"""
+Unit tests for fail-closed store context resolution.
+
+Verifies that subdomain detection with a platform context:
+1. Tries StorePlatform.custom_subdomain first (platform-specific override)
+2. Falls back to Store.subdomain but ONLY if the store has an active
+ membership on the detected platform (prevents cross-tenant leaks)
+3. Returns None if the store has no membership on the platform
+"""
+
+from unittest.mock import MagicMock, Mock
+
+import pytest
+from sqlalchemy.orm import Session
+
+from middleware.store_context import StoreContextManager
+
+
+@pytest.mark.unit
+@pytest.mark.middleware
+class TestFailClosedStoreContext:
+ """Ensure subdomain+platform blocks cross-tenant resolution."""
+
+ def test_subdomain_not_found_at_all_returns_none(self):
+ """
+ Platform + subdomain + no custom_subdomain match + no Store.subdomain match → None.
+ """
+ mock_db = Mock(spec=Session)
+ mock_platform = Mock()
+ mock_platform.id = 2
+ mock_platform.code = "loyalty"
+
+ # All queries return None
+ mock_db.query.return_value.filter.return_value.first.return_value = None
+ mock_db.query.return_value.filter.return_value.filter.return_value.first.return_value = None
+
+ context = {
+ "detection_method": "subdomain",
+ "subdomain": "nonexistent",
+ "_platform": mock_platform,
+ }
+
+ store = StoreContextManager.get_store_from_context(mock_db, context)
+
+ assert store is None
+
+ def test_subdomain_found_with_platform_membership_returns_store(self):
+ """
+ Platform + subdomain + Store.subdomain found + store HAS membership
+ on this platform → return store.
+ e.g. wizatech.omsflow.lu where WIZATECH is on OMS platform.
+ """
+ mock_db = Mock(spec=Session)
+ mock_platform = Mock()
+ mock_platform.id = 1
+ mock_platform.code = "oms"
+
+ mock_store = Mock()
+ mock_store.id = 10
+ mock_store.is_active = True
+ mock_store.name = "WizaTech"
+
+ mock_membership = Mock() # Active StorePlatform entry
+
+ call_count = [0]
+
+ def side_effect_query(model):
+ result = MagicMock()
+ call_count[0] += 1
+ if call_count[0] == 1:
+ # StorePlatform.custom_subdomain lookup → no match
+ result.filter.return_value.first.return_value = None
+ elif call_count[0] == 2:
+ # Store.subdomain lookup → found
+ result.filter.return_value.filter.return_value.first.return_value = mock_store
+ else:
+ # StorePlatform membership check → exists
+ result.filter.return_value.first.return_value = mock_membership
+ return result
+
+ mock_db.query.side_effect = side_effect_query
+
+ context = {
+ "detection_method": "subdomain",
+ "subdomain": "wizatech",
+ "_platform": mock_platform,
+ }
+
+ store = StoreContextManager.get_store_from_context(mock_db, context)
+
+ assert store is mock_store
+
+ def test_subdomain_found_without_platform_membership_returns_none(self):
+ """
+ Platform + subdomain + Store.subdomain found but store has NO
+ membership on this platform → None (cross-tenant blocked).
+ e.g. loyalty-only-store.omsflow.lu where the store is only on loyalty.
+ """
+ mock_db = Mock(spec=Session)
+ mock_platform = Mock()
+ mock_platform.id = 1
+ mock_platform.code = "oms"
+
+ mock_store = Mock()
+ mock_store.id = 10
+ mock_store.is_active = True
+ mock_store.name = "LoyaltyOnlyStore"
+
+ call_count = [0]
+
+ def side_effect_query(model):
+ result = MagicMock()
+ call_count[0] += 1
+ if call_count[0] == 1:
+ # StorePlatform.custom_subdomain lookup → no match
+ result.filter.return_value.first.return_value = None
+ elif call_count[0] == 2:
+ # Store.subdomain lookup → found
+ result.filter.return_value.filter.return_value.first.return_value = mock_store
+ else:
+ # StorePlatform membership check → NOT found (no membership)
+ result.filter.return_value.first.return_value = None
+ return result
+
+ mock_db.query.side_effect = side_effect_query
+
+ context = {
+ "detection_method": "subdomain",
+ "subdomain": "loyalty-only",
+ "_platform": mock_platform,
+ }
+
+ store = StoreContextManager.get_store_from_context(mock_db, context)
+
+ assert store is None
+
+ def test_custom_subdomain_found_returns_store(self):
+ """Platform + subdomain + custom_subdomain found → correct store."""
+ mock_db = Mock(spec=Session)
+ mock_platform = Mock()
+ mock_platform.id = 2
+ mock_platform.code = "loyalty"
+
+ mock_store_platform = Mock()
+ mock_store_platform.store_id = 42
+
+ mock_store = Mock()
+ mock_store.id = 42
+ mock_store.is_active = True
+ mock_store.name = "Acme Rewards"
+
+ call_count = [0]
+
+ def side_effect_query(model):
+ result = MagicMock()
+ call_count[0] += 1
+ if call_count[0] == 1:
+ result.filter.return_value.first.return_value = mock_store_platform
+ else:
+ result.filter.return_value.first.return_value = mock_store
+ return result
+
+ mock_db.query.side_effect = side_effect_query
+
+ context = {
+ "detection_method": "subdomain",
+ "subdomain": "acme-rewards",
+ "_platform": mock_platform,
+ }
+
+ store = StoreContextManager.get_store_from_context(mock_db, context)
+
+ assert store is mock_store
+
+ def test_no_platform_subdomain_uses_global_store(self):
+ """No platform + subdomain → uses Store.subdomain (dev mode)."""
+ mock_db = Mock(spec=Session)
+ mock_store = Mock()
+ mock_store.is_active = True
+ mock_store.name = "Acme"
+
+ mock_db.query.return_value.filter.return_value.filter.return_value.first.return_value = mock_store
+
+ context = {
+ "detection_method": "subdomain",
+ "subdomain": "acme",
+ # No "_platform" key — dev mode
+ }
+
+ store = StoreContextManager.get_store_from_context(mock_db, context)
+
+ assert store is mock_store
+
+ def test_path_based_uses_global_store(self):
+ """Path-based detection always uses Store.subdomain (unchanged)."""
+ mock_db = Mock(spec=Session)
+ mock_platform = Mock()
+ mock_platform.id = 1
+
+ mock_store = Mock()
+ mock_store.is_active = True
+ mock_store.name = "Acme"
+
+ mock_db.query.return_value.filter.return_value.filter.return_value.first.return_value = mock_store
+
+ context = {
+ "detection_method": "path",
+ "subdomain": "ACME",
+ "_platform": mock_platform,
+ }
+
+ store = StoreContextManager.get_store_from_context(mock_db, context)
+
+ assert store is mock_store
diff --git a/tests/unit/middleware/test_frontend_type.py b/tests/unit/middleware/test_frontend_type.py
index 66d17b0c..dfc66cf2 100644
--- a/tests/unit/middleware/test_frontend_type.py
+++ b/tests/unit/middleware/test_frontend_type.py
@@ -188,13 +188,13 @@ class TestGetFrontendTypeHelper:
assert result == FrontendType.ADMIN
def test_get_frontend_type_default(self):
- """Test getting frontend type returns PLATFORM as default."""
+ """Test getting frontend type returns None when not set (fail-aware)."""
request = Mock(spec=Request)
request.state = Mock(spec=[]) # No frontend_type attribute
result = get_frontend_type(request)
- assert result == FrontendType.PLATFORM
+ assert result is None
def test_get_frontend_type_for_all_types(self):
"""Test getting all frontend types."""
diff --git a/tests/unit/middleware/test_ipv6_host_parsing.py b/tests/unit/middleware/test_ipv6_host_parsing.py
new file mode 100644
index 00000000..c601d631
--- /dev/null
+++ b/tests/unit/middleware/test_ipv6_host_parsing.py
@@ -0,0 +1,63 @@
+# tests/unit/middleware/test_ipv6_host_parsing.py
+"""
+Unit tests for _strip_port() IPv6-safe host parsing utility.
+
+Ensures the middleware correctly strips ports from:
+- IPv4 hosts (localhost:8000)
+- IPv6 hosts ([::1]:8000)
+- Bare hostnames (example.com)
+- Edge cases (empty, malformed brackets)
+"""
+
+import pytest
+
+from middleware.platform_context import _strip_port
+
+
+@pytest.mark.unit
+@pytest.mark.middleware
+class TestStripPort:
+ """Test _strip_port() handles all host formats correctly."""
+
+ def test_ipv4_with_port(self):
+ assert _strip_port("127.0.0.1:8000") == "127.0.0.1"
+
+ def test_ipv4_without_port(self):
+ assert _strip_port("127.0.0.1") == "127.0.0.1"
+
+ def test_localhost_with_port(self):
+ assert _strip_port("localhost:9999") == "localhost"
+
+ def test_localhost_without_port(self):
+ assert _strip_port("localhost") == "localhost"
+
+ def test_domain_with_port(self):
+ assert _strip_port("example.com:443") == "example.com"
+
+ def test_domain_without_port(self):
+ assert _strip_port("example.com") == "example.com"
+
+ def test_ipv6_with_brackets_and_port(self):
+ assert _strip_port("[::1]:8000") == "::1"
+
+ def test_ipv6_with_brackets_no_port(self):
+ assert _strip_port("[::1]") == "::1"
+
+ def test_ipv6_full_address_with_port(self):
+ assert _strip_port("[2001:db8::1]:443") == "2001:db8::1"
+
+ def test_ipv6_full_address_no_port(self):
+ assert _strip_port("[2001:db8::1]") == "2001:db8::1"
+
+ def test_empty_string(self):
+ assert _strip_port("") == ""
+
+ def test_bare_hostname(self):
+ assert _strip_port("myhost") == "myhost"
+
+ def test_subdomain_with_port(self):
+ assert _strip_port("store.omsflow.lu:8080") == "store.omsflow.lu"
+
+ def test_malformed_brackets_no_closing(self):
+ """Malformed bracket with no closing ] returns as-is."""
+ assert _strip_port("[::1") == "[::1"
diff --git a/tests/unit/middleware/test_store_context_custom_subdomain.py b/tests/unit/middleware/test_store_context_custom_subdomain.py
index 3082c4e3..631c24a4 100644
--- a/tests/unit/middleware/test_store_context_custom_subdomain.py
+++ b/tests/unit/middleware/test_store_context_custom_subdomain.py
@@ -63,34 +63,15 @@ class TestCustomSubdomainResolution:
assert store is mock_store
- def test_custom_subdomain_not_found_falls_back_to_store_subdomain(self):
- """When custom_subdomain doesn't match, fall back to Store.subdomain."""
+ def test_custom_subdomain_not_found_returns_none_on_platform(self):
+ """When custom_subdomain doesn't match on a platform, return None (fail-closed)."""
mock_db = Mock(spec=Session)
mock_platform = Mock()
mock_platform.id = 2
mock_platform.code = "loyalty"
- mock_store = Mock()
- mock_store.is_active = True
- mock_store.name = "Acme Corp"
-
- # Query sequence:
- # 1. StorePlatform query → None (no custom_subdomain match)
- # 2. Store query → mock_store (subdomain match)
- call_count = [0]
-
- def side_effect_query(model):
- result = MagicMock()
- call_count[0] += 1
- if call_count[0] == 1:
- # StorePlatform query → no match
- result.filter.return_value.first.return_value = None
- else:
- # Store.subdomain fallback
- result.filter.return_value.filter.return_value.first.return_value = mock_store
- return result
-
- mock_db.query.side_effect = side_effect_query
+ # StorePlatform query → no match
+ mock_db.query.return_value.filter.return_value.first.return_value = None
context = {
"detection_method": "subdomain",
@@ -100,6 +81,25 @@ class TestCustomSubdomainResolution:
store = StoreContextManager.get_store_from_context(mock_db, context)
+ assert store is None
+
+ def test_no_platform_subdomain_uses_global_store_subdomain(self):
+ """When no platform context, subdomain detection uses global Store.subdomain."""
+ mock_db = Mock(spec=Session)
+ mock_store = Mock()
+ mock_store.is_active = True
+ mock_store.name = "Acme Corp"
+
+ mock_db.query.return_value.filter.return_value.filter.return_value.first.return_value = mock_store
+
+ context = {
+ "detection_method": "subdomain",
+ "subdomain": "acme",
+ # No "_platform" key — dev mode
+ }
+
+ store = StoreContextManager.get_store_from_context(mock_db, context)
+
assert store is mock_store
def test_no_platform_skips_custom_subdomain_lookup(self):
diff --git a/tests/unit/middleware/test_storefront_access.py b/tests/unit/middleware/test_storefront_access.py
index f280f88c..e1389ec7 100644
--- a/tests/unit/middleware/test_storefront_access.py
+++ b/tests/unit/middleware/test_storefront_access.py
@@ -172,10 +172,51 @@ class TestStorefrontAccessMiddlewarePassthrough:
call_next.assert_called_once_with(request)
@pytest.mark.asyncio
- async def test_no_frontend_type_passes_through(self):
- """Test request with no frontend_type set passes through."""
+ async def test_no_frontend_type_non_storefront_passes_through(self):
+ """Test request with no frontend_type on non-storefront path passes through."""
middleware = StorefrontAccessMiddleware(app=None)
- request = _make_request()
+ request = _make_request(path="/admin/dashboard")
+ request.state.frontend_type = None
+ call_next = AsyncMock(return_value=Mock())
+
+ await middleware.dispatch(request, call_next)
+
+ call_next.assert_called_once_with(request)
+
+ @pytest.mark.asyncio
+ async def test_no_frontend_type_storefront_path_blocked(self):
+ """Test request with no frontend_type on storefront path is blocked (fail-closed)."""
+ middleware = StorefrontAccessMiddleware(app=None)
+ request = _make_request(path="/storefront/products")
+ request.state.frontend_type = None
+ request.state.store = None
+ call_next = AsyncMock()
+
+ with patch("app.templates_config.templates") as mock_templates:
+ mock_templates.TemplateResponse.return_value = Mock(status_code=403)
+ await middleware.dispatch(request, call_next)
+
+ call_next.assert_not_called()
+
+ @pytest.mark.asyncio
+ async def test_no_frontend_type_storefront_api_blocked_json(self):
+ """Test request with no frontend_type on storefront API returns JSON 403."""
+ middleware = StorefrontAccessMiddleware(app=None)
+ request = _make_request(path="/api/v1/storefront/cart")
+ request.state.frontend_type = None
+ call_next = AsyncMock()
+
+ response = await middleware.dispatch(request, call_next)
+
+ call_next.assert_not_called()
+ assert isinstance(response, JSONResponse)
+ assert response.status_code == 403
+
+ @pytest.mark.asyncio
+ async def test_no_frontend_type_static_passes_through(self):
+ """Test request with no frontend_type on static path passes through."""
+ middleware = StorefrontAccessMiddleware(app=None)
+ request = _make_request(path="/static/css/style.css")
request.state.frontend_type = None
call_next = AsyncMock(return_value=Mock())
diff --git a/tests/unit/middleware/test_storefront_gate_bypass.py b/tests/unit/middleware/test_storefront_gate_bypass.py
new file mode 100644
index 00000000..27d7b023
--- /dev/null
+++ b/tests/unit/middleware/test_storefront_gate_bypass.py
@@ -0,0 +1,126 @@
+# tests/unit/middleware/test_storefront_gate_bypass.py
+"""
+Unit tests for the storefront gate bypass safety net.
+
+Ensures that when frontend_type is None (upstream middleware failed),
+storefront paths are BLOCKED rather than passed through. Non-storefront
+paths with None frontend_type should pass through unchanged.
+"""
+
+from unittest.mock import AsyncMock, Mock, patch
+
+import pytest
+from fastapi import Request
+from starlette.responses import JSONResponse
+
+from middleware.storefront_access import (
+ StorefrontAccessMiddleware,
+ _looks_like_storefront,
+)
+
+
+@pytest.mark.unit
+@pytest.mark.middleware
+class TestLooksLikeStorefront:
+ """Test the _looks_like_storefront helper."""
+
+ def test_storefront_page_path(self):
+ assert _looks_like_storefront("/storefront/products") is True
+
+ def test_storefront_api_path(self):
+ assert _looks_like_storefront("/api/v1/storefront/cart") is True
+
+ def test_admin_path(self):
+ assert _looks_like_storefront("/admin/dashboard") is False
+
+ def test_static_path(self):
+ assert _looks_like_storefront("/static/css/style.css") is False
+
+ def test_store_path(self):
+ assert _looks_like_storefront("/store/ACME/login") is False
+
+ def test_root_path(self):
+ assert _looks_like_storefront("/") is False
+
+
+@pytest.mark.unit
+@pytest.mark.middleware
+class TestStorefrontGateBypass:
+ """Test that None frontend_type can't bypass the storefront gate."""
+
+ @pytest.mark.asyncio
+ async def test_none_frontend_type_storefront_page_blocked(self):
+ """frontend_type=None + /storefront/ path → blocked with HTML 403."""
+ middleware = StorefrontAccessMiddleware(app=None)
+ request = Mock(spec=Request)
+ request.url = Mock(path="/storefront/products")
+ request.state = Mock()
+ request.state.frontend_type = None
+ request.state.store = None
+ request.state.language = "en"
+ request.state.theme = None
+ call_next = AsyncMock()
+
+ with patch("app.templates_config.templates") as mock_templates:
+ mock_templates.TemplateResponse.return_value = Mock(status_code=403)
+ await middleware.dispatch(request, call_next)
+
+ call_next.assert_not_called()
+
+ @pytest.mark.asyncio
+ async def test_none_frontend_type_storefront_api_blocked(self):
+ """frontend_type=None + /api/v1/storefront/ path → JSON 403."""
+ middleware = StorefrontAccessMiddleware(app=None)
+ request = Mock(spec=Request)
+ request.url = Mock(path="/api/v1/storefront/cart")
+ request.state = Mock()
+ request.state.frontend_type = None
+ call_next = AsyncMock()
+
+ response = await middleware.dispatch(request, call_next)
+
+ call_next.assert_not_called()
+ assert isinstance(response, JSONResponse)
+ assert response.status_code == 403
+
+ @pytest.mark.asyncio
+ async def test_none_frontend_type_admin_passes_through(self):
+ """frontend_type=None + /admin/ path → passes through (not storefront)."""
+ middleware = StorefrontAccessMiddleware(app=None)
+ request = Mock(spec=Request)
+ request.url = Mock(path="/admin/dashboard")
+ request.state = Mock()
+ request.state.frontend_type = None
+ call_next = AsyncMock(return_value=Mock())
+
+ await middleware.dispatch(request, call_next)
+
+ call_next.assert_called_once_with(request)
+
+ @pytest.mark.asyncio
+ async def test_none_frontend_type_static_passes_through(self):
+ """frontend_type=None + /static/ path → passes through."""
+ middleware = StorefrontAccessMiddleware(app=None)
+ request = Mock(spec=Request)
+ request.url = Mock(path="/static/css/style.css")
+ request.state = Mock()
+ request.state.frontend_type = None
+ call_next = AsyncMock(return_value=Mock())
+
+ await middleware.dispatch(request, call_next)
+
+ call_next.assert_called_once_with(request)
+
+ @pytest.mark.asyncio
+ async def test_none_frontend_type_root_passes_through(self):
+ """frontend_type=None + / path → passes through."""
+ middleware = StorefrontAccessMiddleware(app=None)
+ request = Mock(spec=Request)
+ request.url = Mock(path="/")
+ request.state = Mock()
+ request.state.frontend_type = None
+ call_next = AsyncMock(return_value=Mock())
+
+ await middleware.dispatch(request, call_next)
+
+ call_next.assert_called_once_with(request)