Files
orion/app/modules/tenancy/routes/api/store_auth.py
Samir Boulahtit cfce6c0ca4
Some checks failed
CI / ruff (push) Successful in 10s
CI / validate (push) Has been cancelled
CI / dependency-scanning (push) Has been cancelled
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / pytest (push) Has been cancelled
fix: loyalty module end-to-end — merchant route, store menus, sidebar, API error handling
- Add merchant loyalty overview route and template (was 404)
- Fix store loyalty route paths to match menu URLs (/{store_code}/loyalty/...)
- Add loyalty rewards card to storefront account dashboard
- Fix merchant overview to resolve merchant via get_merchant_for_current_user_page
- Fix store login to use store's primary platform for JWT token (interim fix)
- Fix apiClient to attach status/errorCode to thrown errors (fixes error.status
  checks in 12+ JS files — loyalty settings, terminal, email templates, etc.)
- Hide "Add Product" sidebar button when catalog module is not enabled
- Add proposal doc for proper platform detection in store login flow

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 13:52:11 +01:00

322 lines
11 KiB
Python

# app/modules/tenancy/routes/api/store_auth.py
"""
Store team authentication endpoints.
Implements dual token storage with path restriction:
- Sets HTTP-only cookie with path=/store (restricted to store routes only)
- Returns token in response for localStorage (API calls)
This prevents:
- Store cookies from being sent to admin routes
- Admin cookies from being sent to store routes
- Cross-context authentication confusion
"""
import logging
from fastapi import APIRouter, Depends, Request, Response
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.api.deps import get_current_store_api
from app.core.database import get_db
from app.core.environment import should_use_secure_cookies
from app.modules.core.services.auth_service import auth_service
from app.modules.tenancy.exceptions import InvalidCredentialsException
from app.modules.tenancy.models.user_password_reset_token import (
UserPasswordResetToken, # noqa: API-007
)
from app.modules.tenancy.services.user_auth_service import user_auth_service
from middleware.platform_context import get_current_platform
from middleware.store_context import get_current_store
from models.schema.auth import LogoutResponse, StoreUserResponse, UserContext, UserLogin
store_auth_router = APIRouter(prefix="/auth")
logger = logging.getLogger(__name__)
# Response model for store login
class StoreLoginResponse(BaseModel):
access_token: str
token_type: str
expires_in: int
user: dict
store: dict
store_role: str
@store_auth_router.post("/login", response_model=StoreLoginResponse)
def store_login(
user_credentials: UserLogin,
request: Request,
response: Response,
db: Session = Depends(get_db),
):
"""
Store team member login.
Authenticates users who are part of a store team.
Validates against store context if available.
Sets token in two places:
1. HTTP-only cookie with path=/store (for browser page navigation)
2. Response body (for localStorage and API calls)
Prevents admin users from logging into store portal.
"""
# Try to get store from middleware first
store = get_current_store(request)
# If no store from middleware, try to get from request body
if not store and hasattr(user_credentials, "store_code"):
store_code = getattr(user_credentials, "store_code", None)
if store_code:
store = auth_service.get_store_by_code(db, store_code)
# Authenticate user
login_result = auth_service.login_user(db=db, user_credentials=user_credentials)
user = login_result["user"]
# CRITICAL: Prevent admin users from using store login
if user.is_admin:
logger.warning(f"Admin user attempted store login: {user.username}")
raise InvalidCredentialsException(
"Admins cannot access store portal. Please use admin portal."
)
# Determine store and role
store_role = "Member"
if store:
# Check if user has access to this store
has_access, role = auth_service.get_user_store_role(db, user, store)
if has_access:
store_role = role
else:
logger.warning(
f"User {user.username} attempted login to store {store.store_code} "
f"but is not authorized"
)
raise InvalidCredentialsException("You do not have access to this store")
else:
# No store context - find which store this user belongs to
store, store_role = auth_service.find_user_store(user)
if not store:
raise InvalidCredentialsException("User is not associated with any store")
logger.info(
f"Store team login successful: {user.username} "
f"for store {store.store_code} as {store_role}"
)
# Resolve platform from the store's primary platform link.
# Middleware-detected platform is unreliable for API paths on localhost
# (e.g., /api/v1/store/auth/login defaults to "main" instead of the store's platform).
platform_id = None
platform_code = None
if store:
from app.modules.core.services.menu_service import menu_service
from app.modules.tenancy.services.platform_service import platform_service
primary_pid = menu_service.get_store_primary_platform_id(db, store.id)
if primary_pid:
plat = platform_service.get_platform_by_id(db, primary_pid)
if plat:
platform_id = plat.id
platform_code = plat.code
if platform_id is None:
# Fallback to middleware-detected platform
platform = get_current_platform(request)
platform_id = platform.id if platform else None
platform_code = platform.code if platform else None
# Create store-scoped access token with store information
token_data = auth_service.auth_manager.create_access_token(
user=user,
store_id=store.id,
store_code=store.store_code,
store_role=store_role,
platform_id=platform_id,
platform_code=platform_code,
)
# Set HTTP-only cookie for browser navigation
# CRITICAL: path=/store restricts cookie to store routes only
response.set_cookie(
key="store_token",
value=token_data["access_token"],
httponly=True, # JavaScript cannot access (XSS protection)
secure=should_use_secure_cookies(), # HTTPS only in production/staging
samesite="lax", # CSRF protection
max_age=token_data["expires_in"], # Match JWT expiry
path="/store", # RESTRICTED TO STORE ROUTES ONLY
)
logger.debug(
f"Set store_token cookie with {token_data['expires_in']}s expiry "
f"(path=/store, httponly=True, secure={should_use_secure_cookies()})"
)
# Return full login response with store-scoped token
return StoreLoginResponse(
access_token=token_data["access_token"],
token_type=token_data["token_type"],
expires_in=token_data["expires_in"],
user={
"id": user.id,
"username": user.username,
"email": user.email,
"role": user.role,
"is_active": user.is_active,
},
store={
"id": store.id,
"store_code": store.store_code,
"subdomain": store.subdomain,
"name": store.name,
"is_active": store.is_active,
"is_verified": store.is_verified,
},
store_role=store_role,
)
@store_auth_router.post("/logout", response_model=LogoutResponse)
def store_logout(response: Response):
"""
Store team member logout.
Clears the store_token cookie.
Client should also remove token from localStorage.
"""
logger.info("Store logout")
# Clear the cookie (must match path used when setting)
response.delete_cookie(
key="store_token",
path="/store",
)
logger.debug("Deleted store_token cookie")
return LogoutResponse(message="Logged out successfully")
@store_auth_router.get("/me", response_model=StoreUserResponse)
def get_current_store_user(
user: UserContext = Depends(get_current_store_api), db: Session = Depends(get_db)
):
"""
Get current authenticated store user.
This endpoint can be called to verify authentication and get user info.
Requires Authorization header (header-only authentication for API endpoints).
"""
return StoreUserResponse(
id=user.id,
username=user.username,
email=user.email,
role=user.role,
is_active=user.is_active,
)
# ============================================================================
# PASSWORD RESET
# ============================================================================
class StoreForgotPasswordRequest(BaseModel):
email: str
class StoreForgotPasswordResponse(BaseModel):
message: str
class StoreResetPasswordRequest(BaseModel):
token: str
new_password: str
class StoreResetPasswordResponse(BaseModel):
message: str
@store_auth_router.post("/forgot-password", response_model=StoreForgotPasswordResponse)
def store_forgot_password(
request: Request,
body: StoreForgotPasswordRequest,
db: Session = Depends(get_db),
):
"""
Request password reset for store team user.
Sends password reset email if account exists.
Always returns success to prevent email enumeration.
"""
user, plaintext_token = user_auth_service.request_password_reset(db, body.email)
if user and plaintext_token:
try:
scheme = "https" if should_use_secure_cookies() else "http"
host = request.headers.get("host", "localhost:8000")
# Include store code in reset link if available
store = get_current_store(request)
if store:
reset_link = f"{scheme}://{host}/store/{store.store_code}/reset-password?token={plaintext_token}"
else:
reset_link = f"{scheme}://{host}/merchants/reset-password?token={plaintext_token}"
from app.modules.messaging.services.email_service import EmailService
email_service = EmailService(db)
email_service.send_template(
template_code="merchant_password_reset",
to_email=user.email,
to_name=user.username,
language="en",
variables={
"first_name": user.username,
"reset_link": reset_link,
"expiry_hours": str(UserPasswordResetToken.TOKEN_EXPIRY_HOURS),
"platform_name": "Orion",
},
store_id=store.id if store else None,
)
db.commit()
logger.info(f"Password reset email sent to {user.email}") # noqa: SEC021
except Exception as e:
db.rollback()
logger.error(f"Failed to send password reset email: {e}") # noqa: SEC021
else:
logger.info(f"Password reset requested for non-existent/inactive email {body.email}") # noqa: SEC021
return StoreForgotPasswordResponse(
message="If an account exists with this email, a password reset link has been sent."
)
@store_auth_router.post("/reset-password", response_model=StoreResetPasswordResponse)
def store_reset_password(
body: StoreResetPasswordRequest,
db: Session = Depends(get_db),
):
"""
Reset store team user password using reset token.
Validates the token and sets the new password.
"""
user = user_auth_service.reset_password(db, body.token, body.new_password)
db.commit()
logger.info(f"Password reset completed for user {user.id} ({user.email})") # noqa: SEC021
return StoreResetPasswordResponse(
message="Password reset successfully. You can now log in with your new password."
)