Files
orion/app/modules/loyalty/services/apple_wallet_service.py
Samir Boulahtit 8c8975239a feat(loyalty): fix Google Wallet integration and improve enrollment flow
- Fix Google Wallet class creation: add required issuerName field (merchant name),
  programLogo with default logo fallback, hexBackgroundColor default
- Add default loyalty logo assets (200px + 512px) for programs without custom logos
- Smart retry: skip retries on 400/401/403/404 client errors (not transient)
- Fix enrollment success page: use sessionStorage for wallet URLs instead of
  authenticated API call (self-enrolled customers have no session)
- Hide wallet section on success page when no wallet URLs available
- Wire up T&C modal on enrollment page with program.terms_text
- Add startup validation for Google/Apple Wallet configs in lifespan
- Add admin wallet status dashboard endpoint and UI (moved to service layer)
- Fix Apple Wallet push notifications with real APNs HTTP/2 implementation
- Fix docs: correct enrollment URLs (port, path segments, /v1 prefix)
- Fix test assertion: !loyalty-enroll! → !enrollment!

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 17:32:55 +01:00

728 lines
25 KiB
Python

# app/modules/loyalty/services/apple_wallet_service.py
"""
Apple Wallet service.
Handles Apple Wallet integration including:
- Generating .pkpass files
- Apple Web Service for device registration
- Push notifications for pass updates
"""
import hashlib
import io
import json
import logging
import zipfile
from typing import Any
from sqlalchemy.orm import Session
from app.modules.loyalty.config import config
from app.modules.loyalty.exceptions import (
ApplePassGenerationException,
AppleWalletNotConfiguredException,
DeviceRegistrationException,
InvalidAppleAuthTokenException,
WalletIntegrationException,
)
from app.modules.loyalty.models import (
AppleDeviceRegistration,
LoyaltyCard,
LoyaltyProgram,
)
logger = logging.getLogger(__name__)
class AppleWalletService:
"""Service for Apple Wallet integration."""
@property
def is_configured(self) -> bool:
    """Report whether every Apple Wallet setting is present.

    Returns:
        True only when the pass type id, the team id and the three
        certificate/key paths read from ``config`` are all truthy.
    """
    required_settings = (
        config.apple_pass_type_id,
        config.apple_team_id,
        config.apple_wwdr_cert_path,
        config.apple_signer_cert_path,
        config.apple_signer_key_path,
    )
    return all(required_settings)
def validate_config(self) -> dict[str, Any]:
    """Summarise the Apple Wallet configuration and check the credential files.

    Returns:
        Dict with the keys ``configured``, ``pass_type_id``, ``team_id``,
        ``credentials_valid`` and ``errors``. ``errors`` holds one message
        per credential file that does not exist on disk; the file checks
        are skipped entirely when the service is not configured.
    """
    import os

    report: dict[str, Any] = {
        "configured": self.is_configured,
        "pass_type_id": config.apple_pass_type_id,
        "team_id": config.apple_team_id,
        "credentials_valid": False,
        "errors": [],
    }
    if not self.is_configured:
        return report

    credential_files = (
        ("WWDR certificate", config.apple_wwdr_cert_path),
        ("Signer certificate", config.apple_signer_cert_path),
        ("Signer key", config.apple_signer_key_path),
    )
    report["errors"].extend(
        f"{label} not found: {path}"
        for label, path in credential_files
        if not os.path.isfile(path)
    )
    # Credentials count as valid only when every file was found.
    if not report["errors"]:
        report["credentials_valid"] = True
    return report
# =========================================================================
# Auth Verification
# =========================================================================
def verify_auth_token(self, card: LoyaltyCard, authorization: str | None) -> None:
    """Verify the Apple Wallet authorization token for a card.

    Args:
        card: Loyalty card whose ``apple_auth_token`` is the expected secret.
        authorization: Authorization header value (e.g. "ApplePass <token>").

    Raises:
        InvalidAppleAuthTokenException: If the header is missing, does not
            use the ``ApplePass`` scheme, or carries a token that does not
            match the one stored on the card.
    """
    import hmac

    scheme = "ApplePass "
    presented = None
    if authorization and authorization.startswith(scheme):
        presented = authorization[len(scheme):]

    expected = card.apple_auth_token
    # A missing header token, or a card without a stored string token, can
    # never authenticate.
    if not presented or not expected or not isinstance(expected, str):
        raise InvalidAppleAuthTokenException()

    # Constant-time comparison: the header is attacker-controlled, so the
    # time taken must not reveal how much of the token matched. Comparing
    # bytes avoids compare_digest's ASCII-only restriction on str inputs.
    if not hmac.compare_digest(presented.encode("utf-8"), expected.encode("utf-8")):
        raise InvalidAppleAuthTokenException()
