Files
orion/app/modules/loyalty/services/pin_service.py
Samir Boulahtit b5a803cde8 feat(loyalty): implement complete loyalty module MVP
Add stamp-based and points-based loyalty programs for vendors with:

Database Models (5 tables):
- loyalty_programs: Vendor program configuration
- loyalty_cards: Customer cards with stamp/point balances
- loyalty_transactions: Immutable audit log
- staff_pins: Fraud prevention PINs (bcrypt hashed)
- apple_device_registrations: Apple Wallet push tokens

Services:
- program_service: Program CRUD and statistics
- card_service: Customer enrollment and card lookup
- stamp_service: Stamp operations with anti-fraud checks
- points_service: Points earning and redemption
- pin_service: Staff PIN management with lockout
- wallet_service: Unified wallet abstraction
- google_wallet_service: Google Wallet API integration
- apple_wallet_service: Apple Wallet .pkpass generation

API Routes:
- Admin: /api/v1/admin/loyalty/* (programs list, stats)
- Vendor: /api/v1/vendor/loyalty/* (stamp, points, cards, PINs)
- Public: /api/v1/loyalty/* (enrollment, Apple Web Service)

Anti-Fraud Features:
- Staff PIN verification (configurable per program)
- Cooldown period between stamps (default 15 min)
- Daily stamp limits (default 5/day)
- PIN lockout after failed attempts

Wallet Integration:
- Google Wallet: LoyaltyClass and LoyaltyObject management
- Apple Wallet: .pkpass generation with PKCS#7 signing
- Apple Web Service endpoints for device registration/updates

Also includes:
- Alembic migration for all tables with indexes
- Localization files (en, fr, de, lu)
- Module documentation
- Phase 2 interface and user journey plan

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 23:04:00 +01:00

282 lines
7.4 KiB
Python

# app/modules/loyalty/services/pin_service.py
"""
Staff PIN service.
Handles PIN operations including:
- PIN creation and management
- PIN verification with lockout
- PIN security (failed attempts, lockout)
"""
import logging
from datetime import UTC, datetime
from sqlalchemy.orm import Session
from app.modules.loyalty.config import config
from app.modules.loyalty.exceptions import (
InvalidStaffPinException,
StaffPinLockedException,
StaffPinNotFoundException,
)
from app.modules.loyalty.models import StaffPin
from app.modules.loyalty.schemas.pin import PinCreate, PinUpdate
logger = logging.getLogger(__name__)
class PinService:
"""Service for staff PIN operations."""
# =========================================================================
# Read Operations
# =========================================================================
def get_pin(self, db: Session, pin_id: int) -> StaffPin | None:
"""Get a staff PIN by ID."""
return db.query(StaffPin).filter(StaffPin.id == pin_id).first()
def get_pin_by_staff_id(
self,
db: Session,
program_id: int,
staff_id: str,
) -> StaffPin | None:
"""Get a staff PIN by employee ID."""
return (
db.query(StaffPin)
.filter(
StaffPin.program_id == program_id,
StaffPin.staff_id == staff_id,
)
.first()
)
def require_pin(self, db: Session, pin_id: int) -> StaffPin:
"""Get a PIN or raise exception if not found."""
pin = self.get_pin(db, pin_id)
if not pin:
raise StaffPinNotFoundException(str(pin_id))
return pin
def list_pins(
self,
db: Session,
program_id: int,
*,
is_active: bool | None = None,
) -> list[StaffPin]:
"""List all staff PINs for a program."""
query = db.query(StaffPin).filter(StaffPin.program_id == program_id)
if is_active is not None:
query = query.filter(StaffPin.is_active == is_active)
return query.order_by(StaffPin.name).all()
# =========================================================================
# Write Operations
# =========================================================================
def create_pin(
self,
db: Session,
program_id: int,
vendor_id: int,
data: PinCreate,
) -> StaffPin:
"""
Create a new staff PIN.
Args:
db: Database session
program_id: Program ID
vendor_id: Vendor ID
data: PIN creation data
Returns:
Created PIN
"""
pin = StaffPin(
program_id=program_id,
vendor_id=vendor_id,
name=data.name,
staff_id=data.staff_id,
)
pin.set_pin(data.pin)
db.add(pin)
db.commit()
db.refresh(pin)
logger.info(f"Created staff PIN {pin.id} for '{pin.name}' in program {program_id}")
return pin
def update_pin(
self,
db: Session,
pin_id: int,
data: PinUpdate,
) -> StaffPin:
"""
Update a staff PIN.
Args:
db: Database session
pin_id: PIN ID
data: Update data
Returns:
Updated PIN
"""
pin = self.require_pin(db, pin_id)
if data.name is not None:
pin.name = data.name
if data.staff_id is not None:
pin.staff_id = data.staff_id
if data.pin is not None:
pin.set_pin(data.pin)
# Reset lockout when PIN is changed
pin.failed_attempts = 0
pin.locked_until = None
if data.is_active is not None:
pin.is_active = data.is_active
db.commit()
db.refresh(pin)
logger.info(f"Updated staff PIN {pin_id}")
return pin
def delete_pin(self, db: Session, pin_id: int) -> None:
"""Delete a staff PIN."""
pin = self.require_pin(db, pin_id)
program_id = pin.program_id
db.delete(pin)
db.commit()
logger.info(f"Deleted staff PIN {pin_id} from program {program_id}")
def unlock_pin(self, db: Session, pin_id: int) -> StaffPin:
"""Unlock a locked staff PIN."""
pin = self.require_pin(db, pin_id)
pin.unlock()
db.commit()
db.refresh(pin)
logger.info(f"Unlocked staff PIN {pin_id}")
return pin
# =========================================================================
# Verification
# =========================================================================
def verify_pin(
self,
db: Session,
program_id: int,
plain_pin: str,
) -> StaffPin:
"""
Verify a staff PIN.
Checks all active PINs for the program and returns the matching one.
Args:
db: Database session
program_id: Program ID
plain_pin: Plain text PIN to verify
Returns:
Verified StaffPin object
Raises:
InvalidStaffPinException: PIN is invalid
StaffPinLockedException: PIN is locked
"""
# Get all active PINs for the program
pins = self.list_pins(db, program_id, is_active=True)
if not pins:
raise InvalidStaffPinException()
# Try each PIN
for pin in pins:
# Check if locked
if pin.is_locked:
continue
# Verify PIN
if pin.verify_pin(plain_pin):
# Success - record it
pin.record_success()
db.commit()
logger.debug(f"PIN verified for '{pin.name}' in program {program_id}")
return pin
# No match found - record failed attempt on all unlocked PINs
# This is a simplified approach; in production you might want to
# track which PIN was attempted based on additional context
locked_pin = None
remaining = None
for pin in pins:
if not pin.is_locked:
is_now_locked = pin.record_failed_attempt(
max_attempts=config.pin_max_failed_attempts,
lockout_minutes=config.pin_lockout_minutes,
)
if is_now_locked:
locked_pin = pin
else:
remaining = pin.remaining_attempts
db.commit()
# If a PIN just got locked, raise that specific error
if locked_pin:
raise StaffPinLockedException(locked_pin.locked_until.isoformat())
raise InvalidStaffPinException(remaining)
def find_matching_pin(
self,
db: Session,
program_id: int,
plain_pin: str,
) -> StaffPin | None:
"""
Find a matching PIN without recording attempts.
Useful for checking PIN validity without side effects.
Args:
db: Database session
program_id: Program ID
plain_pin: Plain text PIN to check
Returns:
Matching StaffPin or None
"""
pins = self.list_pins(db, program_id, is_active=True)
for pin in pins:
if not pin.is_locked and pin.verify_pin(plain_pin):
return pin
return None
# Singleton instance
pin_service = PinService()