Files
orion/app/modules/tenancy/services/user_auth_service.py
Samir Boulahtit d9fc52d47a
Some checks failed
CI / ruff (push) Successful in 10s
CI / validate (push) Has been cancelled
CI / dependency-scanning (push) Has been cancelled
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / pytest (push) Has been cancelled
feat: email verification, merchant/store password reset, seed gap fix
- Add EmailVerificationToken and UserPasswordResetToken models with migration
- Add email verification flow: verify-email page route, resend-verification API
- Block login for unverified users (EmailNotVerifiedException in auth_service)
- Add forgot-password/reset-password endpoints for merchant and store auth
- Add "Forgot Password?" links to merchant and store login pages
- Send welcome email with verification link on merchant creation
- Seed email_verification and merchant_password_reset email templates
- Fix db-reset Makefile to run all init-prod seed scripts
- Add UserAuthService to satisfy architecture validation rules
- Add 52 new tests (unit + integration) with full coverage

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 23:22:46 +01:00

92 lines
2.8 KiB
Python

# app/modules/tenancy/services/user_auth_service.py
"""
Service for user-level auth operations: password reset, email verification.
Handles:
- Password reset token creation, validation, and password update
- Email verification token creation and resend logic
"""
import logging
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.modules.tenancy.exceptions import InvalidTokenException
from app.modules.tenancy.models import User
from app.modules.tenancy.models.email_verification_token import EmailVerificationToken
from app.modules.tenancy.models.user_password_reset_token import UserPasswordResetToken
from middleware.auth import AuthManager
logger = logging.getLogger(__name__)
class UserAuthService:
"""Service for user password reset and email verification."""
def __init__(self):
self.auth_manager = AuthManager()
def request_password_reset(self, db: Session, email: str) -> tuple[User | None, str | None]:
"""
Create a password reset token for a user by email.
Returns (user, plaintext_token) if user exists and is active,
otherwise (None, None).
"""
user = db.execute(
select(User).where(User.email == email)
).scalar_one_or_none()
if not user or not user.is_active:
return None, None
plaintext_token = UserPasswordResetToken.create_for_user(db, user.id)
return user, plaintext_token
def reset_password(self, db: Session, token: str, new_password: str) -> User:
"""
Validate reset token and set new password.
Args:
db: Database session
token: Plaintext reset token
new_password: New password to set
Returns:
The user whose password was reset
Raises:
InvalidTokenException: If token is invalid or expired
"""
token_record = UserPasswordResetToken.find_valid_token(db, token)
if not token_record:
raise InvalidTokenException("Invalid or expired password reset token")
user = token_record.user
user.hashed_password = self.auth_manager.hash_password(new_password)
token_record.mark_used(db)
return user
def request_verification_resend(self, db: Session, email: str) -> tuple[User | None, str | None]:
"""
Create a new email verification token for an unverified user.
Returns (user, plaintext_token) if user exists and is not yet verified,
otherwise (None, None).
"""
user = db.execute(
select(User).where(User.email == email)
).scalar_one_or_none()
if not user or user.is_email_verified:
return None, None
plaintext_token = EmailVerificationToken.create_for_user(db, user.id)
return user, plaintext_token
user_auth_service = UserAuthService()