def generate_pass_safe(self, db: Session, card: LoyaltyCard) -> bytes:
    """Generate an Apple Wallet pass, converting LoyaltyException failures.

    Any ``LoyaltyException`` raised by ``generate_pass`` is logged and
    re-raised as ``ApplePassGenerationException``; the original exception
    is kept as the explicit cause so the root failure stays visible in
    tracebacks.

    Args:
        db: Database session
        card: Loyalty card

    Returns:
        Bytes of the .pkpass file

    Raises:
        ApplePassGenerationException: If pass generation fails
    """
    from app.modules.loyalty.exceptions import LoyaltyException

    try:
        return self.generate_pass(db, card)
    except LoyaltyException as e:
        logger.error(f"Failed to generate Apple pass for card {card.id}: {e}")
        raise ApplePassGenerationException(card.id) from e
def get_device_registrations(self, db: Session, device_id: str) -> list:
    """Fetch every registration stored for one device library identifier.

    Args:
        db: Database session
        device_id: Device library identifier

    Returns:
        List of AppleDeviceRegistration objects (empty when none match)
    """
    same_device = AppleDeviceRegistration.device_library_identifier == device_id
    matching = db.query(AppleDeviceRegistration).filter(same_device)
    return matching.all()
def get_updated_cards_for_device(
    self,
    db: Session,
    device_id: str,
    updated_since: str | None = None,
) -> list[LoyaltyCard] | None:
    """Get cards registered to a device, optionally filtered by update time.

    Args:
        db: Database session
        device_id: Device library identifier
        updated_since: ISO timestamp; when given and parseable, only cards
            whose ``updated_at`` is later are returned. A trailing "Z" is
            accepted as UTC.

    Returns:
        List of LoyaltyCard objects, or None if the device has no
        registrations or no card matches.
    """
    from datetime import datetime

    registrations = self.get_device_registrations(db, device_id)
    if not registrations:
        return None

    card_ids = [r.card_id for r in registrations]
    query = db.query(LoyaltyCard).filter(LoyaltyCard.id.in_(card_ids))

    if updated_since:
        try:
            since = datetime.fromisoformat(updated_since.replace("Z", "+00:00"))
        except ValueError:
            # Best effort: an unparseable timestamp falls back to returning
            # every registered card, but leave a trace instead of hiding it.
            logger.warning(
                "Ignoring unparseable updated_since value %r for device %s...",
                updated_since,
                device_id[:8],
            )
        else:
            query = query.filter(LoyaltyCard.updated_at > since)

    cards = query.all()
    return cards or None
def register_device_safe(
    self,
    db: Session,
    card: LoyaltyCard,
    device_id: str,
    push_token: str,
) -> None:
    """Register a device, wrapping exceptions into DeviceRegistrationException.

    The original exception is attached as the explicit cause of the
    raised ``DeviceRegistrationException``.

    Args:
        db: Database session
        card: Loyalty card
        device_id: Device library identifier
        push_token: Push token

    Raises:
        DeviceRegistrationException: If registration fails
    """
    try:
        self.register_device(db, card, device_id, push_token)
    except Exception as e:  # noqa: EXC003
        logger.error(f"Failed to register device: {e}")
        raise DeviceRegistrationException(device_id, "register") from e
def unregister_device_safe(
    self,
    db: Session,
    card: LoyaltyCard,
    device_id: str,
) -> None:
    """Unregister a device, wrapping exceptions into DeviceRegistrationException.

    The original exception is attached as the explicit cause of the
    raised ``DeviceRegistrationException``.

    Args:
        db: Database session
        card: Loyalty card
        device_id: Device library identifier

    Raises:
        DeviceRegistrationException: If unregistration fails
    """
    try:
        self.unregister_device(db, card, device_id)
    except Exception as e:  # noqa: EXC003
        logger.error(f"Failed to unregister device: {e}")
        raise DeviceRegistrationException(device_id, "unregister") from e
