Some checks failed
Security: - Fix TOCTOU race conditions: move balance/limit checks after row lock in redeem_points, add_stamp, redeem_stamps - Add PIN ownership verification to update/delete/unlock store routes - Gate adjust_points endpoint to merchant_owner role only Data integrity: - Track total_points_voided in void_points - Add order_reference idempotency guard in earn_points Correctness: - Fix LoyaltyProgramAlreadyExistsException to use merchant_id parameter - Add StorefrontProgramResponse excluding wallet IDs from public API - Add bounds (±100000) to PointsAdjustRequest.points_delta Audit & config: - Add CARD_REACTIVATED transaction type with audit record - Improve admin audit logging with actor identity and old values - Use merchant-specific PIN lockout settings with global fallback - Guard MerchantLoyaltySettings creation with get_or_create pattern Tests: 27 new tests (265 total) covering all 12 items — unit and integration. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
368 lines
10 KiB
Python
368 lines
10 KiB
Python
# app/modules/loyalty/services/pin_service.py
|
|
"""
|
|
Staff PIN service.
|
|
|
|
Merchant-based PIN operations:
|
|
- PINs belong to a merchant's loyalty program
|
|
- Each store (location) has its own set of staff PINs
|
|
- Staff can only use PINs at their assigned location
|
|
|
|
Handles PIN operations including:
|
|
- PIN creation and management
|
|
- PIN verification with lockout (per store)
|
|
- PIN security (failed attempts, lockout)
|
|
"""
|
|
|
|
import logging
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.modules.loyalty.config import config
|
|
from app.modules.loyalty.exceptions import (
|
|
InvalidStaffPinException,
|
|
StaffPinLockedException,
|
|
StaffPinNotFoundException,
|
|
)
|
|
from app.modules.loyalty.models import StaffPin
|
|
from app.modules.loyalty.schemas.pin import PinCreate, PinUpdate
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class PinService:
    """Service for staff PIN operations.

    The service keeps no state of its own: every method receives the
    SQLAlchemy session to work with. Write operations and PIN verification
    commit that session themselves.
    """

    # =========================================================================
    # Read Operations
    # =========================================================================

    def get_pin(self, db: Session, pin_id: int) -> StaffPin | None:
        """Get a staff PIN by ID, or ``None`` if no such PIN exists."""
        return db.query(StaffPin).filter(StaffPin.id == pin_id).first()

    def get_pin_by_staff_id(
        self,
        db: Session,
        program_id: int,
        staff_id: str,
        *,
        store_id: int | None = None,
    ) -> StaffPin | None:
        """
        Get a staff PIN by employee ID.

        Args:
            db: Database session
            program_id: Program ID
            staff_id: Employee ID stored on the PIN
            store_id: Optional filter by store (location)

        Returns:
            First matching StaffPin, or None
        """
        query = db.query(StaffPin).filter(
            StaffPin.program_id == program_id,
            StaffPin.staff_id == staff_id,
        )
        # Compare against None (not truthiness) so that a store ID of 0 is
        # still applied as a filter, matching list_pins/list_pins_for_merchant.
        if store_id is not None:
            query = query.filter(StaffPin.store_id == store_id)
        return query.first()

    def require_pin(self, db: Session, pin_id: int) -> StaffPin:
        """
        Get a PIN or raise exception if not found.

        Raises:
            StaffPinNotFoundException: No PIN with this ID exists
        """
        pin = self.get_pin(db, pin_id)
        if not pin:
            raise StaffPinNotFoundException(str(pin_id))
        return pin

    def list_pins(
        self,
        db: Session,
        program_id: int,
        *,
        store_id: int | None = None,
        is_active: bool | None = None,
    ) -> list[StaffPin]:
        """
        List staff PINs for a program.

        Args:
            db: Database session
            program_id: Program ID
            store_id: Optional filter by store (location)
            is_active: Filter by active status

        Returns:
            List of StaffPin objects, ordered by name
        """
        query = db.query(StaffPin).filter(StaffPin.program_id == program_id)

        if store_id is not None:
            query = query.filter(StaffPin.store_id == store_id)

        if is_active is not None:
            query = query.filter(StaffPin.is_active == is_active)

        return query.order_by(StaffPin.name).all()

    def list_pins_for_merchant(
        self,
        db: Session,
        merchant_id: int,
        *,
        store_id: int | None = None,
        is_active: bool | None = None,
    ) -> list[StaffPin]:
        """
        List staff PINs for a merchant.

        Args:
            db: Database session
            merchant_id: Merchant ID
            store_id: Optional filter by store (location)
            is_active: Filter by active status

        Returns:
            List of StaffPin objects, ordered by store ID and then name
        """
        query = db.query(StaffPin).filter(StaffPin.merchant_id == merchant_id)

        if store_id is not None:
            query = query.filter(StaffPin.store_id == store_id)

        if is_active is not None:
            query = query.filter(StaffPin.is_active == is_active)

        return query.order_by(StaffPin.store_id, StaffPin.name).all()

    # =========================================================================
    # Write Operations
    # =========================================================================

    def create_pin(
        self,
        db: Session,
        program_id: int,
        store_id: int,
        data: PinCreate,
    ) -> StaffPin:
        """
        Create a new staff PIN.

        Args:
            db: Database session
            program_id: Program ID
            store_id: Store ID (location where staff works)
            data: PIN creation data

        Returns:
            Created PIN

        Raises:
            StaffPinNotFoundException: The program does not exist (the
                exception is given the identifier ``program:<program_id>``)
        """
        # Imported inside the function as in the original code
        # (presumably to avoid a circular import - confirm before hoisting).
        from app.modules.loyalty.models import LoyaltyProgram

        # Get merchant_id from program
        program = (
            db.query(LoyaltyProgram)
            .filter(LoyaltyProgram.id == program_id)
            .first()
        )
        if not program:
            raise StaffPinNotFoundException(f"program:{program_id}")

        pin = StaffPin(
            merchant_id=program.merchant_id,
            program_id=program_id,
            store_id=store_id,
            name=data.name,
            staff_id=data.staff_id,
        )
        # The plain PIN is handed to the model rather than assigned to a column.
        pin.set_pin(data.pin)

        db.add(pin)
        db.commit()
        db.refresh(pin)

        logger.info(
            "Created staff PIN %s for '%s' at store %s",
            pin.id,
            pin.name,
            store_id,
        )

        return pin

    def update_pin(
        self,
        db: Session,
        pin_id: int,
        data: PinUpdate,
    ) -> StaffPin:
        """
        Update a staff PIN.

        Only fields that are not None on ``data`` are applied.

        Args:
            db: Database session
            pin_id: PIN ID
            data: Update data

        Returns:
            Updated PIN

        Raises:
            StaffPinNotFoundException: No PIN with this ID exists
        """
        pin = self.require_pin(db, pin_id)

        if data.name is not None:
            pin.name = data.name

        if data.staff_id is not None:
            pin.staff_id = data.staff_id

        if data.pin is not None:
            pin.set_pin(data.pin)
            # Reset lockout when PIN is changed
            pin.failed_attempts = 0
            pin.locked_until = None

        if data.is_active is not None:
            pin.is_active = data.is_active

        db.commit()
        db.refresh(pin)

        logger.info("Updated staff PIN %s", pin_id)

        return pin

    def delete_pin(self, db: Session, pin_id: int) -> None:
        """
        Delete a staff PIN.

        Raises:
            StaffPinNotFoundException: No PIN with this ID exists
        """
        pin = self.require_pin(db, pin_id)
        # Read the store ID before deleting so it is still available to log.
        store_id = pin.store_id

        db.delete(pin)
        db.commit()

        logger.info("Deleted staff PIN %s from store %s", pin_id, store_id)

    def unlock_pin(self, db: Session, pin_id: int) -> StaffPin:
        """
        Unlock a locked staff PIN.

        Raises:
            StaffPinNotFoundException: No PIN with this ID exists
        """
        pin = self.require_pin(db, pin_id)
        pin.unlock()
        db.commit()
        db.refresh(pin)

        logger.info("Unlocked staff PIN %s", pin_id)

        return pin

    # =========================================================================
    # Verification
    # =========================================================================

    def _get_lockout_policy(
        self, db: Session, program_id: int
    ) -> tuple[int, int]:
        """
        Resolve ``(max_attempts, lockout_minutes)`` for a program.

        Uses merchant-specific settings if available and falls back to the
        global config when the program or its merchant settings are missing.
        """
        from app.modules.loyalty.models import LoyaltyProgram

        max_attempts = config.pin_max_failed_attempts
        lockout_minutes = config.pin_lockout_minutes

        program = (
            db.query(LoyaltyProgram)
            .filter(LoyaltyProgram.id == program_id)
            .first()
        )
        if not program:
            return max_attempts, lockout_minutes

        from app.modules.loyalty.services.program_service import (
            program_service as _ps,
        )

        merchant_settings = _ps.get_merchant_settings(db, program.merchant_id)
        if merchant_settings:
            max_attempts = merchant_settings.staff_pin_lockout_attempts
            lockout_minutes = merchant_settings.staff_pin_lockout_minutes

        return max_attempts, lockout_minutes

    def verify_pin(
        self,
        db: Session,
        program_id: int,
        plain_pin: str,
        *,
        store_id: int | None = None,
    ) -> StaffPin:
        """
        Verify a staff PIN.

        For merchant-wide programs, if store_id is provided, only checks
        PINs assigned to that store. This ensures staff can only use
        their PIN at their assigned location.

        Locked PINs are skipped. When no PIN matches, one failed attempt is
        recorded on the first unlocked PIN only; if every candidate PIN is
        already locked, nothing is recorded and InvalidStaffPinException is
        raised without a remaining-attempts count.

        Args:
            db: Database session
            program_id: Program ID
            plain_pin: Plain text PIN to verify
            store_id: Optional store ID to restrict PIN lookup

        Returns:
            Verified StaffPin object

        Raises:
            InvalidStaffPinException: PIN is invalid
            StaffPinLockedException: This attempt caused a PIN to be locked
        """
        # Get active PINs (optionally filtered by store)
        pins = self.list_pins(db, program_id, store_id=store_id, is_active=True)

        if not pins:
            raise InvalidStaffPinException()

        # Try each unlocked PIN
        for pin in pins:
            if pin.is_locked:
                continue

            if pin.verify_pin(plain_pin):
                # Success - record it
                pin.record_success()
                db.commit()

                logger.debug(
                    "PIN verified for '%s' at store %s",
                    pin.name,
                    pin.store_id,
                )

                return pin

        # No match found - record failed attempt on the first unlocked PIN only
        # This limits blast radius to 1 lockout instead of N
        max_attempts, lockout_minutes = self._get_lockout_policy(db, program_id)

        locked_pin = None
        remaining = None

        target = next((pin for pin in pins if not pin.is_locked), None)
        if target is not None:
            is_now_locked = target.record_failed_attempt(
                max_attempts=max_attempts,
                lockout_minutes=lockout_minutes,
            )
            if is_now_locked:
                locked_pin = target
            else:
                remaining = max(0, max_attempts - target.failed_attempts)

        # Persist the failed attempt before raising.
        db.commit()

        # If a PIN just got locked, raise that specific error
        if locked_pin is not None:
            raise StaffPinLockedException(locked_pin.locked_until.isoformat())

        raise InvalidStaffPinException(remaining)

    def find_matching_pin(
        self,
        db: Session,
        program_id: int,
        plain_pin: str,
        *,
        store_id: int | None = None,
    ) -> StaffPin | None:
        """
        Find a matching PIN without recording attempts.

        Useful for checking PIN validity without side effects.

        Args:
            db: Database session
            program_id: Program ID
            plain_pin: Plain text PIN to check
            store_id: Optional store ID to restrict lookup

        Returns:
            Matching StaffPin or None
        """
        pins = self.list_pins(db, program_id, store_id=store_id, is_active=True)

        for pin in pins:
            if not pin.is_locked and pin.verify_pin(plain_pin):
                return pin

        return None
|
|
|
|
|
|
# Singleton instance: PinService defines no instance attributes, so this one
# shared object can be imported and used by every caller.
pin_service = PinService()
|