Files
orion/app/modules/loyalty/services/apple_wallet_service.py
Samir Boulahtit 4b56eb7ab1
Some checks failed
CI / ruff (push) Successful in 18s
CI / pytest (push) Failing after 2h37m39s
CI / validate (push) Successful in 30s
CI / dependency-scanning (push) Successful in 32s
CI / docs (push) Has been skipped
CI / deploy (push) Has been skipped
feat(loyalty): Phase 1 production launch hardening
Phase 1 of the loyalty production launch plan: config & security
hardening, dropped-data fix, DB integrity guards, rate limiting, and
constant-time auth compare. 362 tests pass.

- 1.4 Persist customer birth_date (new column + migration). Enrollment
  form was collecting it but the value was silently dropped because
  create_customer_for_enrollment never received it. Backfills existing
  customers without overwriting.
- 1.1 LOYALTY_GOOGLE_SERVICE_ACCOUNT_JSON validated at startup (file
  must exist and be readable; ~ expanded). Adds is_google_wallet_enabled
  and is_apple_wallet_enabled derived flags. Prod path documented as
  ~/apps/orion/google-wallet-sa.json.
- 1.5 CHECK constraints on loyalty_cards (points_balance, stamp_count
  non-negative) and loyalty_programs (min_purchase, points_per_euro,
  welcome_bonus non-negative; stamps_target >= 1). Mirrored as
  CheckConstraint in models. Pre-flight scan showed zero violations.
- 1.3 @rate_limit on store mutating endpoints: stamp 60/min,
  redeem/points-earn 30-60/min, void/adjust 20/min, pin unlock 10/min.
- 1.2 Constant-time hmac.compare_digest for Apple Wallet auth token
  (pulled forward from Phase 9 — code is safe whenever Apple ships).

See app/modules/loyalty/docs/production-launch-plan.md for the full
launch plan and remaining phases.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-09 23:36:34 +02:00

734 lines
25 KiB
Python