# =========================================================================
# Pass Generation
# =========================================================================
def generate_pass(self, db: Session, card: LoyaltyCard) -> bytes:
    """Generate a .pkpass file for a loyalty card.

    The .pkpass is a ZIP file containing:
    - pass.json: Pass configuration
    - icon.png, icon@2x.png: App icon
    - logo.png, logo@2x.png: Logo on pass
    - manifest.json: SHA-1 hashes of all files
    - signature: PKCS#7 signature

    Side effect: when the card has no ``apple_serial_number`` yet, one is
    assigned and committed before the pass is built.

    Args:
        db: Database session
        card: Loyalty card

    Returns:
        Bytes of the .pkpass file

    Raises:
        AppleWalletNotConfiguredException: If Apple Wallet is not configured
        WalletIntegrationException: If signing the manifest fails
    """
    if not self.is_configured:
        raise AppleWalletNotConfiguredException()

    program = card.program

    # Ensure serial number is set
    if not card.apple_serial_number:
        card.apple_serial_number = f"card_{card.id}_{card.qr_code_data[:8]}"
        db.commit()

    # pass.json plus the icon and logo images, in archive order
    pass_files: dict[str, bytes] = {
        "pass.json": json.dumps(self._build_pass_json(card, program)).encode("utf-8"),
        "icon.png": self._get_icon_bytes(program),
        "icon@2x.png": self._get_icon_bytes(program, scale=2),
        "logo.png": self._get_logo_bytes(program),
        "logo@2x.png": self._get_logo_bytes(program, scale=2),
    }

    # The manifest covers every file added so far (not itself or the signature)
    manifest = {
        filename: hashlib.sha1(content).hexdigest()  # noqa: SEC041
        for filename, content in pass_files.items()
    }
    pass_files["manifest.json"] = json.dumps(manifest).encode("utf-8")

    # Sign the manifest
    try:
        pass_files["signature"] = self._sign_manifest(pass_files["manifest.json"])
    except (OSError, ValueError) as e:
        logger.error(f"Failed to sign pass: {e}")
        raise WalletIntegrationException("apple", f"Failed to sign pass: {e}") from e

    # Create ZIP file
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
        for filename, content in pass_files.items():
            zf.writestr(filename, content)
    return buffer.getvalue()
def _build_pass_json(self, card: LoyaltyCard, program: LoyaltyProgram) -> dict[str, Any]:
    """Assemble the pass.json payload for a loyalty card.

    The payload always carries the identifiers, colours, auth token, web
    service URL and barcodes. A ``storeCard`` section is appended for a
    stamps program, otherwise for a points program; when neither flag is
    set on the program no ``storeCard`` key is added.

    Args:
        card: Loyalty card supplying numbers, balances and tokens
        program: Loyalty program supplying name, colour and reward settings

    Returns:
        Dict ready to be serialised as pass.json
    """
    white = "rgb(255, 255, 255)"

    def code128() -> dict[str, Any]:
        # Card number without dashes as the message, with dashes as alt text.
        return {
            "message": card.card_number.replace("-", ""),
            "format": "PKBarcodeFormatCode128",
            "messageEncoding": "iso-8859-1",
            "altText": card.card_number,
        }

    def field(key: str, label: str, value: Any) -> dict[str, Any]:
        return {"key": key, "label": label, "value": value}

    payload: dict[str, Any] = {
        "formatVersion": 1,
        "passTypeIdentifier": config.apple_pass_type_id,
        "serialNumber": card.apple_serial_number,
        "teamIdentifier": config.apple_team_id,
        "organizationName": program.display_name,
        "description": f"{program.display_name} Loyalty Card",
        "backgroundColor": self._hex_to_rgb(program.card_color),
        "foregroundColor": white,
        "labelColor": white,
        "authenticationToken": card.apple_auth_token,
        "webServiceURL": self._get_web_service_url(),
        "barcode": code128(),
        "barcodes": [
            code128(),
            {
                "message": card.qr_code_data,
                "format": "PKBarcodeFormatQR",
                "messageEncoding": "iso-8859-1",
            },
        ],
    }

    # Loyalty-specific section: stamps take precedence over points.
    if program.is_stamps_enabled:
        payload["storeCard"] = {
            "headerFields": [
                field("stamps", "STAMPS", f"{card.stamp_count}/{program.stamps_target}"),
            ],
            "primaryFields": [
                field("reward", "NEXT REWARD", program.stamps_reward_description),
            ],
            "secondaryFields": [
                field("progress", "PROGRESS", f"{card.stamp_count} stamps collected"),
            ],
            "backFields": [
                field("cardNumber", "Card Number", card.card_number),
                field("totalStamps", "Total Stamps Earned", str(card.total_stamps_earned)),
                field("redemptions", "Total Rewards", str(card.stamps_redeemed)),
            ],
        }
    elif program.is_points_enabled:
        payload["storeCard"] = {
            "headerFields": [
                field("points", "POINTS", str(card.points_balance)),
            ],
            "primaryFields": [
                field("balance", "BALANCE", f"{card.points_balance} points"),
            ],
            "backFields": [
                field("cardNumber", "Card Number", card.card_number),
                field("totalPoints", "Total Points Earned", str(card.total_points_earned)),
                field("redeemed", "Points Redeemed", str(card.points_redeemed)),
            ],
        }
    return payload
