Move actual code implementations into module directories: - orders: 5 services, 4 models, order/invoice schemas - inventory: 3 services, 2 models, 30+ schemas - customers: 3 services, 2 models, customer schemas - messaging: 3 services, 2 models, message/notification schemas - monitoring: background_tasks_service - marketplace: 5+ services including letzshop submodule - dev_tools: code_quality_service, test_runner_service - billing: billing_service - contracts: definition.py Legacy files in app/services/, models/database/, models/schema/ now re-export from canonical module locations for backwards compatibility. Architecture validator passes with 0 errors. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
92 lines
2.8 KiB
Python
92 lines
2.8 KiB
Python
# app/modules/customers/models/password_reset_token.py
|
|
"""
|
|
Password reset token model for customer accounts.
|
|
|
|
Security features:
|
|
- Tokens are stored as SHA256 hashes, not plaintext
|
|
- Tokens expire after 1 hour
|
|
- Only one active token per customer (old tokens invalidated on new request)
|
|
"""
|
|
|
|
import hashlib
|
|
import secrets
|
|
from datetime import datetime, timedelta
|
|
|
|
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
|
|
from sqlalchemy.orm import Session, relationship
|
|
|
|
from app.core.database import Base
|
|
|
|
|
|
class PasswordResetToken(Base):
|
|
"""Password reset token for customer accounts."""
|
|
|
|
__tablename__ = "password_reset_tokens"
|
|
|
|
# Token expiry in hours
|
|
TOKEN_EXPIRY_HOURS = 1
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
customer_id = Column(
|
|
Integer, ForeignKey("customers.id", ondelete="CASCADE"), nullable=False
|
|
)
|
|
token_hash = Column(String(64), nullable=False, index=True)
|
|
expires_at = Column(DateTime, nullable=False)
|
|
used_at = Column(DateTime, nullable=True)
|
|
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
|
|
|
|
# Relationships
|
|
customer = relationship("Customer")
|
|
|
|
def __repr__(self):
|
|
return f"<PasswordResetToken(id={self.id}, customer_id={self.customer_id}, expires_at={self.expires_at})>"
|
|
|
|
@staticmethod
|
|
def hash_token(token: str) -> str:
|
|
"""Hash a token using SHA256."""
|
|
return hashlib.sha256(token.encode()).hexdigest()
|
|
|
|
@classmethod
|
|
def create_for_customer(cls, db: Session, customer_id: int) -> str:
|
|
"""Create a new password reset token for a customer.
|
|
|
|
Invalidates any existing tokens for the customer.
|
|
Returns the plaintext token (to be sent via email).
|
|
"""
|
|
# Invalidate existing tokens for this customer
|
|
db.query(cls).filter(
|
|
cls.customer_id == customer_id,
|
|
cls.used_at.is_(None),
|
|
).delete()
|
|
|
|
# Generate new token
|
|
plaintext_token = secrets.token_urlsafe(32)
|
|
token_hash = cls.hash_token(plaintext_token)
|
|
|
|
# Create token record
|
|
token = cls(
|
|
customer_id=customer_id,
|
|
token_hash=token_hash,
|
|
expires_at=datetime.utcnow() + timedelta(hours=cls.TOKEN_EXPIRY_HOURS),
|
|
)
|
|
db.add(token)
|
|
db.flush()
|
|
|
|
return plaintext_token
|
|
|
|
@classmethod
|
|
def find_valid_token(cls, db: Session, plaintext_token: str) -> "PasswordResetToken | None":
|
|
"""Find a valid (not expired, not used) token."""
|
|
token_hash = cls.hash_token(plaintext_token)
|
|
|
|
return db.query(cls).filter(
|
|
cls.token_hash == token_hash,
|
|
cls.expires_at > datetime.utcnow(),
|
|
cls.used_at.is_(None),
|
|
).first()
|
|
|
|
def mark_used(self, db: Session) -> None:
|
|
"""Mark this token as used."""
|
|
self.used_at = datetime.utcnow()
|
|
db.flush()
|