Files
orion/app/modules/loyalty/services/apple_wallet_service.py
Samir Boulahtit 481deaa67d refactor: fix all 177 architecture validator warnings
- Replace 153 broad `except Exception` with specific types (SQLAlchemyError,
  TemplateError, OSError, SMTPException, ClientError, etc.) across 37 services
- Break catalog↔inventory circular dependency (IMPORT-004)
- Create 19 skeleton test files for MOD-024 coverage
- Exclude aggregator services from MOD-024 (false positives)
- Update test mocks to match narrowed exception types

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 11:59:44 +01:00

549 lines
19 KiB
Python

# app/modules/loyalty/services/apple_wallet_service.py
"""
Apple Wallet service.
Handles Apple Wallet integration including:
- Generating .pkpass files
- Apple Web Service for device registration
- Push notifications for pass updates
"""
import hashlib
import io
import json
import logging
import zipfile
from typing import Any
from sqlalchemy.orm import Session
from app.modules.loyalty.config import config
from app.modules.loyalty.exceptions import (
ApplePassGenerationException,
AppleWalletNotConfiguredException,
DeviceRegistrationException,
InvalidAppleAuthTokenException,
WalletIntegrationException,
)
from app.modules.loyalty.models import (
AppleDeviceRegistration,
LoyaltyCard,
LoyaltyProgram,
)
logger = logging.getLogger(__name__)
class AppleWalletService:
"""Service for Apple Wallet integration."""
@property
def is_configured(self) -> bool:
"""Check if Apple Wallet is configured."""
return bool(
config.apple_pass_type_id
and config.apple_team_id
and config.apple_wwdr_cert_path
and config.apple_signer_cert_path
and config.apple_signer_key_path
)
# =========================================================================
# Auth Verification
# =========================================================================
def verify_auth_token(self, card: LoyaltyCard, authorization: str | None) -> None:
"""
Verify the Apple Wallet authorization token for a card.
Args:
card: Loyalty card
authorization: Authorization header value (e.g. "ApplePass <token>")
Raises:
InvalidAppleAuthTokenException: If token is missing or invalid
"""
auth_token = None
if authorization and authorization.startswith("ApplePass "):
auth_token = authorization.split(" ", 1)[1]
if not auth_token or auth_token != card.apple_auth_token:
raise InvalidAppleAuthTokenException()
def generate_pass_safe(self, db: Session, card: LoyaltyCard) -> bytes:
"""
Generate an Apple Wallet pass, wrapping LoyaltyException into
ApplePassGenerationException.
Args:
db: Database session
card: Loyalty card
Returns:
Bytes of the .pkpass file
Raises:
ApplePassGenerationException: If pass generation fails
"""
from app.modules.loyalty.exceptions import LoyaltyException
try:
return self.generate_pass(db, card)
except LoyaltyException as e:
logger.error(f"Failed to generate Apple pass for card {card.id}: {e}")
raise ApplePassGenerationException(card.id)
def get_device_registrations(self, db: Session, device_id: str) -> list:
"""
Get all device registrations for a device library identifier.
Args:
db: Database session
device_id: Device library identifier
Returns:
List of AppleDeviceRegistration objects
"""
return (
db.query(AppleDeviceRegistration)
.filter(AppleDeviceRegistration.device_library_identifier == device_id)
.all()
)
def get_updated_cards_for_device(
self,
db: Session,
device_id: str,
updated_since: str | None = None,
) -> list[LoyaltyCard] | None:
"""
Get cards registered to a device, optionally filtered by update time.
Args:
db: Database session
device_id: Device library identifier
updated_since: ISO timestamp to filter by
Returns:
List of LoyaltyCard objects, or None if no registrations found
"""
from datetime import datetime
registrations = self.get_device_registrations(db, device_id)
if not registrations:
return None
card_ids = [r.card_id for r in registrations]
query = db.query(LoyaltyCard).filter(LoyaltyCard.id.in_(card_ids))
if updated_since:
try:
since = datetime.fromisoformat(updated_since.replace("Z", "+00:00"))
query = query.filter(LoyaltyCard.updated_at > since)
except ValueError:
pass
cards = query.all()
return cards if cards else None
def register_device_safe(
self,
db: Session,
card: LoyaltyCard,
device_id: str,
push_token: str,
) -> None:
"""
Register a device, wrapping exceptions into DeviceRegistrationException.
Args:
db: Database session
card: Loyalty card
device_id: Device library identifier
push_token: Push token
Raises:
DeviceRegistrationException: If registration fails
"""
try:
self.register_device(db, card, device_id, push_token)
except Exception as e: # noqa: EXC-003
logger.error(f"Failed to register device: {e}")
raise DeviceRegistrationException(device_id, "register")
def unregister_device_safe(
self,
db: Session,
card: LoyaltyCard,
device_id: str,
) -> None:
"""
Unregister a device, wrapping exceptions into DeviceRegistrationException.
Args:
db: Database session
card: Loyalty card
device_id: Device library identifier
Raises:
DeviceRegistrationException: If unregistration fails
"""
try:
self.unregister_device(db, card, device_id)
except Exception as e: # noqa: EXC-003
logger.error(f"Failed to unregister device: {e}")
raise DeviceRegistrationException(device_id, "unregister")
# =========================================================================
# Pass Generation
# =========================================================================
def generate_pass(self, db: Session, card: LoyaltyCard) -> bytes:
"""
Generate a .pkpass file for a loyalty card.
The .pkpass is a ZIP file containing:
- pass.json: Pass configuration
- icon.png, icon@2x.png: App icon
- logo.png, logo@2x.png: Logo on pass
- manifest.json: SHA-1 hashes of all files
- signature: PKCS#7 signature
Args:
db: Database session
card: Loyalty card
Returns:
Bytes of the .pkpass file
"""
if not self.is_configured:
raise AppleWalletNotConfiguredException()
program = card.program
# Ensure serial number is set
if not card.apple_serial_number:
card.apple_serial_number = f"card_{card.id}_{card.qr_code_data[:8]}"
db.commit()
# Build pass.json
pass_data = self._build_pass_json(card, program)
# Create the pass package
pass_files = {
"pass.json": json.dumps(pass_data).encode("utf-8"),
}
# Add placeholder images (in production, these would be actual images)
# For now, we'll skip images and use the pass.json only
# pass_files["icon.png"] = self._get_icon_bytes(program)
# pass_files["icon@2x.png"] = self._get_icon_bytes(program, scale=2)
# pass_files["logo.png"] = self._get_logo_bytes(program)
# pass_files["logo@2x.png"] = self._get_logo_bytes(program, scale=2)
# Create manifest
manifest = {}
for filename, content in pass_files.items():
manifest[filename] = hashlib.sha1(content).hexdigest()
pass_files["manifest.json"] = json.dumps(manifest).encode("utf-8")
# Sign the manifest
try:
signature = self._sign_manifest(pass_files["manifest.json"])
pass_files["signature"] = signature
except (OSError, ValueError) as e:
logger.error(f"Failed to sign pass: {e}")
raise WalletIntegrationException("apple", f"Failed to sign pass: {e}")
# Create ZIP file
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
for filename, content in pass_files.items():
zf.writestr(filename, content)
return buffer.getvalue()
def _build_pass_json(self, card: LoyaltyCard, program: LoyaltyProgram) -> dict[str, Any]:
"""Build the pass.json structure for a loyalty card."""
pass_data = {
"formatVersion": 1,
"passTypeIdentifier": config.apple_pass_type_id,
"serialNumber": card.apple_serial_number,
"teamIdentifier": config.apple_team_id,
"organizationName": program.display_name,
"description": f"{program.display_name} Loyalty Card",
"backgroundColor": self._hex_to_rgb(program.card_color),
"foregroundColor": "rgb(255, 255, 255)",
"labelColor": "rgb(255, 255, 255)",
"authenticationToken": card.apple_auth_token,
"webServiceURL": self._get_web_service_url(),
"barcode": {
"message": card.card_number.replace("-", ""),
"format": "PKBarcodeFormatCode128",
"messageEncoding": "iso-8859-1",
"altText": card.card_number,
},
"barcodes": [
{
"message": card.card_number.replace("-", ""),
"format": "PKBarcodeFormatCode128",
"messageEncoding": "iso-8859-1",
"altText": card.card_number,
},
{
"message": card.qr_code_data,
"format": "PKBarcodeFormatQR",
"messageEncoding": "iso-8859-1",
},
],
}
# Add loyalty-specific fields
if program.is_stamps_enabled:
pass_data["storeCard"] = {
"headerFields": [
{
"key": "stamps",
"label": "STAMPS",
"value": f"{card.stamp_count}/{program.stamps_target}",
}
],
"primaryFields": [
{
"key": "reward",
"label": "NEXT REWARD",
"value": program.stamps_reward_description,
}
],
"secondaryFields": [
{
"key": "progress",
"label": "PROGRESS",
"value": f"{card.stamp_count} stamps collected",
}
],
"backFields": [
{
"key": "cardNumber",
"label": "Card Number",
"value": card.card_number,
},
{
"key": "totalStamps",
"label": "Total Stamps Earned",
"value": str(card.total_stamps_earned),
},
{
"key": "redemptions",
"label": "Total Rewards",
"value": str(card.stamps_redeemed),
},
],
}
elif program.is_points_enabled:
pass_data["storeCard"] = {
"headerFields": [
{
"key": "points",
"label": "POINTS",
"value": str(card.points_balance),
}
],
"primaryFields": [
{
"key": "balance",
"label": "BALANCE",
"value": f"{card.points_balance} points",
}
],
"backFields": [
{
"key": "cardNumber",
"label": "Card Number",
"value": card.card_number,
},
{
"key": "totalPoints",
"label": "Total Points Earned",
"value": str(card.total_points_earned),
},
{
"key": "redeemed",
"label": "Points Redeemed",
"value": str(card.points_redeemed),
},
],
}
return pass_data
def _hex_to_rgb(self, hex_color: str) -> str:
"""Convert hex color to RGB format for Apple Wallet."""
hex_color = hex_color.lstrip("#")
r = int(hex_color[0:2], 16)
g = int(hex_color[2:4], 16)
b = int(hex_color[4:6], 16)
return f"rgb({r}, {g}, {b})"
def _get_web_service_url(self) -> str:
"""Get the base URL for Apple Web Service endpoints."""
# This should be configured based on your deployment
# For now, return a placeholder
from app.core.config import settings
base_url = getattr(settings, "BASE_URL", "https://api.example.com")
return f"{base_url}/api/v1/loyalty/apple"
def _sign_manifest(self, manifest_data: bytes) -> bytes:
"""
Sign the manifest using PKCS#7.
This requires the Apple WWDR certificate and your
pass signing certificate and key.
"""
try:
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.serialization import pkcs7
# Load certificates
with open(config.apple_wwdr_cert_path, "rb") as f:
wwdr_cert = x509.load_pem_x509_certificate(f.read())
with open(config.apple_signer_cert_path, "rb") as f:
signer_cert = x509.load_pem_x509_certificate(f.read())
with open(config.apple_signer_key_path, "rb") as f:
signer_key = serialization.load_pem_private_key(f.read(), password=None)
# Create PKCS#7 signature
signature = (
pkcs7.PKCS7SignatureBuilder()
.set_data(manifest_data)
.add_signer(signer_cert, signer_key, hashes.SHA256())
.add_certificate(wwdr_cert)
.sign(serialization.Encoding.DER, [pkcs7.PKCS7Options.DetachedSignature])
)
return signature
except FileNotFoundError as e:
raise WalletIntegrationException("apple", f"Certificate file not found: {e}")
except (OSError, ValueError) as e:
raise WalletIntegrationException("apple", f"Failed to sign manifest: {e}")
# =========================================================================
# Pass URLs
# =========================================================================
def get_pass_url(self, card: LoyaltyCard) -> str:
"""Get the URL to download the .pkpass file."""
from app.core.config import settings
base_url = getattr(settings, "BASE_URL", "https://api.example.com")
return f"{base_url}/api/v1/loyalty/passes/apple/{card.apple_serial_number}.pkpass"
# =========================================================================
# Device Registration (Apple Web Service)
# =========================================================================
def register_device(
self,
db: Session,
card: LoyaltyCard,
device_library_id: str,
push_token: str,
) -> None:
"""
Register a device for push notifications.
Called by Apple when user adds pass to their wallet.
"""
# Check if already registered
existing = (
db.query(AppleDeviceRegistration)
.filter(
AppleDeviceRegistration.card_id == card.id,
AppleDeviceRegistration.device_library_identifier == device_library_id,
)
.first()
)
if existing:
# Update push token
existing.push_token = push_token
else:
# Create new registration
registration = AppleDeviceRegistration(
card_id=card.id,
device_library_identifier=device_library_id,
push_token=push_token,
)
db.add(registration)
db.commit()
logger.info(f"Registered device {device_library_id[:8]}... for card {card.id}")
def unregister_device(
self,
db: Session,
card: LoyaltyCard,
device_library_id: str,
) -> None:
"""
Unregister a device.
Called by Apple when user removes pass from their wallet.
"""
db.query(AppleDeviceRegistration).filter(
AppleDeviceRegistration.card_id == card.id,
AppleDeviceRegistration.device_library_identifier == device_library_id,
).delete()
db.commit()
logger.info(f"Unregistered device {device_library_id[:8]}... for card {card.id}")
def send_push_updates(self, db: Session, card: LoyaltyCard) -> None:
"""
Send push notifications to all registered devices for a card.
This tells Apple Wallet to fetch the updated pass.
"""
registrations = (
db.query(AppleDeviceRegistration)
.filter(AppleDeviceRegistration.card_id == card.id)
.all()
)
if not registrations:
return
# Send push notification to each device
for registration in registrations:
try:
self._send_push(registration.push_token)
except Exception as e: # noqa: EXC-003
logger.warning(
f"Failed to send push to device {registration.device_library_identifier[:8]}...: {e}"
)
def _send_push(self, push_token: str) -> None:
"""
Send an empty push notification to trigger pass update.
Apple Wallet will then call our web service to fetch the updated pass.
"""
# This would use APNs to send the push notification
# For now, we'll log and skip the actual push
logger.debug(f"Would send push to token {push_token[:8]}...")
# In production, you would use something like:
# from apns2.client import APNsClient
# from apns2.payload import Payload
# client = APNsClient(config.apple_signer_cert_path, use_sandbox=True)
# payload = Payload()
# client.send_notification(push_token, payload, "pass.com.example.loyalty")
# Singleton instance
apple_wallet_service = AppleWalletService()