def _get_icon_bytes(self, program: LoyaltyProgram, scale: int = 1) -> bytes:
    """Render the square icon image for an Apple Wallet pass as PNG bytes.

    Apple icon dimensions: 29x29 (@1x), 58x58 (@2x).
    When the program has a ``logo_url`` it is downloaded and resized to the
    icon size; if that fails for any reason, or no URL is set, a square
    filled with the card colour and the program initial in white is drawn.

    Args:
        program: Loyalty program providing logo URL, colour and name
        scale: Size multiplier (1 for @1x, 2 for @2x)

    Returns:
        PNG-encoded image bytes
    """
    from PIL import Image, ImageDraw, ImageFont

    side = 29 * scale

    def to_png(image) -> bytes:
        out = io.BytesIO()
        image.save(out, format="PNG")
        return out.getvalue()

    if program.logo_url:
        try:
            import httpx

            response = httpx.get(program.logo_url, timeout=10, follow_redirects=True)
            response.raise_for_status()
            logo = Image.open(io.BytesIO(response.content)).convert("RGBA")
            return to_png(logo.resize((side, side), Image.LANCZOS))
        except Exception:
            logger.warning("Failed to fetch logo for icon, using fallback")

    # Fallback: colored square with initial
    color = (program.card_color or "#4F46E5").lstrip("#")
    red, green, blue = (int(color[i:i + 2], 16) for i in (0, 2, 4))
    icon = Image.new("RGBA", (side, side), (red, green, blue, 255))
    pen = ImageDraw.Draw(icon)
    letter = (program.display_name or "L")[0].upper()
    try:
        font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", side // 2)
    except OSError:
        # The DejaVu font file could not be loaded; use Pillow's built-in font.
        font = ImageFont.load_default()
    left, top, right, bottom = pen.textbbox((0, 0), letter, font=font)
    text_w, text_h = right - left, bottom - top
    position = ((side - text_w) / 2, (side - text_h) / 2 - top)
    pen.text(position, letter, fill=(255, 255, 255, 255), font=font)
    return to_png(icon)
