feat(loyalty): implement complete loyalty module MVP

Add stamp-based and points-based loyalty programs for vendors with:

Database Models (5 tables):
- loyalty_programs: Vendor program configuration
- loyalty_cards: Customer cards with stamp/point balances
- loyalty_transactions: Immutable audit log
- staff_pins: Fraud prevention PINs (bcrypt hashed)
- apple_device_registrations: Apple Wallet push tokens

Services:
- program_service: Program CRUD and statistics
- card_service: Customer enrollment and card lookup
- stamp_service: Stamp operations with anti-fraud checks
- points_service: Points earning and redemption
- pin_service: Staff PIN management with lockout
- wallet_service: Unified wallet abstraction
- google_wallet_service: Google Wallet API integration
- apple_wallet_service: Apple Wallet .pkpass generation

API Routes:
- Admin: /api/v1/admin/loyalty/* (programs list, stats)
- Vendor: /api/v1/vendor/loyalty/* (stamp, points, cards, PINs)
- Public: /api/v1/loyalty/* (enrollment, Apple Web Service)

Anti-Fraud Features:
- Staff PIN verification (configurable per program)
- Cooldown period between stamps (default 15 min)
- Daily stamp limits (default 5/day)
- PIN lockout after failed attempts

Wallet Integration:
- Google Wallet: LoyaltyClass and LoyaltyObject management
- Apple Wallet: .pkpass generation with PKCS#7 signing
- Apple Web Service endpoints for device registration/updates

Also includes:
- Alembic migration for all tables with indexes
- Localization files (en, fr, de, lu)
- Module documentation
- Phase 2 interface and user journey plan

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-28 23:04:00 +01:00
parent fbcf07914e
commit b5a803cde8
44 changed files with 8073 additions and 0 deletions

View File

@@ -0,0 +1,59 @@
# app/modules/loyalty/services/__init__.py
"""
Loyalty module services.
Provides loyalty program management, card operations, stamp/points
handling, and wallet integration.
"""
from app.modules.loyalty.services.program_service import (
ProgramService,
program_service,
)
from app.modules.loyalty.services.card_service import (
CardService,
card_service,
)
from app.modules.loyalty.services.stamp_service import (
StampService,
stamp_service,
)
from app.modules.loyalty.services.points_service import (
PointsService,
points_service,
)
from app.modules.loyalty.services.pin_service import (
PinService,
pin_service,
)
from app.modules.loyalty.services.wallet_service import (
WalletService,
wallet_service,
)
from app.modules.loyalty.services.google_wallet_service import (
GoogleWalletService,
google_wallet_service,
)
from app.modules.loyalty.services.apple_wallet_service import (
AppleWalletService,
apple_wallet_service,
)
__all__ = [
"ProgramService",
"program_service",
"CardService",
"card_service",
"StampService",
"stamp_service",
"PointsService",
"points_service",
"PinService",
"pin_service",
"WalletService",
"wallet_service",
"GoogleWalletService",
"google_wallet_service",
"AppleWalletService",
"apple_wallet_service",
]

View File

@@ -0,0 +1,388 @@
# app/modules/loyalty/services/apple_wallet_service.py
"""
Apple Wallet service.
Handles Apple Wallet integration including:
- Generating .pkpass files
- Apple Web Service for device registration
- Push notifications for pass updates
"""
import hashlib
import io
import json
import logging
import zipfile
from typing import Any
from sqlalchemy.orm import Session
from app.modules.loyalty.config import config
from app.modules.loyalty.exceptions import (
AppleWalletNotConfiguredException,
WalletIntegrationException,
)
from app.modules.loyalty.models import AppleDeviceRegistration, LoyaltyCard, LoyaltyProgram
logger = logging.getLogger(__name__)
class AppleWalletService:
"""Service for Apple Wallet integration."""
@property
def is_configured(self) -> bool:
"""Check if Apple Wallet is configured."""
return bool(
config.apple_pass_type_id
and config.apple_team_id
and config.apple_wwdr_cert_path
and config.apple_signer_cert_path
and config.apple_signer_key_path
)
# =========================================================================
# Pass Generation
# =========================================================================
def generate_pass(self, db: Session, card: LoyaltyCard) -> bytes:
"""
Generate a .pkpass file for a loyalty card.
The .pkpass is a ZIP file containing:
- pass.json: Pass configuration
- icon.png, icon@2x.png: App icon
- logo.png, logo@2x.png: Logo on pass
- manifest.json: SHA-1 hashes of all files
- signature: PKCS#7 signature
Args:
db: Database session
card: Loyalty card
Returns:
Bytes of the .pkpass file
"""
if not self.is_configured:
raise AppleWalletNotConfiguredException()
program = card.program
# Ensure serial number is set
if not card.apple_serial_number:
card.apple_serial_number = f"card_{card.id}_{card.qr_code_data[:8]}"
db.commit()
# Build pass.json
pass_data = self._build_pass_json(card, program)
# Create the pass package
pass_files = {
"pass.json": json.dumps(pass_data).encode("utf-8"),
}
# Add placeholder images (in production, these would be actual images)
# For now, we'll skip images and use the pass.json only
# pass_files["icon.png"] = self._get_icon_bytes(program)
# pass_files["icon@2x.png"] = self._get_icon_bytes(program, scale=2)
# pass_files["logo.png"] = self._get_logo_bytes(program)
# pass_files["logo@2x.png"] = self._get_logo_bytes(program, scale=2)
# Create manifest
manifest = {}
for filename, content in pass_files.items():
manifest[filename] = hashlib.sha1(content).hexdigest()
pass_files["manifest.json"] = json.dumps(manifest).encode("utf-8")
# Sign the manifest
try:
signature = self._sign_manifest(pass_files["manifest.json"])
pass_files["signature"] = signature
except Exception as e:
logger.error(f"Failed to sign pass: {e}")
raise WalletIntegrationException("apple", f"Failed to sign pass: {e}")
# Create ZIP file
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
for filename, content in pass_files.items():
zf.writestr(filename, content)
return buffer.getvalue()
def _build_pass_json(self, card: LoyaltyCard, program: LoyaltyProgram) -> dict[str, Any]:
"""Build the pass.json structure for a loyalty card."""
pass_data = {
"formatVersion": 1,
"passTypeIdentifier": config.apple_pass_type_id,
"serialNumber": card.apple_serial_number,
"teamIdentifier": config.apple_team_id,
"organizationName": program.display_name,
"description": f"{program.display_name} Loyalty Card",
"backgroundColor": self._hex_to_rgb(program.card_color),
"foregroundColor": "rgb(255, 255, 255)",
"labelColor": "rgb(255, 255, 255)",
"authenticationToken": card.apple_auth_token,
"webServiceURL": self._get_web_service_url(),
"barcode": {
"message": card.qr_code_data,
"format": "PKBarcodeFormatQR",
"messageEncoding": "iso-8859-1",
},
"barcodes": [
{
"message": card.qr_code_data,
"format": "PKBarcodeFormatQR",
"messageEncoding": "iso-8859-1",
}
],
}
# Add loyalty-specific fields
if program.is_stamps_enabled:
pass_data["storeCard"] = {
"headerFields": [
{
"key": "stamps",
"label": "STAMPS",
"value": f"{card.stamp_count}/{program.stamps_target}",
}
],
"primaryFields": [
{
"key": "reward",
"label": "NEXT REWARD",
"value": program.stamps_reward_description,
}
],
"secondaryFields": [
{
"key": "progress",
"label": "PROGRESS",
"value": f"{card.stamp_count} stamps collected",
}
],
"backFields": [
{
"key": "cardNumber",
"label": "Card Number",
"value": card.card_number,
},
{
"key": "totalStamps",
"label": "Total Stamps Earned",
"value": str(card.total_stamps_earned),
},
{
"key": "redemptions",
"label": "Total Rewards",
"value": str(card.stamps_redeemed),
},
],
}
elif program.is_points_enabled:
pass_data["storeCard"] = {
"headerFields": [
{
"key": "points",
"label": "POINTS",
"value": str(card.points_balance),
}
],
"primaryFields": [
{
"key": "balance",
"label": "BALANCE",
"value": f"{card.points_balance} points",
}
],
"backFields": [
{
"key": "cardNumber",
"label": "Card Number",
"value": card.card_number,
},
{
"key": "totalPoints",
"label": "Total Points Earned",
"value": str(card.total_points_earned),
},
{
"key": "redeemed",
"label": "Points Redeemed",
"value": str(card.points_redeemed),
},
],
}
return pass_data
def _hex_to_rgb(self, hex_color: str) -> str:
"""Convert hex color to RGB format for Apple Wallet."""
hex_color = hex_color.lstrip("#")
r = int(hex_color[0:2], 16)
g = int(hex_color[2:4], 16)
b = int(hex_color[4:6], 16)
return f"rgb({r}, {g}, {b})"
def _get_web_service_url(self) -> str:
"""Get the base URL for Apple Web Service endpoints."""
# This should be configured based on your deployment
# For now, return a placeholder
from app.core.config import settings
base_url = getattr(settings, "BASE_URL", "https://api.example.com")
return f"{base_url}/api/v1/loyalty/apple"
def _sign_manifest(self, manifest_data: bytes) -> bytes:
"""
Sign the manifest using PKCS#7.
This requires the Apple WWDR certificate and your
pass signing certificate and key.
"""
try:
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.serialization import pkcs7
# Load certificates
with open(config.apple_wwdr_cert_path, "rb") as f:
wwdr_cert = x509.load_pem_x509_certificate(f.read())
with open(config.apple_signer_cert_path, "rb") as f:
signer_cert = x509.load_pem_x509_certificate(f.read())
with open(config.apple_signer_key_path, "rb") as f:
signer_key = serialization.load_pem_private_key(f.read(), password=None)
# Create PKCS#7 signature
signature = (
pkcs7.PKCS7SignatureBuilder()
.set_data(manifest_data)
.add_signer(signer_cert, signer_key, hashes.SHA256())
.add_certificate(wwdr_cert)
.sign(serialization.Encoding.DER, [pkcs7.PKCS7Options.DetachedSignature])
)
return signature
except FileNotFoundError as e:
raise WalletIntegrationException("apple", f"Certificate file not found: {e}")
except Exception as e:
raise WalletIntegrationException("apple", f"Failed to sign manifest: {e}")
# =========================================================================
# Pass URLs
# =========================================================================
def get_pass_url(self, card: LoyaltyCard) -> str:
"""Get the URL to download the .pkpass file."""
from app.core.config import settings
base_url = getattr(settings, "BASE_URL", "https://api.example.com")
return f"{base_url}/api/v1/loyalty/passes/apple/{card.apple_serial_number}.pkpass"
# =========================================================================
# Device Registration (Apple Web Service)
# =========================================================================
def register_device(
self,
db: Session,
card: LoyaltyCard,
device_library_id: str,
push_token: str,
) -> None:
"""
Register a device for push notifications.
Called by Apple when user adds pass to their wallet.
"""
# Check if already registered
existing = (
db.query(AppleDeviceRegistration)
.filter(
AppleDeviceRegistration.card_id == card.id,
AppleDeviceRegistration.device_library_identifier == device_library_id,
)
.first()
)
if existing:
# Update push token
existing.push_token = push_token
else:
# Create new registration
registration = AppleDeviceRegistration(
card_id=card.id,
device_library_identifier=device_library_id,
push_token=push_token,
)
db.add(registration)
db.commit()
logger.info(f"Registered device {device_library_id[:8]}... for card {card.id}")
def unregister_device(
self,
db: Session,
card: LoyaltyCard,
device_library_id: str,
) -> None:
"""
Unregister a device.
Called by Apple when user removes pass from their wallet.
"""
db.query(AppleDeviceRegistration).filter(
AppleDeviceRegistration.card_id == card.id,
AppleDeviceRegistration.device_library_identifier == device_library_id,
).delete()
db.commit()
logger.info(f"Unregistered device {device_library_id[:8]}... for card {card.id}")
def send_push_updates(self, db: Session, card: LoyaltyCard) -> None:
"""
Send push notifications to all registered devices for a card.
This tells Apple Wallet to fetch the updated pass.
"""
registrations = (
db.query(AppleDeviceRegistration)
.filter(AppleDeviceRegistration.card_id == card.id)
.all()
)
if not registrations:
return
# Send push notification to each device
for registration in registrations:
try:
self._send_push(registration.push_token)
except Exception as e:
logger.warning(
f"Failed to send push to device {registration.device_library_identifier[:8]}...: {e}"
)
def _send_push(self, push_token: str) -> None:
"""
Send an empty push notification to trigger pass update.
Apple Wallet will then call our web service to fetch the updated pass.
"""
# This would use APNs to send the push notification
# For now, we'll log and skip the actual push
logger.debug(f"Would send push to token {push_token[:8]}...")
# In production, you would use something like:
# from apns2.client import APNsClient
# from apns2.payload import Payload
# client = APNsClient(config.apple_signer_cert_path, use_sandbox=True)
# payload = Payload()
# client.send_notification(push_token, payload, "pass.com.example.loyalty")
# Singleton instance
apple_wallet_service = AppleWalletService()

View File

@@ -0,0 +1,348 @@
# app/modules/loyalty/services/card_service.py
"""
Loyalty card service.
Handles card operations including:
- Customer enrollment
- Card lookup (by ID, QR code, card number)
- Card management (activation, deactivation)
"""
import logging
from datetime import UTC, datetime
from sqlalchemy.orm import Session, joinedload
from app.modules.loyalty.exceptions import (
LoyaltyCardAlreadyExistsException,
LoyaltyCardNotFoundException,
LoyaltyProgramInactiveException,
LoyaltyProgramNotFoundException,
)
from app.modules.loyalty.models import LoyaltyCard, LoyaltyProgram, LoyaltyTransaction, TransactionType
logger = logging.getLogger(__name__)
class CardService:
"""Service for loyalty card operations."""
# =========================================================================
# Read Operations
# =========================================================================
def get_card(self, db: Session, card_id: int) -> LoyaltyCard | None:
"""Get a loyalty card by ID."""
return (
db.query(LoyaltyCard)
.options(joinedload(LoyaltyCard.program))
.filter(LoyaltyCard.id == card_id)
.first()
)
def get_card_by_qr_code(self, db: Session, qr_code: str) -> LoyaltyCard | None:
"""Get a loyalty card by QR code data."""
return (
db.query(LoyaltyCard)
.options(joinedload(LoyaltyCard.program))
.filter(LoyaltyCard.qr_code_data == qr_code)
.first()
)
def get_card_by_number(self, db: Session, card_number: str) -> LoyaltyCard | None:
"""Get a loyalty card by card number."""
return (
db.query(LoyaltyCard)
.options(joinedload(LoyaltyCard.program))
.filter(LoyaltyCard.card_number == card_number)
.first()
)
def get_card_by_customer_and_program(
self,
db: Session,
customer_id: int,
program_id: int,
) -> LoyaltyCard | None:
"""Get a customer's card for a specific program."""
return (
db.query(LoyaltyCard)
.options(joinedload(LoyaltyCard.program))
.filter(
LoyaltyCard.customer_id == customer_id,
LoyaltyCard.program_id == program_id,
)
.first()
)
def require_card(self, db: Session, card_id: int) -> LoyaltyCard:
"""Get a card or raise exception if not found."""
card = self.get_card(db, card_id)
if not card:
raise LoyaltyCardNotFoundException(str(card_id))
return card
def lookup_card(
self,
db: Session,
*,
card_id: int | None = None,
qr_code: str | None = None,
card_number: str | None = None,
) -> LoyaltyCard:
"""
Look up a card by any identifier.
Args:
db: Database session
card_id: Card ID
qr_code: QR code data
card_number: Card number
Returns:
Found card
Raises:
LoyaltyCardNotFoundException: If no card found
"""
card = None
if card_id:
card = self.get_card(db, card_id)
elif qr_code:
card = self.get_card_by_qr_code(db, qr_code)
elif card_number:
card = self.get_card_by_number(db, card_number)
if not card:
identifier = card_id or qr_code or card_number or "unknown"
raise LoyaltyCardNotFoundException(str(identifier))
return card
def list_cards(
self,
db: Session,
vendor_id: int,
*,
skip: int = 0,
limit: int = 50,
is_active: bool | None = None,
search: str | None = None,
) -> tuple[list[LoyaltyCard], int]:
"""
List loyalty cards for a vendor.
Args:
db: Database session
vendor_id: Vendor ID
skip: Pagination offset
limit: Pagination limit
is_active: Filter by active status
search: Search by card number or customer email
Returns:
(cards, total_count)
"""
from models.database.customer import Customer
query = (
db.query(LoyaltyCard)
.options(joinedload(LoyaltyCard.customer))
.filter(LoyaltyCard.vendor_id == vendor_id)
)
if is_active is not None:
query = query.filter(LoyaltyCard.is_active == is_active)
if search:
query = query.join(Customer).filter(
(LoyaltyCard.card_number.ilike(f"%{search}%"))
| (Customer.email.ilike(f"%{search}%"))
| (Customer.first_name.ilike(f"%{search}%"))
| (Customer.last_name.ilike(f"%{search}%"))
)
total = query.count()
cards = (
query.order_by(LoyaltyCard.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
return cards, total
def list_customer_cards(
self,
db: Session,
customer_id: int,
) -> list[LoyaltyCard]:
"""List all loyalty cards for a customer."""
return (
db.query(LoyaltyCard)
.options(joinedload(LoyaltyCard.program))
.filter(LoyaltyCard.customer_id == customer_id)
.all()
)
# =========================================================================
# Write Operations
# =========================================================================
def enroll_customer(
self,
db: Session,
customer_id: int,
vendor_id: int,
*,
program_id: int | None = None,
) -> LoyaltyCard:
"""
Enroll a customer in a loyalty program.
Args:
db: Database session
customer_id: Customer ID
vendor_id: Vendor ID
program_id: Optional program ID (defaults to vendor's program)
Returns:
Created loyalty card
Raises:
LoyaltyProgramNotFoundException: If no program exists
LoyaltyProgramInactiveException: If program is inactive
LoyaltyCardAlreadyExistsException: If customer already enrolled
"""
# Get the program
if program_id:
program = (
db.query(LoyaltyProgram)
.filter(LoyaltyProgram.id == program_id)
.first()
)
else:
program = (
db.query(LoyaltyProgram)
.filter(LoyaltyProgram.vendor_id == vendor_id)
.first()
)
if not program:
raise LoyaltyProgramNotFoundException(f"vendor:{vendor_id}")
if not program.is_active:
raise LoyaltyProgramInactiveException(program.id)
# Check if customer already has a card
existing = self.get_card_by_customer_and_program(db, customer_id, program.id)
if existing:
raise LoyaltyCardAlreadyExistsException(customer_id, program.id)
# Create the card
card = LoyaltyCard(
customer_id=customer_id,
program_id=program.id,
vendor_id=vendor_id,
)
db.add(card)
db.flush() # Get the card ID
# Create enrollment transaction
transaction = LoyaltyTransaction(
card_id=card.id,
vendor_id=vendor_id,
transaction_type=TransactionType.CARD_CREATED.value,
transaction_at=datetime.now(UTC),
)
db.add(transaction)
db.commit()
db.refresh(card)
logger.info(
f"Enrolled customer {customer_id} in loyalty program {program.id} "
f"(card: {card.card_number})"
)
return card
def deactivate_card(self, db: Session, card_id: int) -> LoyaltyCard:
"""Deactivate a loyalty card."""
card = self.require_card(db, card_id)
card.is_active = False
# Create deactivation transaction
transaction = LoyaltyTransaction(
card_id=card.id,
vendor_id=card.vendor_id,
transaction_type=TransactionType.CARD_DEACTIVATED.value,
transaction_at=datetime.now(UTC),
)
db.add(transaction)
db.commit()
db.refresh(card)
logger.info(f"Deactivated loyalty card {card_id}")
return card
def reactivate_card(self, db: Session, card_id: int) -> LoyaltyCard:
"""Reactivate a deactivated loyalty card."""
card = self.require_card(db, card_id)
card.is_active = True
db.commit()
db.refresh(card)
logger.info(f"Reactivated loyalty card {card_id}")
return card
# =========================================================================
# Helpers
# =========================================================================
def get_stamps_today(self, db: Session, card_id: int) -> int:
"""Get number of stamps earned today for a card."""
from sqlalchemy import func
today_start = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0)
count = (
db.query(func.count(LoyaltyTransaction.id))
.filter(
LoyaltyTransaction.card_id == card_id,
LoyaltyTransaction.transaction_type == TransactionType.STAMP_EARNED.value,
LoyaltyTransaction.transaction_at >= today_start,
)
.scalar()
)
return count or 0
def get_card_transactions(
self,
db: Session,
card_id: int,
*,
skip: int = 0,
limit: int = 50,
) -> tuple[list[LoyaltyTransaction], int]:
"""Get transaction history for a card."""
query = (
db.query(LoyaltyTransaction)
.filter(LoyaltyTransaction.card_id == card_id)
.order_by(LoyaltyTransaction.transaction_at.desc())
)
total = query.count()
transactions = query.offset(skip).limit(limit).all()
return transactions, total
# Singleton instance
card_service = CardService()

View File

@@ -0,0 +1,366 @@
# app/modules/loyalty/services/google_wallet_service.py
"""
Google Wallet service.
Handles Google Wallet integration including:
- Creating LoyaltyClass for programs
- Creating LoyaltyObject for cards
- Updating objects on balance changes
- Generating "Add to Wallet" URLs
"""
import json
import logging
from typing import Any
from sqlalchemy.orm import Session
from app.modules.loyalty.config import config
from app.modules.loyalty.exceptions import (
GoogleWalletNotConfiguredException,
WalletIntegrationException,
)
from app.modules.loyalty.models import LoyaltyCard, LoyaltyProgram
logger = logging.getLogger(__name__)
class GoogleWalletService:
"""Service for Google Wallet integration."""
def __init__(self):
"""Initialize the Google Wallet service."""
self._credentials = None
self._http_client = None
@property
def is_configured(self) -> bool:
"""Check if Google Wallet is configured."""
return bool(config.google_issuer_id and config.google_service_account_json)
def _get_credentials(self):
"""Get Google service account credentials."""
if self._credentials:
return self._credentials
if not config.google_service_account_json:
raise GoogleWalletNotConfiguredException()
try:
from google.oauth2 import service_account
scopes = ["https://www.googleapis.com/auth/wallet_object.issuer"]
self._credentials = service_account.Credentials.from_service_account_file(
config.google_service_account_json,
scopes=scopes,
)
return self._credentials
except Exception as e:
logger.error(f"Failed to load Google credentials: {e}")
raise WalletIntegrationException("google", str(e))
def _get_http_client(self):
"""Get authenticated HTTP client."""
if self._http_client:
return self._http_client
try:
from google.auth.transport.requests import AuthorizedSession
credentials = self._get_credentials()
self._http_client = AuthorizedSession(credentials)
return self._http_client
except Exception as e:
logger.error(f"Failed to create Google HTTP client: {e}")
raise WalletIntegrationException("google", str(e))
# =========================================================================
# LoyaltyClass Operations (Program-level)
# =========================================================================
def create_class(self, db: Session, program: LoyaltyProgram) -> str:
"""
Create a LoyaltyClass for a loyalty program.
Args:
db: Database session
program: Loyalty program
Returns:
Google Wallet class ID
"""
if not self.is_configured:
raise GoogleWalletNotConfiguredException()
issuer_id = config.google_issuer_id
class_id = f"{issuer_id}.loyalty_program_{program.id}"
class_data = {
"id": class_id,
"issuerId": issuer_id,
"reviewStatus": "UNDER_REVIEW",
"programName": program.display_name,
"programLogo": {
"sourceUri": {
"uri": program.logo_url or "https://via.placeholder.com/100",
},
},
"hexBackgroundColor": program.card_color,
"localizedProgramName": {
"defaultValue": {
"language": "en",
"value": program.display_name,
},
},
}
# Add hero image if configured
if program.hero_image_url:
class_data["heroImage"] = {
"sourceUri": {"uri": program.hero_image_url},
}
try:
http = self._get_http_client()
response = http.post(
"https://walletobjects.googleapis.com/walletobjects/v1/loyaltyClass",
json=class_data,
)
if response.status_code in (200, 201):
# Update program with class ID
program.google_class_id = class_id
db.commit()
logger.info(f"Created Google Wallet class {class_id} for program {program.id}")
return class_id
elif response.status_code == 409:
# Class already exists
program.google_class_id = class_id
db.commit()
return class_id
else:
error = response.json() if response.text else {}
raise WalletIntegrationException(
"google",
f"Failed to create class: {response.status_code} - {error}",
)
except WalletIntegrationException:
raise
except Exception as e:
logger.error(f"Failed to create Google Wallet class: {e}")
raise WalletIntegrationException("google", str(e))
def update_class(self, db: Session, program: LoyaltyProgram) -> None:
"""Update a LoyaltyClass when program settings change."""
if not program.google_class_id:
return
class_data = {
"programName": program.display_name,
"hexBackgroundColor": program.card_color,
}
if program.logo_url:
class_data["programLogo"] = {
"sourceUri": {"uri": program.logo_url},
}
try:
http = self._get_http_client()
response = http.patch(
f"https://walletobjects.googleapis.com/walletobjects/v1/loyaltyClass/{program.google_class_id}",
json=class_data,
)
if response.status_code not in (200, 201):
logger.warning(
f"Failed to update Google Wallet class {program.google_class_id}: "
f"{response.status_code}"
)
except Exception as e:
logger.error(f"Failed to update Google Wallet class: {e}")
# =========================================================================
# LoyaltyObject Operations (Card-level)
# =========================================================================
def create_object(self, db: Session, card: LoyaltyCard) -> str:
"""
Create a LoyaltyObject for a loyalty card.
Args:
db: Database session
card: Loyalty card
Returns:
Google Wallet object ID
"""
if not self.is_configured:
raise GoogleWalletNotConfiguredException()
program = card.program
if not program.google_class_id:
# Create class first
self.create_class(db, program)
issuer_id = config.google_issuer_id
object_id = f"{issuer_id}.loyalty_card_{card.id}"
object_data = self._build_object_data(card, object_id)
try:
http = self._get_http_client()
response = http.post(
"https://walletobjects.googleapis.com/walletobjects/v1/loyaltyObject",
json=object_data,
)
if response.status_code in (200, 201):
card.google_object_id = object_id
db.commit()
logger.info(f"Created Google Wallet object {object_id} for card {card.id}")
return object_id
elif response.status_code == 409:
# Object already exists
card.google_object_id = object_id
db.commit()
return object_id
else:
error = response.json() if response.text else {}
raise WalletIntegrationException(
"google",
f"Failed to create object: {response.status_code} - {error}",
)
except WalletIntegrationException:
raise
except Exception as e:
logger.error(f"Failed to create Google Wallet object: {e}")
raise WalletIntegrationException("google", str(e))
def update_object(self, db: Session, card: LoyaltyCard) -> None:
"""Update a LoyaltyObject when card balance changes."""
if not card.google_object_id:
return
object_data = self._build_object_data(card, card.google_object_id)
try:
http = self._get_http_client()
response = http.patch(
f"https://walletobjects.googleapis.com/walletobjects/v1/loyaltyObject/{card.google_object_id}",
json=object_data,
)
if response.status_code in (200, 201):
logger.debug(f"Updated Google Wallet object for card {card.id}")
else:
logger.warning(
f"Failed to update Google Wallet object {card.google_object_id}: "
f"{response.status_code}"
)
except Exception as e:
logger.error(f"Failed to update Google Wallet object: {e}")
def _build_object_data(self, card: LoyaltyCard, object_id: str) -> dict[str, Any]:
"""Build the LoyaltyObject data structure."""
program = card.program
object_data = {
"id": object_id,
"classId": program.google_class_id,
"state": "ACTIVE" if card.is_active else "INACTIVE",
"accountId": card.card_number,
"accountName": card.card_number,
"barcode": {
"type": "QR_CODE",
"value": card.qr_code_data,
},
}
# Add loyalty points (stamps as points for display)
if program.is_stamps_enabled:
object_data["loyaltyPoints"] = {
"label": "Stamps",
"balance": {
"int": card.stamp_count,
},
}
# Add secondary points showing target
object_data["secondaryLoyaltyPoints"] = {
"label": f"of {program.stamps_target}",
"balance": {
"int": program.stamps_target,
},
}
elif program.is_points_enabled:
object_data["loyaltyPoints"] = {
"label": "Points",
"balance": {
"int": card.points_balance,
},
}
return object_data
# =========================================================================
# Save URL Generation
# =========================================================================
def get_save_url(self, db: Session, card: LoyaltyCard) -> str:
"""
Get the "Add to Google Wallet" URL for a card.
Args:
db: Database session
card: Loyalty card
Returns:
URL for adding pass to Google Wallet
"""
if not self.is_configured:
raise GoogleWalletNotConfiguredException()
# Ensure object exists
if not card.google_object_id:
self.create_object(db, card)
# Generate JWT for save link
try:
import jwt
from datetime import datetime, timedelta
credentials = self._get_credentials()
claims = {
"iss": credentials.service_account_email,
"aud": "google",
"origins": [],
"typ": "savetowallet",
"payload": {
"loyaltyObjects": [{"id": card.google_object_id}],
},
"iat": datetime.utcnow(),
"exp": datetime.utcnow() + timedelta(hours=1),
}
# Sign with service account private key
token = jwt.encode(
claims,
credentials._signer._key,
algorithm="RS256",
)
card.google_object_jwt = token
db.commit()
return f"https://pay.google.com/gp/v/save/{token}"
except Exception as e:
logger.error(f"Failed to generate Google Wallet save URL: {e}")
raise WalletIntegrationException("google", str(e))
# Singleton instance
google_wallet_service = GoogleWalletService()

View File

@@ -0,0 +1,281 @@
# app/modules/loyalty/services/pin_service.py
"""
Staff PIN service.
Handles PIN operations including:
- PIN creation and management
- PIN verification with lockout
- PIN security (failed attempts, lockout)
"""
import logging
from datetime import UTC, datetime
from sqlalchemy.orm import Session
from app.modules.loyalty.config import config
from app.modules.loyalty.exceptions import (
InvalidStaffPinException,
StaffPinLockedException,
StaffPinNotFoundException,
)
from app.modules.loyalty.models import StaffPin
from app.modules.loyalty.schemas.pin import PinCreate, PinUpdate
logger = logging.getLogger(__name__)
class PinService:
"""Service for staff PIN operations."""
# =========================================================================
# Read Operations
# =========================================================================
def get_pin(self, db: Session, pin_id: int) -> StaffPin | None:
"""Get a staff PIN by ID."""
return db.query(StaffPin).filter(StaffPin.id == pin_id).first()
def get_pin_by_staff_id(
self,
db: Session,
program_id: int,
staff_id: str,
) -> StaffPin | None:
"""Get a staff PIN by employee ID."""
return (
db.query(StaffPin)
.filter(
StaffPin.program_id == program_id,
StaffPin.staff_id == staff_id,
)
.first()
)
def require_pin(self, db: Session, pin_id: int) -> StaffPin:
"""Get a PIN or raise exception if not found."""
pin = self.get_pin(db, pin_id)
if not pin:
raise StaffPinNotFoundException(str(pin_id))
return pin
def list_pins(
self,
db: Session,
program_id: int,
*,
is_active: bool | None = None,
) -> list[StaffPin]:
"""List all staff PINs for a program."""
query = db.query(StaffPin).filter(StaffPin.program_id == program_id)
if is_active is not None:
query = query.filter(StaffPin.is_active == is_active)
return query.order_by(StaffPin.name).all()
# =========================================================================
# Write Operations
# =========================================================================
def create_pin(
self,
db: Session,
program_id: int,
vendor_id: int,
data: PinCreate,
) -> StaffPin:
"""
Create a new staff PIN.
Args:
db: Database session
program_id: Program ID
vendor_id: Vendor ID
data: PIN creation data
Returns:
Created PIN
"""
pin = StaffPin(
program_id=program_id,
vendor_id=vendor_id,
name=data.name,
staff_id=data.staff_id,
)
pin.set_pin(data.pin)
db.add(pin)
db.commit()
db.refresh(pin)
logger.info(f"Created staff PIN {pin.id} for '{pin.name}' in program {program_id}")
return pin
def update_pin(
self,
db: Session,
pin_id: int,
data: PinUpdate,
) -> StaffPin:
"""
Update a staff PIN.
Args:
db: Database session
pin_id: PIN ID
data: Update data
Returns:
Updated PIN
"""
pin = self.require_pin(db, pin_id)
if data.name is not None:
pin.name = data.name
if data.staff_id is not None:
pin.staff_id = data.staff_id
if data.pin is not None:
pin.set_pin(data.pin)
# Reset lockout when PIN is changed
pin.failed_attempts = 0
pin.locked_until = None
if data.is_active is not None:
pin.is_active = data.is_active
db.commit()
db.refresh(pin)
logger.info(f"Updated staff PIN {pin_id}")
return pin
def delete_pin(self, db: Session, pin_id: int) -> None:
"""Delete a staff PIN."""
pin = self.require_pin(db, pin_id)
program_id = pin.program_id
db.delete(pin)
db.commit()
logger.info(f"Deleted staff PIN {pin_id} from program {program_id}")
def unlock_pin(self, db: Session, pin_id: int) -> StaffPin:
"""Unlock a locked staff PIN."""
pin = self.require_pin(db, pin_id)
pin.unlock()
db.commit()
db.refresh(pin)
logger.info(f"Unlocked staff PIN {pin_id}")
return pin
# =========================================================================
# Verification
# =========================================================================
def verify_pin(
self,
db: Session,
program_id: int,
plain_pin: str,
) -> StaffPin:
"""
Verify a staff PIN.
Checks all active PINs for the program and returns the matching one.
Args:
db: Database session
program_id: Program ID
plain_pin: Plain text PIN to verify
Returns:
Verified StaffPin object
Raises:
InvalidStaffPinException: PIN is invalid
StaffPinLockedException: PIN is locked
"""
# Get all active PINs for the program
pins = self.list_pins(db, program_id, is_active=True)
if not pins:
raise InvalidStaffPinException()
# Try each PIN
for pin in pins:
# Check if locked
if pin.is_locked:
continue
# Verify PIN
if pin.verify_pin(plain_pin):
# Success - record it
pin.record_success()
db.commit()
logger.debug(f"PIN verified for '{pin.name}' in program {program_id}")
return pin
# No match found - record failed attempt on all unlocked PINs
# This is a simplified approach; in production you might want to
# track which PIN was attempted based on additional context
locked_pin = None
remaining = None
for pin in pins:
if not pin.is_locked:
is_now_locked = pin.record_failed_attempt(
max_attempts=config.pin_max_failed_attempts,
lockout_minutes=config.pin_lockout_minutes,
)
if is_now_locked:
locked_pin = pin
else:
remaining = pin.remaining_attempts
db.commit()
# If a PIN just got locked, raise that specific error
if locked_pin:
raise StaffPinLockedException(locked_pin.locked_until.isoformat())
raise InvalidStaffPinException(remaining)
def find_matching_pin(
self,
db: Session,
program_id: int,
plain_pin: str,
) -> StaffPin | None:
"""
Find a matching PIN without recording attempts.
Useful for checking PIN validity without side effects.
Args:
db: Database session
program_id: Program ID
plain_pin: Plain text PIN to check
Returns:
Matching StaffPin or None
"""
pins = self.list_pins(db, program_id, is_active=True)
for pin in pins:
if not pin.is_locked and pin.verify_pin(plain_pin):
return pin
return None
# Singleton instance
pin_service = PinService()

View File

@@ -0,0 +1,356 @@
# app/modules/loyalty/services/points_service.py
"""
Points service.
Handles points operations including:
- Earning points from purchases
- Redeeming points for rewards
- Points balance management
"""
import logging
from datetime import UTC, datetime
from sqlalchemy.orm import Session
from app.modules.loyalty.exceptions import (
InsufficientPointsException,
InvalidRewardException,
LoyaltyCardInactiveException,
LoyaltyProgramInactiveException,
StaffPinRequiredException,
)
from app.modules.loyalty.models import LoyaltyCard, LoyaltyTransaction, TransactionType
from app.modules.loyalty.services.card_service import card_service
from app.modules.loyalty.services.pin_service import pin_service
logger = logging.getLogger(__name__)
class PointsService:
"""Service for points operations."""
def earn_points(
self,
db: Session,
*,
card_id: int | None = None,
qr_code: str | None = None,
card_number: str | None = None,
purchase_amount_cents: int,
order_reference: str | None = None,
staff_pin: str | None = None,
ip_address: str | None = None,
user_agent: str | None = None,
notes: str | None = None,
) -> dict:
"""
Earn points from a purchase.
Points are calculated based on the program's points_per_euro rate.
Args:
db: Database session
card_id: Card ID
qr_code: QR code data
card_number: Card number
purchase_amount_cents: Purchase amount in cents
order_reference: Order reference for tracking
staff_pin: Staff PIN for verification
ip_address: Request IP for audit
user_agent: Request user agent for audit
notes: Optional notes
Returns:
Dict with operation result
"""
# Look up the card
card = card_service.lookup_card(
db,
card_id=card_id,
qr_code=qr_code,
card_number=card_number,
)
# Validate card and program
if not card.is_active:
raise LoyaltyCardInactiveException(card.id)
program = card.program
if not program.is_active:
raise LoyaltyProgramInactiveException(program.id)
# Check if points are enabled
if not program.is_points_enabled:
logger.warning(f"Points attempted on stamps-only program {program.id}")
raise LoyaltyCardInactiveException(card.id)
# Verify staff PIN if required
verified_pin = None
if program.require_staff_pin:
if not staff_pin:
raise StaffPinRequiredException()
verified_pin = pin_service.verify_pin(db, program.id, staff_pin)
# Calculate points
# points_per_euro is per full euro, so divide cents by 100
purchase_euros = purchase_amount_cents / 100
points_earned = int(purchase_euros * program.points_per_euro)
if points_earned <= 0:
return {
"success": True,
"message": "Purchase too small to earn points",
"points_earned": 0,
"points_per_euro": program.points_per_euro,
"purchase_amount_cents": purchase_amount_cents,
"card_id": card.id,
"card_number": card.card_number,
"points_balance": card.points_balance,
"total_points_earned": card.total_points_earned,
}
# Add points
now = datetime.now(UTC)
card.points_balance += points_earned
card.total_points_earned += points_earned
card.last_points_at = now
# Create transaction
transaction = LoyaltyTransaction(
card_id=card.id,
vendor_id=card.vendor_id,
staff_pin_id=verified_pin.id if verified_pin else None,
transaction_type=TransactionType.POINTS_EARNED.value,
points_delta=points_earned,
stamps_balance_after=card.stamp_count,
points_balance_after=card.points_balance,
purchase_amount_cents=purchase_amount_cents,
order_reference=order_reference,
ip_address=ip_address,
user_agent=user_agent,
notes=notes,
transaction_at=now,
)
db.add(transaction)
db.commit()
db.refresh(card)
logger.info(
f"Added {points_earned} points to card {card.id} "
f"(purchase: €{purchase_euros:.2f}, balance: {card.points_balance})"
)
return {
"success": True,
"message": "Points earned successfully",
"points_earned": points_earned,
"points_per_euro": program.points_per_euro,
"purchase_amount_cents": purchase_amount_cents,
"card_id": card.id,
"card_number": card.card_number,
"points_balance": card.points_balance,
"total_points_earned": card.total_points_earned,
}
def redeem_points(
self,
db: Session,
*,
card_id: int | None = None,
qr_code: str | None = None,
card_number: str | None = None,
reward_id: str,
staff_pin: str | None = None,
ip_address: str | None = None,
user_agent: str | None = None,
notes: str | None = None,
) -> dict:
"""
Redeem points for a reward.
Args:
db: Database session
card_id: Card ID
qr_code: QR code data
card_number: Card number
reward_id: ID of the reward to redeem
staff_pin: Staff PIN for verification
ip_address: Request IP for audit
user_agent: Request user agent for audit
notes: Optional notes
Returns:
Dict with operation result
Raises:
InvalidRewardException: Reward not found or inactive
InsufficientPointsException: Not enough points
"""
# Look up the card
card = card_service.lookup_card(
db,
card_id=card_id,
qr_code=qr_code,
card_number=card_number,
)
# Validate card and program
if not card.is_active:
raise LoyaltyCardInactiveException(card.id)
program = card.program
if not program.is_active:
raise LoyaltyProgramInactiveException(program.id)
# Find the reward
reward = program.get_points_reward(reward_id)
if not reward:
raise InvalidRewardException(reward_id)
if not reward.get("is_active", True):
raise InvalidRewardException(reward_id)
points_required = reward["points_required"]
reward_name = reward["name"]
# Check if enough points
if card.points_balance < points_required:
raise InsufficientPointsException(card.points_balance, points_required)
# Verify staff PIN if required
verified_pin = None
if program.require_staff_pin:
if not staff_pin:
raise StaffPinRequiredException()
verified_pin = pin_service.verify_pin(db, program.id, staff_pin)
# Redeem points
now = datetime.now(UTC)
card.points_balance -= points_required
card.points_redeemed += points_required
card.last_redemption_at = now
# Create transaction
transaction = LoyaltyTransaction(
card_id=card.id,
vendor_id=card.vendor_id,
staff_pin_id=verified_pin.id if verified_pin else None,
transaction_type=TransactionType.POINTS_REDEEMED.value,
points_delta=-points_required,
stamps_balance_after=card.stamp_count,
points_balance_after=card.points_balance,
reward_id=reward_id,
reward_description=reward_name,
ip_address=ip_address,
user_agent=user_agent,
notes=notes,
transaction_at=now,
)
db.add(transaction)
db.commit()
db.refresh(card)
logger.info(
f"Redeemed {points_required} points from card {card.id} "
f"(reward: {reward_name}, balance: {card.points_balance})"
)
return {
"success": True,
"message": "Reward redeemed successfully",
"reward_id": reward_id,
"reward_name": reward_name,
"points_spent": points_required,
"card_id": card.id,
"card_number": card.card_number,
"points_balance": card.points_balance,
"total_points_redeemed": card.points_redeemed,
}
def adjust_points(
self,
db: Session,
card_id: int,
points_delta: int,
*,
reason: str,
staff_pin: str | None = None,
ip_address: str | None = None,
user_agent: str | None = None,
) -> dict:
"""
Manually adjust points (admin operation).
Args:
db: Database session
card_id: Card ID
points_delta: Points to add (positive) or remove (negative)
reason: Reason for adjustment
staff_pin: Staff PIN for verification
ip_address: Request IP for audit
user_agent: Request user agent for audit
Returns:
Dict with operation result
"""
card = card_service.require_card(db, card_id)
program = card.program
# Verify staff PIN if required
verified_pin = None
if program.require_staff_pin and staff_pin:
verified_pin = pin_service.verify_pin(db, program.id, staff_pin)
# Apply adjustment
now = datetime.now(UTC)
card.points_balance += points_delta
if points_delta > 0:
card.total_points_earned += points_delta
else:
# Negative adjustment - don't add to redeemed, just reduce balance
pass
# Ensure balance doesn't go negative
if card.points_balance < 0:
card.points_balance = 0
# Create transaction
transaction = LoyaltyTransaction(
card_id=card.id,
vendor_id=card.vendor_id,
staff_pin_id=verified_pin.id if verified_pin else None,
transaction_type=TransactionType.POINTS_ADJUSTMENT.value,
points_delta=points_delta,
stamps_balance_after=card.stamp_count,
points_balance_after=card.points_balance,
notes=reason,
ip_address=ip_address,
user_agent=user_agent,
transaction_at=now,
)
db.add(transaction)
db.commit()
db.refresh(card)
logger.info(
f"Adjusted points for card {card.id} by {points_delta:+d} "
f"(reason: {reason}, balance: {card.points_balance})"
)
return {
"success": True,
"message": "Points adjusted successfully",
"points_delta": points_delta,
"card_id": card.id,
"card_number": card.card_number,
"points_balance": card.points_balance,
}
# Singleton instance
points_service = PointsService()

View File

@@ -0,0 +1,379 @@
# app/modules/loyalty/services/program_service.py
"""
Loyalty program service.
Handles CRUD operations for loyalty programs including:
- Program creation and configuration
- Program updates
- Program activation/deactivation
- Statistics retrieval
"""
import logging
from datetime import UTC, datetime
from sqlalchemy.orm import Session
from app.modules.loyalty.exceptions import (
LoyaltyProgramAlreadyExistsException,
LoyaltyProgramNotFoundException,
)
from app.modules.loyalty.models import LoyaltyProgram, LoyaltyType
from app.modules.loyalty.schemas.program import (
ProgramCreate,
ProgramUpdate,
)
logger = logging.getLogger(__name__)
class ProgramService:
"""Service for loyalty program operations."""
# =========================================================================
# Read Operations
# =========================================================================
def get_program(self, db: Session, program_id: int) -> LoyaltyProgram | None:
"""Get a loyalty program by ID."""
return (
db.query(LoyaltyProgram)
.filter(LoyaltyProgram.id == program_id)
.first()
)
def get_program_by_vendor(self, db: Session, vendor_id: int) -> LoyaltyProgram | None:
"""Get a vendor's loyalty program."""
return (
db.query(LoyaltyProgram)
.filter(LoyaltyProgram.vendor_id == vendor_id)
.first()
)
def get_active_program_by_vendor(self, db: Session, vendor_id: int) -> LoyaltyProgram | None:
"""Get a vendor's active loyalty program."""
return (
db.query(LoyaltyProgram)
.filter(
LoyaltyProgram.vendor_id == vendor_id,
LoyaltyProgram.is_active == True,
)
.first()
)
def require_program(self, db: Session, program_id: int) -> LoyaltyProgram:
"""Get a program or raise exception if not found."""
program = self.get_program(db, program_id)
if not program:
raise LoyaltyProgramNotFoundException(str(program_id))
return program
def require_program_by_vendor(self, db: Session, vendor_id: int) -> LoyaltyProgram:
"""Get a vendor's program or raise exception if not found."""
program = self.get_program_by_vendor(db, vendor_id)
if not program:
raise LoyaltyProgramNotFoundException(f"vendor:{vendor_id}")
return program
def list_programs(
self,
db: Session,
*,
skip: int = 0,
limit: int = 100,
is_active: bool | None = None,
) -> tuple[list[LoyaltyProgram], int]:
"""List all loyalty programs (admin)."""
query = db.query(LoyaltyProgram)
if is_active is not None:
query = query.filter(LoyaltyProgram.is_active == is_active)
total = query.count()
programs = query.offset(skip).limit(limit).all()
return programs, total
# =========================================================================
# Write Operations
# =========================================================================
def create_program(
self,
db: Session,
vendor_id: int,
data: ProgramCreate,
) -> LoyaltyProgram:
"""
Create a new loyalty program for a vendor.
Args:
db: Database session
vendor_id: Vendor ID
data: Program configuration
Returns:
Created program
Raises:
LoyaltyProgramAlreadyExistsException: If vendor already has a program
"""
# Check if vendor already has a program
existing = self.get_program_by_vendor(db, vendor_id)
if existing:
raise LoyaltyProgramAlreadyExistsException(vendor_id)
# Convert points_rewards to dict list for JSON storage
points_rewards_data = [r.model_dump() for r in data.points_rewards]
program = LoyaltyProgram(
vendor_id=vendor_id,
loyalty_type=data.loyalty_type,
# Stamps
stamps_target=data.stamps_target,
stamps_reward_description=data.stamps_reward_description,
stamps_reward_value_cents=data.stamps_reward_value_cents,
# Points
points_per_euro=data.points_per_euro,
points_rewards=points_rewards_data,
# Anti-fraud
cooldown_minutes=data.cooldown_minutes,
max_daily_stamps=data.max_daily_stamps,
require_staff_pin=data.require_staff_pin,
# Branding
card_name=data.card_name,
card_color=data.card_color,
card_secondary_color=data.card_secondary_color,
logo_url=data.logo_url,
hero_image_url=data.hero_image_url,
# Terms
terms_text=data.terms_text,
privacy_url=data.privacy_url,
# Status
is_active=True,
activated_at=datetime.now(UTC),
)
db.add(program)
db.commit()
db.refresh(program)
logger.info(
f"Created loyalty program {program.id} for vendor {vendor_id} "
f"(type: {program.loyalty_type})"
)
return program
def update_program(
self,
db: Session,
program_id: int,
data: ProgramUpdate,
) -> LoyaltyProgram:
"""
Update a loyalty program.
Args:
db: Database session
program_id: Program ID
data: Update data
Returns:
Updated program
"""
program = self.require_program(db, program_id)
update_data = data.model_dump(exclude_unset=True)
# Handle points_rewards specially (convert to dict list)
if "points_rewards" in update_data and update_data["points_rewards"] is not None:
update_data["points_rewards"] = [
r.model_dump() if hasattr(r, "model_dump") else r
for r in update_data["points_rewards"]
]
for field, value in update_data.items():
setattr(program, field, value)
db.commit()
db.refresh(program)
logger.info(f"Updated loyalty program {program_id}")
return program
def activate_program(self, db: Session, program_id: int) -> LoyaltyProgram:
"""Activate a loyalty program."""
program = self.require_program(db, program_id)
program.activate()
db.commit()
db.refresh(program)
logger.info(f"Activated loyalty program {program_id}")
return program
def deactivate_program(self, db: Session, program_id: int) -> LoyaltyProgram:
"""Deactivate a loyalty program."""
program = self.require_program(db, program_id)
program.deactivate()
db.commit()
db.refresh(program)
logger.info(f"Deactivated loyalty program {program_id}")
return program
def delete_program(self, db: Session, program_id: int) -> None:
"""Delete a loyalty program and all associated data."""
program = self.require_program(db, program_id)
vendor_id = program.vendor_id
db.delete(program)
db.commit()
logger.info(f"Deleted loyalty program {program_id} for vendor {vendor_id}")
# =========================================================================
# Statistics
# =========================================================================
def get_program_stats(self, db: Session, program_id: int) -> dict:
"""
Get statistics for a loyalty program.
Returns dict with:
- total_cards, active_cards
- total_stamps_issued, total_stamps_redeemed
- total_points_issued, total_points_redeemed
- etc.
"""
from datetime import timedelta
from sqlalchemy import func
from app.modules.loyalty.models import LoyaltyCard, LoyaltyTransaction
program = self.require_program(db, program_id)
# Card counts
total_cards = (
db.query(func.count(LoyaltyCard.id))
.filter(LoyaltyCard.program_id == program_id)
.scalar()
or 0
)
active_cards = (
db.query(func.count(LoyaltyCard.id))
.filter(
LoyaltyCard.program_id == program_id,
LoyaltyCard.is_active == True,
)
.scalar()
or 0
)
# Stamp totals from cards
stamp_stats = (
db.query(
func.sum(LoyaltyCard.total_stamps_earned),
func.sum(LoyaltyCard.stamps_redeemed),
)
.filter(LoyaltyCard.program_id == program_id)
.first()
)
total_stamps_issued = stamp_stats[0] or 0
total_stamps_redeemed = stamp_stats[1] or 0
# Points totals from cards
points_stats = (
db.query(
func.sum(LoyaltyCard.total_points_earned),
func.sum(LoyaltyCard.points_redeemed),
)
.filter(LoyaltyCard.program_id == program_id)
.first()
)
total_points_issued = points_stats[0] or 0
total_points_redeemed = points_stats[1] or 0
# This month's activity
month_start = datetime.now(UTC).replace(day=1, hour=0, minute=0, second=0, microsecond=0)
stamps_this_month = (
db.query(func.count(LoyaltyTransaction.id))
.join(LoyaltyCard)
.filter(
LoyaltyCard.program_id == program_id,
LoyaltyTransaction.transaction_type == "stamp_earned",
LoyaltyTransaction.transaction_at >= month_start,
)
.scalar()
or 0
)
redemptions_this_month = (
db.query(func.count(LoyaltyTransaction.id))
.join(LoyaltyCard)
.filter(
LoyaltyCard.program_id == program_id,
LoyaltyTransaction.transaction_type == "stamp_redeemed",
LoyaltyTransaction.transaction_at >= month_start,
)
.scalar()
or 0
)
# 30-day active cards
thirty_days_ago = datetime.now(UTC) - timedelta(days=30)
cards_with_activity_30d = (
db.query(func.count(func.distinct(LoyaltyTransaction.card_id)))
.join(LoyaltyCard)
.filter(
LoyaltyCard.program_id == program_id,
LoyaltyTransaction.transaction_at >= thirty_days_ago,
)
.scalar()
or 0
)
# Averages
avg_stamps = total_stamps_issued / total_cards if total_cards > 0 else 0
avg_points = total_points_issued / total_cards if total_cards > 0 else 0
# Estimated liability (unredeemed value)
current_stamps = (
db.query(func.sum(LoyaltyCard.stamp_count))
.filter(LoyaltyCard.program_id == program_id)
.scalar()
or 0
)
stamp_value = program.stamps_reward_value_cents or 0
current_points = (
db.query(func.sum(LoyaltyCard.points_balance))
.filter(LoyaltyCard.program_id == program_id)
.scalar()
or 0
)
# Rough estimate: assume 100 points = €1
points_value_cents = current_points // 100 * 100
estimated_liability = (
(current_stamps * stamp_value // program.stamps_target) + points_value_cents
)
return {
"total_cards": total_cards,
"active_cards": active_cards,
"total_stamps_issued": total_stamps_issued,
"total_stamps_redeemed": total_stamps_redeemed,
"stamps_this_month": stamps_this_month,
"redemptions_this_month": redemptions_this_month,
"total_points_issued": total_points_issued,
"total_points_redeemed": total_points_redeemed,
"cards_with_activity_30d": cards_with_activity_30d,
"average_stamps_per_card": round(avg_stamps, 2),
"average_points_per_card": round(avg_points, 2),
"estimated_liability_cents": estimated_liability,
}
# Singleton instance
program_service = ProgramService()

View File

@@ -0,0 +1,279 @@
# app/modules/loyalty/services/stamp_service.py
"""
Stamp service.
Handles stamp operations including:
- Adding stamps with anti-fraud checks
- Redeeming stamps for rewards
- Daily limit tracking
"""
import logging
from datetime import UTC, datetime, timedelta
from sqlalchemy.orm import Session
from app.modules.loyalty.config import config
from app.modules.loyalty.exceptions import (
DailyStampLimitException,
InsufficientStampsException,
LoyaltyCardInactiveException,
LoyaltyProgramInactiveException,
StampCooldownException,
StaffPinRequiredException,
)
from app.modules.loyalty.models import LoyaltyCard, LoyaltyTransaction, TransactionType
from app.modules.loyalty.services.card_service import card_service
from app.modules.loyalty.services.pin_service import pin_service
logger = logging.getLogger(__name__)
class StampService:
"""Service for stamp operations."""
def add_stamp(
self,
db: Session,
*,
card_id: int | None = None,
qr_code: str | None = None,
card_number: str | None = None,
staff_pin: str | None = None,
ip_address: str | None = None,
user_agent: str | None = None,
notes: str | None = None,
) -> dict:
"""
Add a stamp to a loyalty card.
Performs all anti-fraud checks:
- Staff PIN verification (if required)
- Cooldown period check
- Daily limit check
Args:
db: Database session
card_id: Card ID
qr_code: QR code data
card_number: Card number
staff_pin: Staff PIN for verification
ip_address: Request IP for audit
user_agent: Request user agent for audit
notes: Optional notes
Returns:
Dict with operation result
Raises:
LoyaltyCardNotFoundException: Card not found
LoyaltyCardInactiveException: Card is inactive
LoyaltyProgramInactiveException: Program is inactive
StaffPinRequiredException: PIN required but not provided
InvalidStaffPinException: PIN is invalid
StampCooldownException: Cooldown period not elapsed
DailyStampLimitException: Daily limit reached
"""
# Look up the card
card = card_service.lookup_card(
db,
card_id=card_id,
qr_code=qr_code,
card_number=card_number,
)
# Validate card and program
if not card.is_active:
raise LoyaltyCardInactiveException(card.id)
program = card.program
if not program.is_active:
raise LoyaltyProgramInactiveException(program.id)
# Check if stamps are enabled
if not program.is_stamps_enabled:
logger.warning(f"Stamp attempted on points-only program {program.id}")
raise LoyaltyCardInactiveException(card.id)
# Verify staff PIN if required
verified_pin = None
if program.require_staff_pin:
if not staff_pin:
raise StaffPinRequiredException()
verified_pin = pin_service.verify_pin(db, program.id, staff_pin)
# Check cooldown
now = datetime.now(UTC)
if card.last_stamp_at:
cooldown_ends = card.last_stamp_at + timedelta(minutes=program.cooldown_minutes)
if now < cooldown_ends:
raise StampCooldownException(
cooldown_ends.isoformat(),
program.cooldown_minutes,
)
# Check daily limit
stamps_today = card_service.get_stamps_today(db, card.id)
if stamps_today >= program.max_daily_stamps:
raise DailyStampLimitException(program.max_daily_stamps, stamps_today)
# Add the stamp
card.stamp_count += 1
card.total_stamps_earned += 1
card.last_stamp_at = now
# Check if reward earned
reward_earned = card.stamp_count >= program.stamps_target
# Create transaction
transaction = LoyaltyTransaction(
card_id=card.id,
vendor_id=card.vendor_id,
staff_pin_id=verified_pin.id if verified_pin else None,
transaction_type=TransactionType.STAMP_EARNED.value,
stamps_delta=1,
stamps_balance_after=card.stamp_count,
points_balance_after=card.points_balance,
ip_address=ip_address,
user_agent=user_agent,
notes=notes,
transaction_at=now,
)
db.add(transaction)
db.commit()
db.refresh(card)
stamps_today += 1
logger.info(
f"Added stamp to card {card.id} "
f"(stamps: {card.stamp_count}/{program.stamps_target}, "
f"today: {stamps_today}/{program.max_daily_stamps})"
)
# Calculate next stamp availability
next_stamp_at = now + timedelta(minutes=program.cooldown_minutes)
return {
"success": True,
"message": "Stamp added successfully",
"card_id": card.id,
"card_number": card.card_number,
"stamp_count": card.stamp_count,
"stamps_target": program.stamps_target,
"stamps_until_reward": max(0, program.stamps_target - card.stamp_count),
"reward_earned": reward_earned,
"reward_description": program.stamps_reward_description if reward_earned else None,
"next_stamp_available_at": next_stamp_at,
"stamps_today": stamps_today,
"stamps_remaining_today": max(0, program.max_daily_stamps - stamps_today),
}
def redeem_stamps(
self,
db: Session,
*,
card_id: int | None = None,
qr_code: str | None = None,
card_number: str | None = None,
staff_pin: str | None = None,
ip_address: str | None = None,
user_agent: str | None = None,
notes: str | None = None,
) -> dict:
"""
Redeem stamps for a reward.
Args:
db: Database session
card_id: Card ID
qr_code: QR code data
card_number: Card number
staff_pin: Staff PIN for verification
ip_address: Request IP for audit
user_agent: Request user agent for audit
notes: Optional notes
Returns:
Dict with operation result
Raises:
LoyaltyCardNotFoundException: Card not found
InsufficientStampsException: Not enough stamps
StaffPinRequiredException: PIN required but not provided
"""
# Look up the card
card = card_service.lookup_card(
db,
card_id=card_id,
qr_code=qr_code,
card_number=card_number,
)
# Validate card and program
if not card.is_active:
raise LoyaltyCardInactiveException(card.id)
program = card.program
if not program.is_active:
raise LoyaltyProgramInactiveException(program.id)
# Check if enough stamps
if card.stamp_count < program.stamps_target:
raise InsufficientStampsException(card.stamp_count, program.stamps_target)
# Verify staff PIN if required
verified_pin = None
if program.require_staff_pin:
if not staff_pin:
raise StaffPinRequiredException()
verified_pin = pin_service.verify_pin(db, program.id, staff_pin)
# Redeem stamps
now = datetime.now(UTC)
stamps_redeemed = program.stamps_target
card.stamp_count -= stamps_redeemed
card.stamps_redeemed += 1
card.last_redemption_at = now
# Create transaction
transaction = LoyaltyTransaction(
card_id=card.id,
vendor_id=card.vendor_id,
staff_pin_id=verified_pin.id if verified_pin else None,
transaction_type=TransactionType.STAMP_REDEEMED.value,
stamps_delta=-stamps_redeemed,
stamps_balance_after=card.stamp_count,
points_balance_after=card.points_balance,
reward_description=program.stamps_reward_description,
ip_address=ip_address,
user_agent=user_agent,
notes=notes,
transaction_at=now,
)
db.add(transaction)
db.commit()
db.refresh(card)
logger.info(
f"Redeemed stamps from card {card.id} "
f"(reward: {program.stamps_reward_description}, "
f"total redemptions: {card.stamps_redeemed})"
)
return {
"success": True,
"message": "Reward redeemed successfully",
"card_id": card.id,
"card_number": card.card_number,
"stamp_count": card.stamp_count,
"stamps_target": program.stamps_target,
"reward_description": program.stamps_reward_description,
"total_redemptions": card.stamps_redeemed,
}
# Singleton instance
stamp_service = StampService()

View File

@@ -0,0 +1,144 @@
# app/modules/loyalty/services/wallet_service.py
"""
Unified wallet service.
Provides a unified interface for wallet operations across
Google Wallet and Apple Wallet.
"""
import logging
from sqlalchemy.orm import Session
from app.modules.loyalty.models import LoyaltyCard, LoyaltyProgram
logger = logging.getLogger(__name__)
class WalletService:
"""Unified service for wallet operations."""
def get_add_to_wallet_urls(
self,
db: Session,
card: LoyaltyCard,
) -> dict[str, str | None]:
"""
Get URLs for adding card to wallets.
Args:
db: Database session
card: Loyalty card
Returns:
Dict with google_wallet_url and apple_wallet_url
"""
from app.modules.loyalty.services.apple_wallet_service import apple_wallet_service
from app.modules.loyalty.services.google_wallet_service import google_wallet_service
urls = {
"google_wallet_url": None,
"apple_wallet_url": None,
}
program = card.program
# Google Wallet
if program.google_issuer_id or program.google_class_id:
try:
urls["google_wallet_url"] = google_wallet_service.get_save_url(db, card)
except Exception as e:
logger.warning(f"Failed to get Google Wallet URL for card {card.id}: {e}")
# Apple Wallet
if program.apple_pass_type_id:
try:
urls["apple_wallet_url"] = apple_wallet_service.get_pass_url(card)
except Exception as e:
logger.warning(f"Failed to get Apple Wallet URL for card {card.id}: {e}")
return urls
def sync_card_to_wallets(self, db: Session, card: LoyaltyCard) -> dict[str, bool]:
"""
Sync card data to all configured wallets.
Called after stamp/points operations to update wallet passes.
Args:
db: Database session
card: Loyalty card to sync
Returns:
Dict with success status for each wallet
"""
from app.modules.loyalty.services.apple_wallet_service import apple_wallet_service
from app.modules.loyalty.services.google_wallet_service import google_wallet_service
results = {
"google_wallet": False,
"apple_wallet": False,
}
program = card.program
# Sync to Google Wallet
if card.google_object_id:
try:
google_wallet_service.update_object(db, card)
results["google_wallet"] = True
except Exception as e:
logger.error(f"Failed to sync card {card.id} to Google Wallet: {e}")
# Sync to Apple Wallet (via push notification)
if card.apple_serial_number:
try:
apple_wallet_service.send_push_updates(db, card)
results["apple_wallet"] = True
except Exception as e:
logger.error(f"Failed to send Apple Wallet push for card {card.id}: {e}")
return results
def create_wallet_objects(self, db: Session, card: LoyaltyCard) -> dict[str, bool]:
"""
Create wallet objects for a new card.
Called during enrollment to set up wallet passes.
Args:
db: Database session
card: Newly created loyalty card
Returns:
Dict with success status for each wallet
"""
from app.modules.loyalty.services.google_wallet_service import google_wallet_service
results = {
"google_wallet": False,
"apple_wallet": False,
}
program = card.program
# Create Google Wallet object
if program.google_issuer_id:
try:
google_wallet_service.create_object(db, card)
results["google_wallet"] = True
except Exception as e:
logger.error(f"Failed to create Google Wallet object for card {card.id}: {e}")
# Apple Wallet objects are created on-demand when user downloads pass
# No pre-creation needed, but we set up the serial number
if program.apple_pass_type_id:
card.apple_serial_number = f"card_{card.id}_{card.qr_code_data[:8]}"
db.commit()
results["apple_wallet"] = True
return results
# Singleton instance
wallet_service = WalletService()