diff --git a/alembic/versions/t8b9c0d1e2f3_add_password_reset_tokens.py b/alembic/versions/t8b9c0d1e2f3_add_password_reset_tokens.py
new file mode 100644
index 00000000..329a3189
--- /dev/null
+++ b/alembic/versions/t8b9c0d1e2f3_add_password_reset_tokens.py
@@ -0,0 +1,53 @@
+"""add password_reset_tokens table
+
+Revision ID: t8b9c0d1e2f3
+Revises: s7a8b9c0d1e2
+Create Date: 2026-01-03
+
+"""
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = "t8b9c0d1e2f3"
+down_revision = "s7a8b9c0d1e2"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+ op.create_table(
+ "password_reset_tokens",
+ sa.Column("id", sa.Integer(), nullable=False),
+ sa.Column("customer_id", sa.Integer(), nullable=False),
+ sa.Column("token_hash", sa.String(64), nullable=False),
+ sa.Column("expires_at", sa.DateTime(), nullable=False),
+ sa.Column("used_at", sa.DateTime(), nullable=True),
+ sa.Column(
+ "created_at", sa.DateTime(), server_default=sa.text("now()"), nullable=False
+ ),
+ sa.ForeignKeyConstraint(
+ ["customer_id"],
+ ["customers.id"],
+ ondelete="CASCADE",
+ ),
+ sa.PrimaryKeyConstraint("id"),
+ )
+ op.create_index(
+ "ix_password_reset_tokens_customer_id",
+ "password_reset_tokens",
+ ["customer_id"],
+ )
+ op.create_index(
+ "ix_password_reset_tokens_token_hash",
+ "password_reset_tokens",
+ ["token_hash"],
+ )
+
+
+def downgrade() -> None:
+ op.drop_index("ix_password_reset_tokens_token_hash", table_name="password_reset_tokens")
+ op.drop_index("ix_password_reset_tokens_customer_id", table_name="password_reset_tokens")
+ op.drop_table("password_reset_tokens")
diff --git a/app/api/v1/shop/auth.py b/app/api/v1/shop/auth.py
index 88caade1..e0f83cec 100644
--- a/app/api/v1/shop/auth.py
+++ b/app/api/v1/shop/auth.py
@@ -16,7 +16,7 @@ This prevents:
import logging
-from fastapi import APIRouter, Depends, Request, Response
+from fastapi import APIRouter, Depends, HTTPException, Request, Response
from pydantic import BaseModel
from sqlalchemy.orm import Session
@@ -24,6 +24,9 @@ from app.core.database import get_db
from app.core.environment import should_use_secure_cookies
from app.exceptions import VendorNotFoundException
from app.services.customer_service import customer_service
+from app.services.email_service import EmailService
+from models.database.customer import Customer
+from models.database.password_reset_token import PasswordResetToken
from models.schema.auth import (
LogoutResponse,
PasswordResetRequestResponse,
@@ -275,14 +278,61 @@ def forgot_password(request: Request, email: str, db: Session = Depends(get_db))
},
)
- # TODO: Implement password reset functionality
- # - Generate reset token
- # - Store token in database with expiry
- # - Send reset email to customer
- # - Return success message (don't reveal if email exists)
+ # Look up customer by email (vendor-scoped)
+ customer = (
+ db.query(Customer)
+ .filter(
+ Customer.vendor_id == vendor.id,
+ Customer.email == email.lower(),
+ Customer.is_active == True, # noqa: E712
+ )
+ .first()
+ )
- logger.info(f"Password reset requested for {email} (vendor: {vendor.subdomain})") # noqa: sec-021
+ # If customer exists, generate token and send email
+ if customer:
+ try:
+ # Generate reset token (returns plaintext token)
+ plaintext_token = PasswordResetToken.create_for_customer(db, customer.id)
+ # Build reset link
+ # Use request host to construct the URL
+ scheme = "https" if should_use_secure_cookies() else "http"
+ host = request.headers.get("host", "localhost")
+ reset_link = f"{scheme}://{host}/shop/account/reset-password?token={plaintext_token}"
+
+ # Send password reset email
+ email_service = EmailService(db)
+ email_service.send_template(
+ template_code="password_reset",
+ to_email=customer.email,
+ to_name=customer.full_name,
+ language=customer.preferred_language or "en",
+ variables={
+ "customer_name": customer.first_name or customer.full_name,
+ "reset_link": reset_link,
+ "expiry_hours": str(PasswordResetToken.TOKEN_EXPIRY_HOURS),
+ },
+ vendor_id=vendor.id,
+ related_type="customer",
+ related_id=customer.id,
+ )
+
+ db.commit()
+ logger.info(
+ f"Password reset email sent to {email} (vendor: {vendor.subdomain})"
+ )
+ except Exception as e:
+ db.rollback()
+ logger.error(f"Failed to send password reset email: {e}")
+ # Don't reveal the error to the user for security
+ else:
+ # Log but don't reveal that email doesn't exist
+ logger.info(
+ f"Password reset requested for non-existent email {email} (vendor: {vendor.subdomain})"
+ )
+
+ # Always return the same message (don't reveal if email exists)
return PasswordResetRequestResponse(
message="If an account exists with this email, a password reset link has been sent."
)
@@ -299,7 +349,7 @@ def reset_password(
Request Body:
- reset_token: Password reset token from email
- - new_password: New password
+ - new_password: New password (minimum 8 characters)
"""
# Get vendor from middleware
vendor = getattr(request.state, "vendor", None)
@@ -315,14 +365,49 @@ def reset_password(
},
)
- # TODO: Implement password reset
- # - Validate reset token
- # - Check token expiry
- # - Update customer password
- # - Invalidate reset token
- # - Return success
+ # Validate password length
+ if len(new_password) < 8:
+ raise HTTPException(
+ status_code=400,
+ detail="Password must be at least 8 characters long",
+ )
- logger.info(f"Password reset completed (vendor: {vendor.subdomain})") # noqa: sec-021
+ # Find valid token
+ token_record = PasswordResetToken.find_valid_token(db, reset_token)
+
+ if not token_record:
+ raise HTTPException(
+ status_code=400,
+ detail="Invalid or expired password reset link. Please request a new one.",
+ )
+
+ # Get the customer and verify they belong to this vendor
+ customer = db.query(Customer).filter(Customer.id == token_record.customer_id).first()
+
+ if not customer or customer.vendor_id != vendor.id:
+ raise HTTPException(
+ status_code=400,
+ detail="Invalid or expired password reset link. Please request a new one.",
+ )
+
+ if not customer.is_active:
+ raise HTTPException(
+ status_code=400,
+ detail="This account is not active. Please contact support.",
+ )
+
+ # Hash the new password and update customer
+ hashed_password = customer_service.auth_service.hash_password(new_password)
+ customer.hashed_password = hashed_password
+
+ # Mark token as used
+ token_record.mark_used(db)
+
+ db.commit()
+
+ logger.info(
+ f"Password reset completed for customer {customer.id} (vendor: {vendor.subdomain})"
+ )
return PasswordResetResponse(
message="Password reset successfully. You can now log in with your new password."
diff --git a/app/routes/shop_pages.py b/app/routes/shop_pages.py
index 0f6c6d4b..a548151c 100644
--- a/app/routes/shop_pages.py
+++ b/app/routes/shop_pages.py
@@ -409,6 +409,32 @@ async def shop_forgot_password_page(request: Request, db: Session = Depends(get_
)
+@router.get(
+ "/account/reset-password", response_class=HTMLResponse, include_in_schema=False
+)
+async def shop_reset_password_page(
+ request: Request, token: str = None, db: Session = Depends(get_db)
+):
+ """
+ Render reset password page.
+ User lands here after clicking the reset link in their email.
+ Token is passed as query parameter.
+ """
+ logger.debug(
+ "[SHOP_HANDLER] shop_reset_password_page REACHED",
+ extra={
+ "path": request.url.path,
+ "vendor": getattr(request.state, "vendor", "NOT SET"),
+ "context": getattr(request.state, "context_type", "NOT SET"),
+ "has_token": bool(token),
+ },
+ )
+
+ return templates.TemplateResponse(
+ "shop/account/reset-password.html", get_shop_context(request, db=db)
+ )
+
+
# ============================================================================
# CUSTOMER ACCOUNT - AUTHENTICATED ROUTES
# ============================================================================
diff --git a/app/templates/shop/account/reset-password.html b/app/templates/shop/account/reset-password.html
new file mode 100644
index 00000000..37217b3a
--- /dev/null
+++ b/app/templates/shop/account/reset-password.html
@@ -0,0 +1,321 @@
+{# app/templates/shop/account/reset-password.html #}
+{# standalone #}
+
+
+
+
+
+ Reset Password - {{ vendor.name }}
+
+
+
+
+ {# CRITICAL: Inject theme CSS variables #}
+
+
+ {# Tailwind CSS v4 (built locally via standalone CLI) #}
+
+
+
+
+
+
+
+
+
+ {% if theme.branding.logo %}
+
+ {% else %}
+
đ
+ {% endif %}
+
{{ vendor.name }}
+
Create new password
+
+
+
+
+
+
+
+
+
+
+
+
+ Invalid or Expired Link
+
+
+
+ This password reset link is invalid or has expired.
+ Please request a new password reset link.
+
+
+
+ Request New Link
+
+
+
+
+
+
+
+
+ Reset Your Password
+
+
+
+ Enter your new password below. Password must be at least 8 characters.
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Password Reset Complete
+
+
+
+ Your password has been successfully reset.
+ You can now sign in with your new password.
+
+
+
+ Sign In
+
+
+
+
+
+
+
+ Remember your password?
+
+ Sign in
+
+
+
+
+ â Continue shopping
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/models/database/__init__.py b/models/database/__init__.py
index 806715a4..e381da05 100644
--- a/models/database/__init__.py
+++ b/models/database/__init__.py
@@ -18,6 +18,7 @@ from .base import Base
from .company import Company
from .content_page import ContentPage
from .customer import Customer, CustomerAddress
+from .password_reset_token import PasswordResetToken
from .email import EmailCategory, EmailLog, EmailStatus, EmailTemplate
from .feature import Feature, FeatureCategory, FeatureCode, FeatureUILocation
from .inventory import Inventory
@@ -102,9 +103,10 @@ __all__ = [
"VendorTheme",
# Content
"ContentPage",
- # Customer
+ # Customer & Auth
"Customer",
"CustomerAddress",
+ "PasswordResetToken",
# Email
"EmailCategory",
"EmailLog",
diff --git a/models/database/password_reset_token.py b/models/database/password_reset_token.py
new file mode 100644
index 00000000..b46cef5c
--- /dev/null
+++ b/models/database/password_reset_token.py
@@ -0,0 +1,85 @@
+import hashlib
+import secrets
+from datetime import datetime, timedelta
+
+from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
+from sqlalchemy.orm import Session, relationship
+
+from app.core.database import Base
+
+
+class PasswordResetToken(Base):
+ """Password reset token for customer accounts.
+
+ Security:
+ - Tokens are stored as SHA256 hashes, not plaintext
+ - Tokens expire after 1 hour
+ - Only one active token per customer (old tokens invalidated on new request)
+ """
+
+ __tablename__ = "password_reset_tokens"
+
+ # Token expiry in hours
+ TOKEN_EXPIRY_HOURS = 1
+
+ id = Column(Integer, primary_key=True, index=True)
+ customer_id = Column(Integer, ForeignKey("customers.id", ondelete="CASCADE"), nullable=False)
+ token_hash = Column(String(64), nullable=False, index=True)
+ expires_at = Column(DateTime, nullable=False)
+ used_at = Column(DateTime, nullable=True)
+ created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
+
+ # Relationships
+ customer = relationship("Customer")
+
+ def __repr__(self):
+ return f""
+
+ @staticmethod
+ def hash_token(token: str) -> str:
+ """Hash a token using SHA256."""
+ return hashlib.sha256(token.encode()).hexdigest()
+
+ @classmethod
+ def create_for_customer(cls, db: Session, customer_id: int) -> str:
+ """Create a new password reset token for a customer.
+
+ Invalidates any existing tokens for the customer.
+ Returns the plaintext token (to be sent via email).
+ """
+ # Invalidate existing tokens for this customer
+ db.query(cls).filter(
+ cls.customer_id == customer_id,
+ cls.used_at.is_(None),
+ ).delete()
+
+ # Generate new token
+ plaintext_token = secrets.token_urlsafe(32)
+ token_hash = cls.hash_token(plaintext_token)
+
+ # Create token record
+ token = cls(
+ customer_id=customer_id,
+ token_hash=token_hash,
+ expires_at=datetime.utcnow() + timedelta(hours=cls.TOKEN_EXPIRY_HOURS),
+ )
+ db.add(token)
+ db.flush()
+
+ return plaintext_token
+
+ @classmethod
+ def find_valid_token(cls, db: Session, plaintext_token: str) -> "PasswordResetToken | None":
+ """Find a valid (not expired, not used) token."""
+ token_hash = cls.hash_token(plaintext_token)
+
+ return db.query(cls).filter(
+ cls.token_hash == token_hash,
+ cls.expires_at > datetime.utcnow(),
+ cls.used_at.is_(None),
+ ).first()
+
+ def mark_used(self, db: Session) -> None:
+ """Mark this token as used."""
+ self.used_at = datetime.utcnow()
+ db.flush()
diff --git a/scripts/seed_email_templates.py b/scripts/seed_email_templates.py
index d9ed0001..3871103d 100644
--- a/scripts/seed_email_templates.py
+++ b/scripts/seed_email_templates.py
@@ -653,6 +653,249 @@ Liwweradress:
Dir kritt eng weider E-Mail wann Ăr Bestellung verschĂ©ckt gĂ«tt.
Merci fir Ăren Akaf!
+""",
+ },
+ # -------------------------------------------------------------------------
+ # PASSWORD RESET
+ # -------------------------------------------------------------------------
+ {
+ "code": "password_reset",
+ "language": "en",
+ "name": "Password Reset",
+ "description": "Sent to customers when they request a password reset",
+ "category": EmailCategory.AUTH.value,
+ "variables": json.dumps([
+ "customer_name", "reset_link", "expiry_hours"
+ ]),
+ "subject": "Reset Your Password",
+ "body_html": """
+
+
+
+
+
+
+
+
Reset Your Password
+
+
+
+
Hi {{ customer_name }},
+
+
We received a request to reset your password. Click the button below to create a new password:
+
+
+
+
+ This link will expire in {{ expiry_hours }} hour(s). If you didn't request this password reset, you can safely ignore this email.
+
+
+
+ If the button doesn't work, copy and paste this link into your browser:
+ {{ reset_link }}
+
+
+
Best regards,The Team
+
+
+
+
This is an automated email. Please do not reply directly.
+
+
+""",
+ "body_text": """Reset Your Password
+
+Hi {{ customer_name }},
+
+We received a request to reset your password. Click the link below to create a new password:
+
+{{ reset_link }}
+
+This link will expire in {{ expiry_hours }} hour(s). If you didn't request this password reset, you can safely ignore this email.
+
+Best regards,
+The Team
+""",
+ },
+ {
+ "code": "password_reset",
+ "language": "fr",
+ "name": "Reinitialisation du mot de passe",
+ "description": "Envoye aux clients lorsqu'ils demandent une reinitialisation de mot de passe",
+ "category": EmailCategory.AUTH.value,
+ "variables": json.dumps([
+ "customer_name", "reset_link", "expiry_hours"
+ ]),
+ "subject": "Reinitialiser votre mot de passe",
+ "body_html": """
+
+
+
+
+
+
+
+
Reinitialiser votre mot de passe
+
+
+
+
Bonjour {{ customer_name }},
+
+
Nous avons recu une demande de reinitialisation de votre mot de passe. Cliquez sur le bouton ci-dessous pour creer un nouveau mot de passe :
+
+
+
+
+ Ce lien expirera dans {{ expiry_hours }} heure(s). Si vous n'avez pas demande cette reinitialisation, vous pouvez ignorer cet email.
+
+
+
+ Si le bouton ne fonctionne pas, copiez et collez ce lien dans votre navigateur :
+ {{ reset_link }}
+
+
+
Cordialement,L'equipe
+
+
+""",
+ "body_text": """Reinitialiser votre mot de passe
+
+Bonjour {{ customer_name }},
+
+Nous avons recu une demande de reinitialisation de votre mot de passe. Cliquez sur le lien ci-dessous pour creer un nouveau mot de passe :
+
+{{ reset_link }}
+
+Ce lien expirera dans {{ expiry_hours }} heure(s). Si vous n'avez pas demande cette reinitialisation, vous pouvez ignorer cet email.
+
+Cordialement,
+L'equipe
+""",
+ },
+ {
+ "code": "password_reset",
+ "language": "de",
+ "name": "Passwort zurucksetzen",
+ "description": "An Kunden gesendet, wenn sie eine Passwortzurucksetzung anfordern",
+ "category": EmailCategory.AUTH.value,
+ "variables": json.dumps([
+ "customer_name", "reset_link", "expiry_hours"
+ ]),
+ "subject": "Passwort zurucksetzen",
+ "body_html": """
+
+
+
+
+
+
+
+
Passwort zurucksetzen
+
+
+
+
Hallo {{ customer_name }},
+
+
Wir haben eine Anfrage zur Zurucksetzung Ihres Passworts erhalten. Klicken Sie auf die Schaltflache unten, um ein neues Passwort zu erstellen:
+
+
+
+
+ Dieser Link lauft in {{ expiry_hours }} Stunde(n) ab. Wenn Sie diese Passwortzurucksetzung nicht angefordert haben, konnen Sie diese E-Mail ignorieren.
+
+
+
+ Wenn die Schaltflache nicht funktioniert, kopieren Sie diesen Link in Ihren Browser:
+ {{ reset_link }}
+
+
+
Mit freundlichen Grussen,Das Team
+
+
+""",
+ "body_text": """Passwort zurucksetzen
+
+Hallo {{ customer_name }},
+
+Wir haben eine Anfrage zur Zurucksetzung Ihres Passworts erhalten. Klicken Sie auf den Link unten, um ein neues Passwort zu erstellen:
+
+{{ reset_link }}
+
+Dieser Link lauft in {{ expiry_hours }} Stunde(n) ab. Wenn Sie diese Passwortzurucksetzung nicht angefordert haben, konnen Sie diese E-Mail ignorieren.
+
+Mit freundlichen Grussen,
+Das Team
+""",
+ },
+ {
+ "code": "password_reset",
+ "language": "lb",
+ "name": "Passwuert zrecksetzen",
+ "description": "Un Clienten gescheckt wann si eng Passwuertzrecksetzung ufroen",
+ "category": EmailCategory.AUTH.value,
+ "variables": json.dumps([
+ "customer_name", "reset_link", "expiry_hours"
+ ]),
+ "subject": "Passwuert zrecksetzen",
+ "body_html": """
+
+
+
+
+
+
+
+
Passwuert zrecksetzen
+
+
+
+
Moien {{ customer_name }},
+
+
Mir hunn eng Ufro kritt fir Aert Passwuert zreckzesetzen. Klickt op de KnÀppchen hei drënner fir en neit Passwuert ze kreéieren:
+
+
+
+
+ Dëse Link leeft an {{ expiry_hours }} Stonn(en) of. Wann Dir dës Passwuertzrecksetzung net ugefrot hutt, kënnt Dir dës E-Mail ignoréieren.
+
+
+
+ Wann de KnĂ€ppchen net fonctionnĂ©iert, kopĂ©iert dĂ«se Link an Ăre Browser:
+ {{ reset_link }}
+
+
+
Mat beschte Gréiss,D'Team
+
+
+""",
+ "body_text": """Passwuert zrecksetzen
+
+Moien {{ customer_name }},
+
+Mir hunn eng Ufro kritt fir Aert Passwuert zreckzesetzen. Klickt op de Link hei drënner fir en neit Passwuert ze kreéieren:
+
+{{ reset_link }}
+
+Dëse Link leeft an {{ expiry_hours }} Stonn(en) of. Wann Dir dës Passwuertzrecksetzung net ugefrot hutt, kënnt Dir dës E-Mail ignoréieren.
+
+Mat beschte Gréiss,
+D'Team
""",
},
]
diff --git a/tests/integration/api/v1/shop/test_password_reset.py b/tests/integration/api/v1/shop/test_password_reset.py
new file mode 100644
index 00000000..a2640b83
--- /dev/null
+++ b/tests/integration/api/v1/shop/test_password_reset.py
@@ -0,0 +1,451 @@
+# tests/integration/api/v1/shop/test_password_reset.py
+"""Integration tests for shop password reset API endpoints.
+
+Tests the /api/v1/shop/auth/forgot-password and /api/v1/shop/auth/reset-password endpoints.
+"""
+
+from datetime import UTC, datetime, timedelta
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from models.database.customer import Customer
+from models.database.password_reset_token import PasswordResetToken
+
+
+@pytest.fixture
+def shop_customer(db, test_vendor):
+ """Create a test customer for shop API tests."""
+ from middleware.auth import AuthManager
+
+ auth_manager = AuthManager()
+ customer = Customer(
+ vendor_id=test_vendor.id,
+ email="customer@example.com",
+ hashed_password=auth_manager.hash_password("oldpassword123"),
+ first_name="Test",
+ last_name="Customer",
+ customer_number="CUST001",
+ is_active=True,
+ )
+ db.add(customer)
+ db.commit()
+ db.refresh(customer)
+ return customer
+
+
+@pytest.fixture
+def inactive_customer(db, test_vendor):
+ """Create an inactive customer for testing."""
+ from middleware.auth import AuthManager
+
+ auth_manager = AuthManager()
+ customer = Customer(
+ vendor_id=test_vendor.id,
+ email="inactive@example.com",
+ hashed_password=auth_manager.hash_password("password123"),
+ first_name="Inactive",
+ last_name="Customer",
+ customer_number="CUST002",
+ is_active=False,
+ )
+ db.add(customer)
+ db.commit()
+ db.refresh(customer)
+ return customer
+
+
+@pytest.fixture
+def valid_reset_token(db, shop_customer):
+ """Create a valid password reset token."""
+ token = PasswordResetToken.create_for_customer(db, shop_customer.id)
+ db.commit()
+ return token
+
+
+@pytest.fixture
+def expired_reset_token(db, shop_customer):
+ """Create an expired password reset token."""
+ import secrets
+
+ token = secrets.token_urlsafe(32)
+ token_hash = PasswordResetToken.hash_token(token)
+
+ reset_token = PasswordResetToken(
+ customer_id=shop_customer.id,
+ token_hash=token_hash,
+ expires_at=datetime.utcnow() - timedelta(hours=2), # Already expired
+ )
+ db.add(reset_token)
+ db.commit()
+ return token
+
+
+@pytest.fixture
+def used_reset_token(db, shop_customer):
+ """Create a used password reset token."""
+ import secrets
+
+ token = secrets.token_urlsafe(32)
+ token_hash = PasswordResetToken.hash_token(token)
+
+ reset_token = PasswordResetToken(
+ customer_id=shop_customer.id,
+ token_hash=token_hash,
+ expires_at=datetime.utcnow() + timedelta(hours=1),
+ used_at=datetime.utcnow(), # Already used
+ )
+ db.add(reset_token)
+ db.commit()
+ return token
+
+
+@pytest.mark.integration
+@pytest.mark.api
+@pytest.mark.shop
+class TestForgotPasswordAPI:
+ """Test forgot password endpoint at /api/v1/shop/auth/forgot-password."""
+
+ def test_forgot_password_existing_customer(
+ self, client, db, test_vendor, shop_customer
+ ):
+ """Test password reset request for existing customer."""
+ with patch("app.api.v1.shop.auth.getattr") as mock_getattr:
+ mock_getattr.return_value = test_vendor
+
+ # Mock email service to avoid actual email sending
+ with patch("app.api.v1.shop.auth.EmailService") as mock_email_service:
+ mock_instance = MagicMock()
+ mock_email_service.return_value = mock_instance
+
+ response = client.post(
+ "/api/v1/shop/auth/forgot-password",
+ params={"email": shop_customer.email},
+ )
+
+ assert response.status_code == 200
+ data = response.json()
+ assert "password reset link has been sent" in data["message"].lower()
+
+ # Verify email was sent
+ mock_instance.send_template.assert_called_once()
+ call_kwargs = mock_instance.send_template.call_args.kwargs
+ assert call_kwargs["template_code"] == "password_reset"
+ assert call_kwargs["to_email"] == shop_customer.email
+
+ def test_forgot_password_nonexistent_email(self, client, db, test_vendor):
+ """Test password reset request for non-existent email (same response)."""
+ with patch("app.api.v1.shop.auth.getattr") as mock_getattr:
+ mock_getattr.return_value = test_vendor
+
+ response = client.post(
+ "/api/v1/shop/auth/forgot-password",
+ params={"email": "nonexistent@example.com"},
+ )
+
+ # Should return same success message to prevent email enumeration
+ assert response.status_code == 200
+ data = response.json()
+ assert "password reset link has been sent" in data["message"].lower()
+
+ def test_forgot_password_inactive_customer(
+ self, client, db, test_vendor, inactive_customer
+ ):
+ """Test password reset request for inactive customer."""
+ with patch("app.api.v1.shop.auth.getattr") as mock_getattr:
+ mock_getattr.return_value = test_vendor
+
+ response = client.post(
+ "/api/v1/shop/auth/forgot-password",
+ params={"email": inactive_customer.email},
+ )
+
+ # Should return same success message (inactive customers can't reset)
+ assert response.status_code == 200
+ data = response.json()
+ assert "password reset link has been sent" in data["message"].lower()
+
+ def test_forgot_password_creates_token(
+ self, client, db, test_vendor, shop_customer
+ ):
+ """Test that forgot password creates a token in the database."""
+ with patch("app.api.v1.shop.auth.getattr") as mock_getattr:
+ mock_getattr.return_value = test_vendor
+
+ with patch("app.api.v1.shop.auth.EmailService"):
+ response = client.post(
+ "/api/v1/shop/auth/forgot-password",
+ params={"email": shop_customer.email},
+ )
+
+ assert response.status_code == 200
+
+ # Verify token was created
+ token = (
+ db.query(PasswordResetToken)
+ .filter(PasswordResetToken.customer_id == shop_customer.id)
+ .first()
+ )
+ assert token is not None
+ assert token.used_at is None
+ assert token.expires_at > datetime.utcnow()
+
+ def test_forgot_password_invalidates_old_tokens(
+ self, client, db, test_vendor, shop_customer, valid_reset_token
+ ):
+ """Test that requesting new token invalidates old ones."""
+ # Get the old token record
+ old_token_count = (
+ db.query(PasswordResetToken)
+ .filter(
+ PasswordResetToken.customer_id == shop_customer.id,
+ PasswordResetToken.used_at.is_(None),
+ )
+ .count()
+ )
+ assert old_token_count == 1
+
+ with patch("app.api.v1.shop.auth.getattr") as mock_getattr:
+ mock_getattr.return_value = test_vendor
+
+ with patch("app.api.v1.shop.auth.EmailService"):
+ response = client.post(
+ "/api/v1/shop/auth/forgot-password",
+ params={"email": shop_customer.email},
+ )
+
+ assert response.status_code == 200
+
+ # Old token should be deleted, new one created
+ new_token_count = (
+ db.query(PasswordResetToken)
+ .filter(
+ PasswordResetToken.customer_id == shop_customer.id,
+ PasswordResetToken.used_at.is_(None),
+ )
+ .count()
+ )
+ assert new_token_count == 1
+
+
+@pytest.mark.integration
+@pytest.mark.api
+@pytest.mark.shop
+class TestResetPasswordAPI:
+ """Test reset password endpoint at /api/v1/shop/auth/reset-password."""
+
+ def test_reset_password_success(
+ self, client, db, test_vendor, shop_customer, valid_reset_token
+ ):
+ """Test successful password reset."""
+ with patch("app.api.v1.shop.auth.getattr") as mock_getattr:
+ mock_getattr.return_value = test_vendor
+
+ new_password = "newpassword123"
+ response = client.post(
+ "/api/v1/shop/auth/reset-password",
+ params={
+ "reset_token": valid_reset_token,
+ "new_password": new_password,
+ },
+ )
+
+ assert response.status_code == 200
+ data = response.json()
+ assert "password reset successfully" in data["message"].lower()
+
+ # Verify password was changed
+ db.refresh(shop_customer)
+ from middleware.auth import AuthManager
+
+ auth_manager = AuthManager()
+ assert auth_manager.verify_password(
+ new_password, shop_customer.hashed_password
+ )
+
+ def test_reset_password_token_marked_used(
+ self, client, db, test_vendor, shop_customer, valid_reset_token
+ ):
+ """Test that token is marked as used after successful reset."""
+ with patch("app.api.v1.shop.auth.getattr") as mock_getattr:
+ mock_getattr.return_value = test_vendor
+
+ response = client.post(
+ "/api/v1/shop/auth/reset-password",
+ params={
+ "reset_token": valid_reset_token,
+ "new_password": "newpassword123",
+ },
+ )
+
+ assert response.status_code == 200
+
+ # Verify token is marked as used
+ token_record = (
+ db.query(PasswordResetToken)
+ .filter(PasswordResetToken.customer_id == shop_customer.id)
+ .first()
+ )
+ assert token_record.used_at is not None
+
+ def test_reset_password_invalid_token(self, client, db, test_vendor):
+ """Test reset with invalid token."""
+ with patch("app.api.v1.shop.auth.getattr") as mock_getattr:
+ mock_getattr.return_value = test_vendor
+
+ response = client.post(
+ "/api/v1/shop/auth/reset-password",
+ params={
+ "reset_token": "invalid_token_12345",
+ "new_password": "newpassword123",
+ },
+ )
+
+ assert response.status_code == 400
+
+ def test_reset_password_expired_token(
+ self, client, db, test_vendor, shop_customer, expired_reset_token
+ ):
+ """Test reset with expired token."""
+ with patch("app.api.v1.shop.auth.getattr") as mock_getattr:
+ mock_getattr.return_value = test_vendor
+
+ response = client.post(
+ "/api/v1/shop/auth/reset-password",
+ params={
+ "reset_token": expired_reset_token,
+ "new_password": "newpassword123",
+ },
+ )
+
+ assert response.status_code == 400
+
+ def test_reset_password_used_token(
+ self, client, db, test_vendor, shop_customer, used_reset_token
+ ):
+ """Test reset with already used token."""
+ with patch("app.api.v1.shop.auth.getattr") as mock_getattr:
+ mock_getattr.return_value = test_vendor
+
+ response = client.post(
+ "/api/v1/shop/auth/reset-password",
+ params={
+ "reset_token": used_reset_token,
+ "new_password": "newpassword123",
+ },
+ )
+
+ assert response.status_code == 400
+
+ def test_reset_password_short_password(
+ self, client, db, test_vendor, shop_customer, valid_reset_token
+ ):
+ """Test reset with password that's too short."""
+ with patch("app.api.v1.shop.auth.getattr") as mock_getattr:
+ mock_getattr.return_value = test_vendor
+
+ response = client.post(
+ "/api/v1/shop/auth/reset-password",
+ params={
+ "reset_token": valid_reset_token,
+ "new_password": "short", # Less than 8 chars
+ },
+ )
+
+ assert response.status_code == 400
+
+ def test_reset_password_cannot_reuse_token(
+ self, client, db, test_vendor, shop_customer, valid_reset_token
+ ):
+ """Test that token cannot be reused after successful reset."""
+ with patch("app.api.v1.shop.auth.getattr") as mock_getattr:
+ mock_getattr.return_value = test_vendor
+
+ # First reset should succeed
+ response1 = client.post(
+ "/api/v1/shop/auth/reset-password",
+ params={
+ "reset_token": valid_reset_token,
+ "new_password": "newpassword123",
+ },
+ )
+ assert response1.status_code == 200
+
+ # Second reset with same token should fail
+ response2 = client.post(
+ "/api/v1/shop/auth/reset-password",
+ params={
+ "reset_token": valid_reset_token,
+ "new_password": "anotherpassword123",
+ },
+ )
+ assert response2.status_code == 400
+
+
+@pytest.mark.integration
+@pytest.mark.api
+@pytest.mark.shop
+class TestPasswordResetTokenModel:
+ """Test PasswordResetToken model functionality."""
+
+ def test_token_hash_is_deterministic(self):
+ """Test that hashing the same token produces the same hash."""
+ token = "test_token_12345"
+ hash1 = PasswordResetToken.hash_token(token)
+ hash2 = PasswordResetToken.hash_token(token)
+ assert hash1 == hash2
+
+ def test_different_tokens_produce_different_hashes(self):
+ """Test that different tokens produce different hashes."""
+ hash1 = PasswordResetToken.hash_token("token1")
+ hash2 = PasswordResetToken.hash_token("token2")
+ assert hash1 != hash2
+
+ def test_create_for_customer_returns_plaintext(self, db, shop_customer):
+ """Test that create_for_customer returns plaintext token."""
+ token = PasswordResetToken.create_for_customer(db, shop_customer.id)
+ db.commit()
+
+ # Token should be URL-safe base64
+ assert token is not None
+ assert len(token) > 20 # secrets.token_urlsafe(32) produces ~43 chars
+
+ def test_find_valid_token_works_with_plaintext(self, db, shop_customer):
+ """Test that find_valid_token works with plaintext token."""
+ plaintext_token = PasswordResetToken.create_for_customer(db, shop_customer.id)
+ db.commit()
+
+ found = PasswordResetToken.find_valid_token(db, plaintext_token)
+ assert found is not None
+ assert found.customer_id == shop_customer.id
+
+ def test_find_valid_token_returns_none_for_invalid(self, db):
+ """Test that find_valid_token returns None for invalid token."""
+ found = PasswordResetToken.find_valid_token(db, "invalid_token")
+ assert found is None
+
+ def test_mark_used_sets_timestamp(self, db, shop_customer):
+ """Test that mark_used sets the used_at timestamp."""
+ plaintext_token = PasswordResetToken.create_for_customer(db, shop_customer.id)
+ db.commit()
+
+ token_record = PasswordResetToken.find_valid_token(db, plaintext_token)
+ assert token_record.used_at is None
+
+ token_record.mark_used(db)
+ db.commit()
+
+ assert token_record.used_at is not None
+
+ def test_used_token_not_found_by_find_valid(self, db, shop_customer):
+ """Test that used tokens are not returned by find_valid_token."""
+ plaintext_token = PasswordResetToken.create_for_customer(db, shop_customer.id)
+ db.commit()
+
+ token_record = PasswordResetToken.find_valid_token(db, plaintext_token)
+ token_record.mark_used(db)
+ db.commit()
+
+ # Should not find the used token
+ found = PasswordResetToken.find_valid_token(db, plaintext_token)
+ assert found is None