def _get_logo_bytes(self, program: LoyaltyProgram, scale: int = 1) -> bytes:
    """Render the logo image for an Apple Wallet pass as PNG bytes.

    Apple logo dimensions: 160x50 (@1x), 320x100 (@2x).
    When the program has a ``logo_url`` it is downloaded, shrunk to fit the
    logo box with its aspect ratio preserved and centred on a transparent
    canvas; if that fails for any reason, or no URL is set, the program
    initial is drawn in the card colour on a transparent canvas.

    Args:
        program: Loyalty program providing logo URL, colour and name
        scale: Size multiplier (1 for @1x, 2 for @2x)

    Returns:
        PNG-encoded image bytes
    """
    from PIL import Image, ImageDraw, ImageFont

    box_w, box_h = 160 * scale, 50 * scale

    def to_png(image) -> bytes:
        out = io.BytesIO()
        image.save(out, format="PNG")
        return out.getvalue()

    if program.logo_url:
        try:
            import httpx

            response = httpx.get(program.logo_url, timeout=10, follow_redirects=True)
            response.raise_for_status()
            logo = Image.open(io.BytesIO(response.content)).convert("RGBA")
            # Fit within dimensions preserving aspect ratio
            logo.thumbnail((box_w, box_h), Image.LANCZOS)
            # Center on transparent canvas
            framed = Image.new("RGBA", (box_w, box_h), (0, 0, 0, 0))
            offset = ((box_w - logo.width) // 2, (box_h - logo.height) // 2)
            framed.paste(logo, offset)
            return to_png(framed)
        except Exception:
            logger.warning("Failed to fetch logo for pass logo, using fallback")

    # Fallback: program initial in the card colour on a transparent canvas
    color = (program.card_color or "#4F46E5").lstrip("#")
    red, green, blue = (int(color[i:i + 2], 16) for i in (0, 2, 4))
    fallback = Image.new("RGBA", (box_w, box_h), (0, 0, 0, 0))
    pen = ImageDraw.Draw(fallback)
    letter = (program.display_name or "L")[0].upper()
    try:
        font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", box_h // 2)
    except OSError:
        # The DejaVu font file could not be loaded; use Pillow's built-in font.
        font = ImageFont.load_default()
    left, top, right, bottom = pen.textbbox((0, 0), letter, font=font)
    text_w, text_h = right - left, bottom - top
    # Draw initial centered
    position = ((box_w - text_w) / 2, (box_h - text_h) / 2 - top)
    pen.text(position, letter, fill=(red, green, blue, 255), font=font)
    return to_png(fallback)
def _hex_to_rgb(self, hex_color: str) -> str:
"""Convert hex color to RGB format for Apple Wallet."""
hex_color = hex_color.lstrip("#")
r = int(hex_color[0:2], 16)
g = int(hex_color[2:4], 16)
b = int(hex_color[4:6], 16)
return f"rgb({r}, {g}, {b})"
def _get_web_service_url(self) -> str:
    """Build the base URL of the Apple Web Service endpoints.

    Returns:
        ``settings.BASE_URL`` followed by ``/api/v1/loyalty/apple``. When
        the settings object has no ``BASE_URL`` attribute the placeholder
        host ``https://api.example.com`` is used instead.
    """
    from app.core.config import settings

    # This should be configured based on your deployment; the default
    # below is only a placeholder.
    api_root = getattr(settings, "BASE_URL", "https://api.example.com")
    return f"{api_root}/api/v1/loyalty/apple"
def _sign_manifest(self, manifest_data: bytes) -> bytes:
    """Sign the manifest using PKCS#7.

    This requires the Apple WWDR certificate and your pass signing
    certificate and key (all PEM files, the key without a password), read
    from the paths in ``config``.

    Args:
        manifest_data: Serialized manifest.json bytes to sign

    Returns:
        DER-encoded detached PKCS#7 signature

    Raises:
        WalletIntegrationException: If a certificate/key file is missing or
            unreadable, cannot be parsed, or the signing step fails. The
            underlying exception is attached as the cause.
    """
    try:
        from cryptography import x509
        from cryptography.hazmat.primitives import hashes, serialization
        from cryptography.hazmat.primitives.serialization import pkcs7

        # Load certificates
        with open(config.apple_wwdr_cert_path, "rb") as f:
            wwdr_cert = x509.load_pem_x509_certificate(f.read())
        with open(config.apple_signer_cert_path, "rb") as f:
            signer_cert = x509.load_pem_x509_certificate(f.read())
        with open(config.apple_signer_key_path, "rb") as f:
            signer_key = serialization.load_pem_private_key(f.read(), password=None)

        # Create PKCS#7 signature
        return (
            pkcs7.PKCS7SignatureBuilder()
            .set_data(manifest_data)
            .add_signer(signer_cert, signer_key, hashes.SHA256())
            .add_certificate(wwdr_cert)
            .sign(serialization.Encoding.DER, [pkcs7.PKCS7Options.DetachedSignature])
        )
    except FileNotFoundError as e:
        raise WalletIntegrationException("apple", f"Certificate file not found: {e}") from e
    except (OSError, ValueError, TypeError) as e:
        # TypeError is what load_pem_private_key raises for an encrypted key
        # when no password is supplied; wrap it like the other failures
        # instead of letting it escape unhandled.
        raise WalletIntegrationException("apple", f"Failed to sign manifest: {e}") from e
# =========================================================================
# Pass URLs
# =========================================================================
def get_pass_url(self, card: LoyaltyCard) -> str:
    """Build the download URL of the card's .pkpass file.

    Args:
        card: Loyalty card whose ``apple_serial_number`` names the pass

    Returns:
        ``settings.BASE_URL`` (or the placeholder ``https://api.example.com``
        when that attribute is absent) followed by the pass path.
    """
    from app.core.config import settings

    api_root = getattr(settings, "BASE_URL", "https://api.example.com")
    return f"{api_root}/api/v1/loyalty/passes/apple/{card.apple_serial_number}.pkpass"
# =========================================================================
# Device Registration (Apple Web Service)
# =========================================================================
def register_device(
    self,
    db: Session,
    card: LoyaltyCard,
    device_library_id: str,
    push_token: str,
) -> None:
    """Register a device for push notifications.

    Called by Apple when user adds pass to their wallet. If the card/device
    pair is already stored, only its push token is refreshed; otherwise a
    new registration row is added. The change is committed either way.

    Args:
        db: Database session
        card: Loyalty card the pass belongs to
        device_library_id: Device library identifier
        push_token: Push token to store for the device
    """
    lookup = db.query(AppleDeviceRegistration).filter(
        AppleDeviceRegistration.card_id == card.id,
        AppleDeviceRegistration.device_library_identifier == device_library_id,
    )
    registration = lookup.first()

    if not registration:
        # First time this device registers for the card
        db.add(
            AppleDeviceRegistration(
                card_id=card.id,
                device_library_identifier=device_library_id,
                push_token=push_token,
            )
        )
    else:
        # Known device: refresh its push token
        registration.push_token = push_token

    db.commit()
    logger.info(f"Registered device {device_library_id[:8]}... for card {card.id}")
def unregister_device(
    self,
    db: Session,
    card: LoyaltyCard,
    device_library_id: str,
) -> None:
    """Remove the registration(s) linking a device to a card.

    Called by Apple when user removes pass from their wallet. The delete is
    committed immediately.

    Args:
        db: Database session
        card: Loyalty card the pass belongs to
        device_library_id: Device library identifier
    """
    matching = db.query(AppleDeviceRegistration).filter(
        AppleDeviceRegistration.card_id == card.id,
        AppleDeviceRegistration.device_library_identifier == device_library_id,
    )
    matching.delete()
    db.commit()
    logger.info(f"Unregistered device {device_library_id[:8]}... for card {card.id}")
def send_push_updates(self, db: Session, card: LoyaltyCard) -> None:
    """Push an update notification to every device registered for a card.

    This tells Apple Wallet to fetch the updated pass. A failure for one
    device is logged as a warning and does not stop the remaining pushes.

    Args:
        db: Database session
        card: Loyalty card whose registered devices are notified
    """
    for_this_card = AppleDeviceRegistration.card_id == card.id
    registrations = db.query(AppleDeviceRegistration).filter(for_this_card).all()

    # Nothing happens when the card has no registered devices.
    for registration in registrations:
        try:
            self._send_push(registration.push_token)
        except Exception as e:  # noqa: EXC003
            logger.warning(
                f"Failed to send push to device {registration.device_library_identifier[:8]}...: {e}"
            )
def _send_push(self, push_token: str) -> None:
    """Send an empty push notification to trigger a pass update.

    Apple Wallet will call our web service to fetch the updated pass.
    Uses APNs HTTP/2 API with certificate-based authentication: the signer
    certificate and key from ``config`` are loaded as the client
    certificate. The production APNs host is used when ``is_production()``
    is true, the sandbox host otherwise. Does nothing (beyond a debug log)
    when Apple Wallet is not configured.

    Errors raised while sending are logged and swallowed; errors raised
    while loading the client certificate propagate to the caller.

    Args:
        push_token: Device push token, used as the last URL path segment
    """
    if not self.is_configured:
        logger.debug("Apple Wallet not configured, skipping push")
        return

    import ssl

    import httpx

    # APNs endpoint (use sandbox for dev, production for prod)
    from app.core.config import is_production

    apns_host = (
        "https://api.push.apple.com"
        if is_production()
        else "https://api.sandbox.push.apple.com"
    )
    url = f"{apns_host}/3/device/{push_token}"

    # TLS context presenting the signer certificate as client certificate
    tls_context = ssl.create_default_context()
    tls_context.load_cert_chain(
        certfile=config.apple_signer_cert_path,
        keyfile=config.apple_signer_key_path,
    )

    # APNs requires empty payload for pass updates
    apns_headers = {
        "apns-topic": config.apple_pass_type_id,
        "apns-push-type": "background",
        "apns-priority": "5",
    }

    try:
        with httpx.Client(http2=True, verify=tls_context, timeout=10.0) as client:
            response = client.post(url, headers=apns_headers, content=b"{}")
            status = response.status_code
            if status == 200:
                logger.debug("APNs push sent to token %s...", push_token[:8])
            elif status == 410:
                logger.info(
                    "APNs token %s... is no longer valid (device unregistered)",
                    push_token[:8],
                )
            else:
                logger.warning(
                    "APNs push failed for token %s...: %s %s",
                    push_token[:8],
                    status,
                    response.text,
                )
    except Exception as exc:  # noqa: BLE001
        logger.error("APNs push error for token %s...: %s", push_token[:8], exc)
# Singleton instance: created once when this module is imported and bound to
# the module-level name ``apple_wallet_service``.
apple_wallet_service = AppleWalletService()