Files
orion/app/modules/tenancy/routes/api/merchant_auth.py
Samir Boulahtit d9fc52d47a
Some checks failed
CI / ruff (push) Successful in 10s
CI / validate (push) Has been cancelled
CI / dependency-scanning (push) Has been cancelled
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / pytest (push) Has been cancelled
feat: email verification, merchant/store password reset, seed gap fix
- Add EmailVerificationToken and UserPasswordResetToken models with migration
- Add email verification flow: verify-email page route, resend-verification API
- Block login for unverified users (EmailNotVerifiedException in auth_service)
- Add forgot-password/reset-password endpoints for merchant and store auth
- Add "Forgot Password?" links to merchant and store login pages
- Send welcome email with verification link on merchant creation
- Seed email_verification and merchant_password_reset email templates
- Fix db-reset Makefile to run all init-prod seed scripts
- Add UserAuthService to satisfy architecture validation rules
- Add 52 new tests (unit + integration) with full coverage

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 23:22:46 +01:00

214 lines
7.0 KiB
Python

# app/modules/tenancy/routes/api/merchant_auth.py
"""
Merchant authentication endpoints.
Implements dual token storage with path restriction:
- Sets HTTP-only cookie with path=/merchants (restricted to merchant routes only)
- Returns token in response for localStorage (API calls)
This prevents merchant cookies from being sent to admin or store routes.
"""
import logging
from fastapi import APIRouter, Depends, Request, Response
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.api.deps import get_current_merchant_from_cookie_or_header
from app.core.database import get_db
from app.core.environment import should_use_secure_cookies
from app.modules.core.services.auth_service import auth_service
from app.modules.tenancy.models.user_password_reset_token import (
UserPasswordResetToken, # noqa: API-007
)
from app.modules.tenancy.services.user_auth_service import user_auth_service
from models.schema.auth import (
LoginResponse,
LogoutResponse,
UserContext,
UserLogin,
UserResponse,
)
merchant_auth_router = APIRouter(prefix="/auth")
logger = logging.getLogger(__name__)
@merchant_auth_router.post("/login", response_model=LoginResponse)
def merchant_login(
user_credentials: UserLogin, response: Response, db: Session = Depends(get_db)
):
"""
Merchant login endpoint.
Only allows users who own at least one active merchant to login.
Returns JWT token for authenticated merchant users.
Sets token in two places:
1. HTTP-only cookie with path=/merchants (for browser page navigation)
2. Response body (for localStorage and API calls)
The cookie is restricted to /merchants/* routes only to prevent
it from being sent to admin or store routes.
"""
# Authenticate user and verify merchant ownership
login_result = auth_service.login_merchant(db=db, user_credentials=user_credentials)
logger.info(f"Merchant login successful: {login_result['user'].username}")
# Set HTTP-only cookie for browser navigation
# CRITICAL: path=/merchants restricts cookie to merchant routes only
response.set_cookie(
key="merchant_token",
value=login_result["token_data"]["access_token"],
httponly=True, # JavaScript cannot access (XSS protection)
secure=should_use_secure_cookies(), # HTTPS only in production/staging
samesite="lax", # CSRF protection
max_age=login_result["token_data"]["expires_in"], # Match JWT expiry
path="/merchants", # RESTRICTED TO MERCHANT ROUTES ONLY
)
logger.debug(
f"Set merchant_token cookie with {login_result['token_data']['expires_in']}s expiry "
f"(path=/merchants, httponly=True, secure={should_use_secure_cookies()})"
)
# Also return token in response for localStorage (API calls)
return LoginResponse(
access_token=login_result["token_data"]["access_token"],
token_type=login_result["token_data"]["token_type"],
expires_in=login_result["token_data"]["expires_in"],
user=login_result["user"],
)
@merchant_auth_router.get("/me", response_model=UserResponse)
def get_current_merchant(
current_user: UserContext = Depends(get_current_merchant_from_cookie_or_header),
):
"""
Get current authenticated merchant user.
This endpoint validates the token and ensures the user owns merchants.
Returns the current user's information.
Token can come from:
- Authorization header (API calls)
- merchant_token cookie (browser navigation, path=/merchants only)
"""
logger.info(f"Merchant user info requested: {current_user.username}")
return current_user
@merchant_auth_router.post("/logout", response_model=LogoutResponse)
def merchant_logout(response: Response):
"""
Merchant logout endpoint.
Clears the merchant_token cookie.
Client should also remove token from localStorage.
"""
logger.info("Merchant logout")
# Clear the cookie (must match path used when setting)
response.delete_cookie(
key="merchant_token",
path="/merchants",
)
logger.debug("Deleted merchant_token cookie (path=/merchants)")
return LogoutResponse(message="Logged out successfully")
# ============================================================================
# PASSWORD RESET
# ============================================================================
class ForgotPasswordRequest(BaseModel):
email: str
class ForgotPasswordResponse(BaseModel):
message: str
class ResetPasswordRequest(BaseModel):
token: str
new_password: str
class ResetPasswordResponse(BaseModel):
message: str
@merchant_auth_router.post("/forgot-password", response_model=ForgotPasswordResponse)
def merchant_forgot_password(
request: Request,
body: ForgotPasswordRequest,
db: Session = Depends(get_db),
):
"""
Request password reset for merchant user.
Sends password reset email if account exists.
Always returns success to prevent email enumeration.
"""
user, plaintext_token = user_auth_service.request_password_reset(db, body.email)
if user and plaintext_token:
try:
scheme = "https" if should_use_secure_cookies() else "http"
host = request.headers.get("host", "localhost:8000")
reset_link = f"{scheme}://{host}/merchants/reset-password?token={plaintext_token}"
from app.modules.messaging.services.email_service import EmailService
email_service = EmailService(db)
email_service.send_template(
template_code="merchant_password_reset",
to_email=user.email,
to_name=user.username,
language="en",
variables={
"first_name": user.username,
"reset_link": reset_link,
"expiry_hours": str(UserPasswordResetToken.TOKEN_EXPIRY_HOURS),
"platform_name": "Orion",
},
)
db.commit()
logger.info(f"Password reset email sent to {user.email}") # noqa: SEC021
except Exception as e:
db.rollback()
logger.error(f"Failed to send password reset email: {e}") # noqa: SEC021
else:
logger.info(f"Password reset requested for non-existent/inactive email {body.email}") # noqa: SEC021
return ForgotPasswordResponse(
message="If an account exists with this email, a password reset link has been sent."
)
@merchant_auth_router.post("/reset-password", response_model=ResetPasswordResponse)
def merchant_reset_password(
body: ResetPasswordRequest,
db: Session = Depends(get_db),
):
"""
Reset merchant user password using reset token.
Validates the token and sets the new password.
"""
user = user_auth_service.reset_password(db, body.token, body.new_password)
db.commit()
logger.info(f"Password reset completed for user {user.id} ({user.email})") # noqa: SEC021
return ResetPasswordResponse(
message="Password reset successfully. You can now log in with your new password."
)