# app/modules/loyalty/services/apple_wallet_service.py
"""
Apple Wallet service.
Handles Apple Wallet integration including:
- Generating .pkpass files
- Apple Web Service for device registration
- Push notifications for pass updates
"""
import hashlib
import hmac
import io
import json
import logging
import zipfile
from typing import Any
from sqlalchemy.orm import Session
from app.modules.loyalty.config import config
from app.modules.loyalty.exceptions import (
ApplePassGenerationException,
AppleWalletNotConfiguredException,
DeviceRegistrationException,
InvalidAppleAuthTokenException,
WalletIntegrationException,
)
from app.modules.loyalty.models import (
AppleDeviceRegistration,
LoyaltyCard,
LoyaltyProgram,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class AppleWalletService:
    """Service for Apple Wallet integration.

    Groups the three parts of the integration implemented in this module:
    building and signing ``.pkpass`` archives, the device-registration
    bookkeeping used by Apple's pass web service, and APNs pushes that ask
    Wallet to re-fetch a pass.
    """

    # Colour used whenever a program has no (or an unparsable) card colour.
    _DEFAULT_CARD_COLOR = "#4F46E5"
    # Font tried first when rendering the fallback initial; PIL's default
    # font is used when this file cannot be opened.
    _FALLBACK_FONT_PATH = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"
    # Only used when ``settings`` has no ``BASE_URL`` attribute.
    _PLACEHOLDER_BASE_URL = "https://api.example.com"

    @property
    def is_configured(self) -> bool:
        """Check if Apple Wallet is configured.

        Returns:
            True when the pass type id, team id and all three certificate/key
            paths are set (truthy) in the loyalty config.
        """
        return bool(
            config.apple_pass_type_id
            and config.apple_team_id
            and config.apple_wwdr_cert_path
            and config.apple_signer_cert_path
            and config.apple_signer_key_path
        )

    def validate_config(self) -> dict[str, Any]:
        """Validate Apple Wallet configuration.

        Returns:
            A report dict with the keys ``configured``, ``pass_type_id``,
            ``team_id``, ``credentials_valid`` and ``errors``. File checks are
            skipped (and ``credentials_valid`` stays False) when the service
            is not configured.
        """
        import os

        result: dict[str, Any] = {
            "configured": self.is_configured,
            "pass_type_id": config.apple_pass_type_id,
            "team_id": config.apple_team_id,
            "credentials_valid": False,
            "errors": [],
        }
        if not self.is_configured:
            return result

        credential_files = [
            ("WWDR certificate", config.apple_wwdr_cert_path),
            ("Signer certificate", config.apple_signer_cert_path),
            ("Signer key", config.apple_signer_key_path),
        ]
        result["errors"].extend(
            f"{label} not found: {path}"
            for label, path in credential_files
            if not os.path.isfile(path)
        )
        result["credentials_valid"] = not result["errors"]
        return result

    # =========================================================================
    # Auth Verification
    # =========================================================================

    def verify_auth_token(self, card: LoyaltyCard, authorization: str | None) -> None:
        """Verify the Apple Wallet authorization token for a card.

        Args:
            card: Loyalty card
            authorization: Authorization header value (e.g. "ApplePass <token>")

        Raises:
            InvalidAppleAuthTokenException: If token is missing or invalid
        """
        presented = None
        if authorization and authorization.startswith("ApplePass "):
            presented = authorization.split(" ", 1)[1]

        expected = card.apple_auth_token
        if not presented or not expected:
            raise InvalidAppleAuthTokenException()

        # Constant-time compare to avoid leaking the token via timing.
        # compare_digest() rejects str operands containing non-ASCII
        # characters with a TypeError; comparing the UTF-8 bytes keeps a
        # malformed header on the "invalid token" path instead.
        if not hmac.compare_digest(
            presented.encode("utf-8"), expected.encode("utf-8")
        ):
            raise InvalidAppleAuthTokenException()

    def generate_pass_safe(self, db: Session, card: LoyaltyCard) -> bytes:
        """Generate an Apple Wallet pass, wrapping LoyaltyException into
        ApplePassGenerationException.

        Args:
            db: Database session
            card: Loyalty card

        Returns:
            Bytes of the .pkpass file

        Raises:
            ApplePassGenerationException: If pass generation fails
        """
        from app.modules.loyalty.exceptions import LoyaltyException

        try:
            return self.generate_pass(db, card)
        except LoyaltyException as e:
            logger.error("Failed to generate Apple pass for card %s: %s", card.id, e)
            # Keep the original failure attached as __cause__ for debugging.
            raise ApplePassGenerationException(card.id) from e

    def get_device_registrations(self, db: Session, device_id: str) -> list:
        """Get all device registrations for a device library identifier.

        Args:
            db: Database session
            device_id: Device library identifier

        Returns:
            List of AppleDeviceRegistration objects
        """
        return (
            db.query(AppleDeviceRegistration)
            .filter(AppleDeviceRegistration.device_library_identifier == device_id)
            .all()
        )

    def get_updated_cards_for_device(
        self,
        db: Session,
        device_id: str,
        updated_since: str | None = None,
    ) -> list[LoyaltyCard] | None:
        """Get cards registered to a device, optionally filtered by update time.

        Args:
            db: Database session
            device_id: Device library identifier
            updated_since: ISO timestamp to filter by; a value that cannot be
                parsed is ignored and no time filter is applied

        Returns:
            List of LoyaltyCard objects, or None if the device has no
            registrations or no card matches
        """
        from datetime import datetime

        registrations = self.get_device_registrations(db, device_id)
        if not registrations:
            return None

        card_ids = [registration.card_id for registration in registrations]
        query = db.query(LoyaltyCard).filter(LoyaltyCard.id.in_(card_ids))

        if updated_since:
            try:
                # fromisoformat() on older Pythons does not accept a "Z" suffix.
                since = datetime.fromisoformat(updated_since.replace("Z", "+00:00"))
            except ValueError:
                # Best effort: fall back to returning every registered card.
                logger.debug("Ignoring unparsable updated_since value %r", updated_since)
            else:
                query = query.filter(LoyaltyCard.updated_at > since)

        cards = query.all()
        return cards or None

    def register_device_safe(
        self,
        db: Session,
        card: LoyaltyCard,
        device_id: str,
        push_token: str,
    ) -> None:
        """Register a device, wrapping exceptions into DeviceRegistrationException.

        Args:
            db: Database session
            card: Loyalty card
            device_id: Device library identifier
            push_token: Push token

        Raises:
            DeviceRegistrationException: If registration fails
        """
        try:
            self.register_device(db, card, device_id, push_token)
        except Exception as e:  # noqa: EXC003
            logger.error("Failed to register device: %s", e)
            raise DeviceRegistrationException(device_id, "register") from e

    def unregister_device_safe(
        self,
        db: Session,
        card: LoyaltyCard,
        device_id: str,
    ) -> None:
        """Unregister a device, wrapping exceptions into DeviceRegistrationException.

        Args:
            db: Database session
            card: Loyalty card
            device_id: Device library identifier

        Raises:
            DeviceRegistrationException: If unregistration fails
        """
        try:
            self.unregister_device(db, card, device_id)
        except Exception as e:  # noqa: EXC003
            logger.error("Failed to unregister device: %s", e)
            raise DeviceRegistrationException(device_id, "unregister") from e

    # =========================================================================
    # Pass Generation
    # =========================================================================

    def generate_pass(self, db: Session, card: LoyaltyCard) -> bytes:
        """Generate a .pkpass file for a loyalty card.

        The .pkpass is a ZIP file containing:
        - pass.json: Pass configuration
        - icon.png, icon@2x.png: App icon
        - logo.png, logo@2x.png: Logo on pass
        - manifest.json: SHA-1 hashes of all files
        - signature: PKCS#7 signature

        Side effect: when the card has no Apple serial number yet, one is
        assigned and committed before the pass is built.

        Args:
            db: Database session
            card: Loyalty card

        Returns:
            Bytes of the .pkpass file

        Raises:
            AppleWalletNotConfiguredException: If Apple Wallet is not configured
            WalletIntegrationException: If the manifest cannot be signed
        """
        if not self.is_configured:
            raise AppleWalletNotConfiguredException()

        program = card.program

        # Ensure serial number is set
        if not card.apple_serial_number:
            card.apple_serial_number = f"card_{card.id}_{card.qr_code_data[:8]}"
            db.commit()

        # pass.json plus the icon/logo images at both scales.
        pass_files: dict[str, bytes] = {
            "pass.json": json.dumps(self._build_pass_json(card, program)).encode("utf-8"),
            "icon.png": self._get_icon_bytes(program),
            "icon@2x.png": self._get_icon_bytes(program, scale=2),
            "logo.png": self._get_logo_bytes(program),
            "logo@2x.png": self._get_logo_bytes(program, scale=2),
        }

        # The manifest maps every packaged file to its SHA-1 digest.
        manifest = {
            filename: hashlib.sha1(content).hexdigest()  # noqa: SEC041
            for filename, content in pass_files.items()
        }
        pass_files["manifest.json"] = json.dumps(manifest).encode("utf-8")

        # Sign the manifest
        try:
            pass_files["signature"] = self._sign_manifest(pass_files["manifest.json"])
        except (OSError, ValueError) as e:
            logger.error("Failed to sign pass: %s", e)
            raise WalletIntegrationException("apple", f"Failed to sign pass: {e}") from e

        # Create ZIP file
        buffer = io.BytesIO()
        with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
            for filename, content in pass_files.items():
                zf.writestr(filename, content)
        return buffer.getvalue()

    def _build_pass_json(self, card: LoyaltyCard, program: LoyaltyProgram) -> dict[str, Any]:
        """Build the pass.json structure for a loyalty card.

        A ``storeCard`` section is added for stamp programs, or otherwise for
        points programs; stamps take precedence when both are enabled.
        """
        code128_barcode = {
            "message": card.card_number.replace("-", ""),
            "format": "PKBarcodeFormatCode128",
            "messageEncoding": "iso-8859-1",
            "altText": card.card_number,
        }
        qr_barcode = {
            "message": card.qr_code_data,
            "format": "PKBarcodeFormatQR",
            "messageEncoding": "iso-8859-1",
        }
        pass_data: dict[str, Any] = {
            "formatVersion": 1,
            "passTypeIdentifier": config.apple_pass_type_id,
            "serialNumber": card.apple_serial_number,
            "teamIdentifier": config.apple_team_id,
            "organizationName": program.display_name,
            "description": f"{program.display_name} Loyalty Card",
            "backgroundColor": self._hex_to_rgb(program.card_color),
            "foregroundColor": "rgb(255, 255, 255)",
            "labelColor": "rgb(255, 255, 255)",
            "authenticationToken": card.apple_auth_token,
            "webServiceURL": self._get_web_service_url(),
            # Both the single-barcode key and the barcode list are emitted,
            # each carrying the Code 128 form of the card number.
            "barcode": code128_barcode,
            "barcodes": [dict(code128_barcode), qr_barcode],
        }

        # Add loyalty-specific fields
        if program.is_stamps_enabled:
            pass_data["storeCard"] = self._stamp_card_fields(card, program)
        elif program.is_points_enabled:
            pass_data["storeCard"] = self._points_card_fields(card)
        return pass_data

    @staticmethod
    def _stamp_card_fields(card: LoyaltyCard, program: LoyaltyProgram) -> dict[str, Any]:
        """Build the ``storeCard`` section shown for a stamp-based program."""
        return {
            "headerFields": [
                {
                    "key": "stamps",
                    "label": "STAMPS",
                    "value": f"{card.stamp_count}/{program.stamps_target}",
                }
            ],
            "primaryFields": [
                {
                    "key": "reward",
                    "label": "NEXT REWARD",
                    "value": program.stamps_reward_description,
                }
            ],
            "secondaryFields": [
                {
                    "key": "progress",
                    "label": "PROGRESS",
                    "value": f"{card.stamp_count} stamps collected",
                }
            ],
            "backFields": [
                {
                    "key": "cardNumber",
                    "label": "Card Number",
                    "value": card.card_number,
                },
                {
                    "key": "totalStamps",
                    "label": "Total Stamps Earned",
                    "value": str(card.total_stamps_earned),
                },
                {
                    "key": "redemptions",
                    "label": "Total Rewards",
                    "value": str(card.stamps_redeemed),
                },
            ],
        }

    @staticmethod
    def _points_card_fields(card: LoyaltyCard) -> dict[str, Any]:
        """Build the ``storeCard`` section shown for a points-based program."""
        return {
            "headerFields": [
                {
                    "key": "points",
                    "label": "POINTS",
                    "value": str(card.points_balance),
                }
            ],
            "primaryFields": [
                {
                    "key": "balance",
                    "label": "BALANCE",
                    "value": f"{card.points_balance} points",
                }
            ],
            "backFields": [
                {
                    "key": "cardNumber",
                    "label": "Card Number",
                    "value": card.card_number,
                },
                {
                    "key": "totalPoints",
                    "label": "Total Points Earned",
                    "value": str(card.total_points_earned),
                },
                {
                    "key": "redeemed",
                    "label": "Points Redeemed",
                    "value": str(card.points_redeemed),
                },
            ],
        }

    def _get_icon_bytes(self, program: LoyaltyProgram, scale: int = 1) -> bytes:
        """Generate icon image for Apple Wallet pass.

        Apple icon dimensions: 29x29 (@1x), 58x58 (@2x).
        Uses program logo if available, otherwise generates a colored square
        with the program initial.

        Args:
            program: Loyalty program providing logo URL, colour and name
            scale: Pixel-density multiplier (1 or 2)

        Returns:
            PNG-encoded icon bytes
        """
        from PIL import Image

        size = 29 * scale
        if program.logo_url:
            import httpx

            try:
                icon = self._download_logo(program.logo_url)
                icon = icon.resize((size, size), Image.LANCZOS)
                return self._png_bytes(icon)
            except (httpx.HTTPError, OSError, ValueError):
                logger.warning("Failed to fetch logo for icon, using fallback")

        # Fallback: colored square with initial
        r, g, b = self._parse_hex_color(program.card_color)
        tile = Image.new("RGBA", (size, size), (r, g, b, 255))
        self._draw_centered_initial(
            tile, program.display_name, font_size=size // 2, fill=(255, 255, 255, 255)
        )
        return self._png_bytes(tile)

    def _get_logo_bytes(self, program: LoyaltyProgram, scale: int = 1) -> bytes:
        """Generate logo image for Apple Wallet pass.

        Apple logo dimensions: 160x50 (@1x), 320x100 (@2x).
        Uses program logo if available, otherwise generates a transparent
        rectangle with the program initial drawn in the card colour.

        Args:
            program: Loyalty program providing logo URL, colour and name
            scale: Pixel-density multiplier (1 or 2)

        Returns:
            PNG-encoded logo bytes
        """
        from PIL import Image

        width, height = 160 * scale, 50 * scale
        if program.logo_url:
            import httpx

            try:
                logo = self._download_logo(program.logo_url)
                # Fit within dimensions preserving aspect ratio
                logo.thumbnail((width, height), Image.LANCZOS)
                # Center on transparent canvas
                canvas = Image.new("RGBA", (width, height), (0, 0, 0, 0))
                offset = ((width - logo.width) // 2, (height - logo.height) // 2)
                canvas.paste(logo, offset)
                return self._png_bytes(canvas)
            except (httpx.HTTPError, OSError, ValueError):
                logger.warning("Failed to fetch logo for pass logo, using fallback")

        # Fallback: transparent rectangle with the initial in the card colour
        r, g, b = self._parse_hex_color(program.card_color)
        canvas = Image.new("RGBA", (width, height), (0, 0, 0, 0))
        self._draw_centered_initial(
            canvas, program.display_name, font_size=height // 2, fill=(r, g, b, 255)
        )
        return self._png_bytes(canvas)

    @staticmethod
    def _download_logo(url: str) -> Any:
        """Fetch ``url`` and return it as an RGBA PIL image.

        Raises whatever httpx / PIL raise on failure; callers decide how to
        fall back.
        """
        import httpx
        from PIL import Image

        # NOTE(review): the URL is fetched server-side with redirects enabled;
        # confirm logo_url is validated where programs are created/edited.
        resp = httpx.get(url, timeout=10, follow_redirects=True)
        resp.raise_for_status()
        return Image.open(io.BytesIO(resp.content)).convert("RGBA")

    @staticmethod
    def _png_bytes(image: Any) -> bytes:
        """Encode a PIL image as PNG and return the raw bytes."""
        buf = io.BytesIO()
        image.save(buf, format="PNG")
        return buf.getvalue()

    @classmethod
    def _draw_centered_initial(
        cls,
        image: Any,
        display_name: str | None,
        font_size: int,
        fill: tuple[int, int, int, int],
    ) -> None:
        """Draw the upper-cased first letter of ``display_name`` centred on ``image``.

        Falls back to the letter "L" when the name is empty and to PIL's
        default font when the preferred font file cannot be loaded.
        """
        from PIL import ImageDraw, ImageFont

        draw = ImageDraw.Draw(image)
        initial = (display_name or "L")[0].upper()
        try:
            font = ImageFont.truetype(cls._FALLBACK_FONT_PATH, font_size)
        except OSError:
            font = ImageFont.load_default()

        bbox = draw.textbbox((0, 0), initial, font=font)
        text_w, text_h = bbox[2] - bbox[0], bbox[3] - bbox[1]
        width, height = image.size
        # Subtracting bbox[1] compensates for the glyph's top offset.
        position = ((width - text_w) / 2, (height - text_h) / 2 - bbox[1])
        draw.text(position, initial, fill=fill, font=font)

    @classmethod
    def _parse_hex_color(cls, hex_color: str | None) -> tuple[int, int, int]:
        """Parse a ``#RRGGBB`` / ``RRGGBB`` / ``#RGB`` colour into an (r, g, b) tuple.

        A missing or unparsable value falls back to the default card colour
        instead of raising.
        """
        for candidate in (hex_color, cls._DEFAULT_CARD_COLOR):
            digits = (candidate or "").strip().lstrip("#")
            if len(digits) == 3:
                # Expand CSS-style shorthand: "abc" -> "aabbcc".
                digits = "".join(ch * 2 for ch in digits)
            try:
                return (
                    int(digits[0:2], 16),
                    int(digits[2:4], 16),
                    int(digits[4:6], 16),
                )
            except ValueError:
                continue
        # Unreachable unless the class default itself is edited to be invalid.
        raise ValueError(f"Invalid default card color: {cls._DEFAULT_CARD_COLOR!r}")

    def _hex_to_rgb(self, hex_color: str) -> str:
        """Convert hex color to RGB format for Apple Wallet.

        Args:
            hex_color: Colour such as "#4F46E5"; None or malformed values are
                replaced by the default card colour

        Returns:
            A string of the form "rgb(r, g, b)"
        """
        r, g, b = self._parse_hex_color(hex_color)
        return f"rgb({r}, {g}, {b})"

    def _get_base_url(self) -> str:
        """Return the deployment base URL without a trailing slash.

        Reads ``settings.BASE_URL`` and uses a placeholder when the setting
        does not exist.
        """
        from app.core.config import settings

        base_url = getattr(settings, "BASE_URL", self._PLACEHOLDER_BASE_URL)
        # Strip a trailing slash so the joined paths never contain "//".
        return str(base_url).rstrip("/")

    def _get_web_service_url(self) -> str:
        """Get the base URL for Apple Web Service endpoints."""
        return f"{self._get_base_url()}/api/v1/loyalty/apple"

    def _sign_manifest(self, manifest_data: bytes) -> bytes:
        """Sign the manifest using PKCS#7.

        This requires the Apple WWDR certificate and your
        pass signing certificate and key.

        Args:
            manifest_data: Raw bytes of manifest.json

        Returns:
            DER-encoded detached PKCS#7 signature

        Raises:
            WalletIntegrationException: If a certificate/key file is missing
                or the material cannot be read or parsed
        """
        try:
            from cryptography import x509
            from cryptography.hazmat.primitives import hashes, serialization
            from cryptography.hazmat.primitives.serialization import pkcs7

            # Load certificates
            with open(config.apple_wwdr_cert_path, "rb") as f:
                wwdr_cert = x509.load_pem_x509_certificate(f.read())
            with open(config.apple_signer_cert_path, "rb") as f:
                signer_cert = x509.load_pem_x509_certificate(f.read())
            with open(config.apple_signer_key_path, "rb") as f:
                signer_key = serialization.load_pem_private_key(f.read(), password=None)

            # Create PKCS#7 signature
            return (
                pkcs7.PKCS7SignatureBuilder()
                .set_data(manifest_data)
                .add_signer(signer_cert, signer_key, hashes.SHA256())
                .add_certificate(wwdr_cert)
                .sign(serialization.Encoding.DER, [pkcs7.PKCS7Options.DetachedSignature])
            )
        except FileNotFoundError as e:
            # Must precede the OSError handler: FileNotFoundError is a subclass.
            raise WalletIntegrationException(
                "apple", f"Certificate file not found: {e}"
            ) from e
        except (OSError, ValueError) as e:
            raise WalletIntegrationException(
                "apple", f"Failed to sign manifest: {e}"
            ) from e

    # =========================================================================
    # Pass URLs
    # =========================================================================

    def get_pass_url(self, card: LoyaltyCard) -> str:
        """Get the URL to download the .pkpass file."""
        return (
            f"{self._get_base_url()}/api/v1/loyalty/passes/apple/"
            f"{card.apple_serial_number}.pkpass"
        )

    # =========================================================================
    # Device Registration (Apple Web Service)
    # =========================================================================

    def register_device(
        self,
        db: Session,
        card: LoyaltyCard,
        device_library_id: str,
        push_token: str,
    ) -> None:
        """Register a device for push notifications.

        Called by Apple when user adds pass to their wallet. An existing
        registration for the same card and device only has its push token
        refreshed. Commits the session.
        """
        # Check if already registered
        existing = (
            db.query(AppleDeviceRegistration)
            .filter(
                AppleDeviceRegistration.card_id == card.id,
                AppleDeviceRegistration.device_library_identifier == device_library_id,
            )
            .first()
        )
        if existing:
            # Update push token
            existing.push_token = push_token
        else:
            # Create new registration
            db.add(
                AppleDeviceRegistration(
                    card_id=card.id,
                    device_library_identifier=device_library_id,
                    push_token=push_token,
                )
            )
        db.commit()
        # Only the first characters of the identifier are logged.
        logger.info("Registered device %s... for card %s", device_library_id[:8], card.id)

    def unregister_device(
        self,
        db: Session,
        card: LoyaltyCard,
        device_library_id: str,
    ) -> None:
        """Unregister a device.

        Called by Apple when user removes pass from their wallet. Deletes the
        matching registration rows (if any) and commits the session.
        """
        db.query(AppleDeviceRegistration).filter(
            AppleDeviceRegistration.card_id == card.id,
            AppleDeviceRegistration.device_library_identifier == device_library_id,
        ).delete()
        db.commit()
        logger.info("Unregistered device %s... for card %s", device_library_id[:8], card.id)

    def send_push_updates(self, db: Session, card: LoyaltyCard) -> None:
        """Send push notifications to all registered devices for a card.

        This tells Apple Wallet to fetch the updated pass. A failure for one
        device is logged and does not stop the remaining pushes.
        """
        registrations = (
            db.query(AppleDeviceRegistration)
            .filter(AppleDeviceRegistration.card_id == card.id)
            .all()
        )
        # Send push notification to each device
        for registration in registrations:
            try:
                self._send_push(registration.push_token)
            except Exception as e:  # noqa: EXC003
                logger.warning(
                    "Failed to send push to device %s...: %s",
                    registration.device_library_identifier[:8],
                    e,
                )

    def _send_push(self, push_token: str) -> None:
        """Send an empty push notification to trigger pass update.

        Apple Wallet will call our web service to fetch the updated pass.
        Uses APNs HTTP/2 API with certificate-based authentication. Transport
        errors during the request are logged, not raised; a no-op when Apple
        Wallet is not configured.
        """
        if not self.is_configured:
            logger.debug("Apple Wallet not configured, skipping push")
            return

        import ssl

        import httpx

        # APNs endpoint (use sandbox for dev, production for prod)
        from app.core.config import is_production

        apns_host = (
            "https://api.push.apple.com"
            if is_production()
            else "https://api.sandbox.push.apple.com"
        )
        url = f"{apns_host}/3/device/{push_token}"

        # Create SSL context with client certificate
        ssl_context = ssl.create_default_context()
        ssl_context.load_cert_chain(
            certfile=config.apple_signer_cert_path,
            keyfile=config.apple_signer_key_path,
        )

        # APNs requires empty payload for pass updates
        headers = {
            "apns-topic": config.apple_pass_type_id,
            "apns-push-type": "background",
            "apns-priority": "5",
        }

        token_prefix = push_token[:8]
        try:
            with httpx.Client(http2=True, verify=ssl_context, timeout=10.0) as client:
                response = client.post(url, headers=headers, content=b"{}")
        except (httpx.HTTPError, ssl.SSLError, OSError) as exc:
            logger.error("APNs push error for token %s...: %s", token_prefix, exc)
            return

        if response.status_code == 200:
            logger.debug("APNs push sent to token %s...", token_prefix)
        elif response.status_code == 410:
            logger.info(
                "APNs token %s... is no longer valid (device unregistered)",
                token_prefix,
            )
        else:
            logger.warning(
                "APNs push failed for token %s...: %s %s",
                token_prefix,
                response.status_code,
                response.text,
            )
# Singleton instance: created once at import time. The class defines no
# __init__, so this performs no work beyond allocating the object.
apple_wallet_service = AppleWalletService()