# app/modules/loyalty/services/apple_wallet_service.py """ Apple Wallet service. Handles Apple Wallet integration including: - Generating .pkpass files - Apple Web Service for device registration - Push notifications for pass updates """ import hashlib import hmac import io import json import logging import zipfile from typing import Any from sqlalchemy.orm import Session from app.modules.loyalty.config import config from app.modules.loyalty.exceptions import ( ApplePassGenerationException, AppleWalletNotConfiguredException, DeviceRegistrationException, InvalidAppleAuthTokenException, WalletIntegrationException, ) from app.modules.loyalty.models import ( AppleDeviceRegistration, LoyaltyCard, LoyaltyProgram, ) logger = logging.getLogger(__name__) class AppleWalletService: """Service for Apple Wallet integration.""" @property def is_configured(self) -> bool: """Check if Apple Wallet is configured.""" return bool( config.apple_pass_type_id and config.apple_team_id and config.apple_wwdr_cert_path and config.apple_signer_cert_path and config.apple_signer_key_path ) def validate_config(self) -> dict[str, Any]: """Validate Apple Wallet configuration.""" import os result: dict[str, Any] = { "configured": self.is_configured, "pass_type_id": config.apple_pass_type_id, "team_id": config.apple_team_id, "credentials_valid": False, "errors": [], } if not self.is_configured: return result for label, path in [ ("WWDR certificate", config.apple_wwdr_cert_path), ("Signer certificate", config.apple_signer_cert_path), ("Signer key", config.apple_signer_key_path), ]: if not os.path.isfile(path): result["errors"].append(f"{label} not found: {path}") if not result["errors"]: result["credentials_valid"] = True return result # ========================================================================= # Auth Verification # ========================================================================= def verify_auth_token(self, card: LoyaltyCard, authorization: str | None) -> None: """ Verify the Apple Wallet authorization token for a card. Args: card: Loyalty card authorization: Authorization header value (e.g. "ApplePass ") Raises: InvalidAppleAuthTokenException: If token is missing or invalid """ auth_token = None if authorization and authorization.startswith("ApplePass "): auth_token = authorization.split(" ", 1)[1] # Constant-time compare to avoid leaking the token via timing. if ( not auth_token or not card.apple_auth_token or not hmac.compare_digest(auth_token, card.apple_auth_token) ): raise InvalidAppleAuthTokenException() def generate_pass_safe(self, db: Session, card: LoyaltyCard) -> bytes: """ Generate an Apple Wallet pass, wrapping LoyaltyException into ApplePassGenerationException. Args: db: Database session card: Loyalty card Returns: Bytes of the .pkpass file Raises: ApplePassGenerationException: If pass generation fails """ from app.modules.loyalty.exceptions import LoyaltyException try: return self.generate_pass(db, card) except LoyaltyException as e: logger.error(f"Failed to generate Apple pass for card {card.id}: {e}") raise ApplePassGenerationException(card.id) def get_device_registrations(self, db: Session, device_id: str) -> list: """ Get all device registrations for a device library identifier. Args: db: Database session device_id: Device library identifier Returns: List of AppleDeviceRegistration objects """ return ( db.query(AppleDeviceRegistration) .filter(AppleDeviceRegistration.device_library_identifier == device_id) .all() ) def get_updated_cards_for_device( self, db: Session, device_id: str, updated_since: str | None = None, ) -> list[LoyaltyCard] | None: """ Get cards registered to a device, optionally filtered by update time. Args: db: Database session device_id: Device library identifier updated_since: ISO timestamp to filter by Returns: List of LoyaltyCard objects, or None if no registrations found """ from datetime import datetime registrations = self.get_device_registrations(db, device_id) if not registrations: return None card_ids = [r.card_id for r in registrations] query = db.query(LoyaltyCard).filter(LoyaltyCard.id.in_(card_ids)) if updated_since: try: since = datetime.fromisoformat(updated_since.replace("Z", "+00:00")) query = query.filter(LoyaltyCard.updated_at > since) except ValueError: pass cards = query.all() return cards if cards else None def register_device_safe( self, db: Session, card: LoyaltyCard, device_id: str, push_token: str, ) -> None: """ Register a device, wrapping exceptions into DeviceRegistrationException. Args: db: Database session card: Loyalty card device_id: Device library identifier push_token: Push token Raises: DeviceRegistrationException: If registration fails """ try: self.register_device(db, card, device_id, push_token) except Exception as e: # noqa: EXC003 logger.error(f"Failed to register device: {e}") raise DeviceRegistrationException(device_id, "register") def unregister_device_safe( self, db: Session, card: LoyaltyCard, device_id: str, ) -> None: """ Unregister a device, wrapping exceptions into DeviceRegistrationException. Args: db: Database session card: Loyalty card device_id: Device library identifier Raises: DeviceRegistrationException: If unregistration fails """ try: self.unregister_device(db, card, device_id) except Exception as e: # noqa: EXC003 logger.error(f"Failed to unregister device: {e}") raise DeviceRegistrationException(device_id, "unregister") # ========================================================================= # Pass Generation # ========================================================================= def generate_pass(self, db: Session, card: LoyaltyCard) -> bytes: """ Generate a .pkpass file for a loyalty card. The .pkpass is a ZIP file containing: - pass.json: Pass configuration - icon.png, icon@2x.png: App icon - logo.png, logo@2x.png: Logo on pass - manifest.json: SHA-1 hashes of all files - signature: PKCS#7 signature Args: db: Database session card: Loyalty card Returns: Bytes of the .pkpass file """ if not self.is_configured: raise AppleWalletNotConfiguredException() program = card.program # Ensure serial number is set if not card.apple_serial_number: card.apple_serial_number = f"card_{card.id}_{card.qr_code_data[:8]}" db.commit() # Build pass.json pass_data = self._build_pass_json(card, program) # Create the pass package pass_files = { "pass.json": json.dumps(pass_data).encode("utf-8"), } # Add pass images (icon and logo) pass_files["icon.png"] = self._get_icon_bytes(program) pass_files["icon@2x.png"] = self._get_icon_bytes(program, scale=2) pass_files["logo.png"] = self._get_logo_bytes(program) pass_files["logo@2x.png"] = self._get_logo_bytes(program, scale=2) # Create manifest manifest = {} for filename, content in pass_files.items(): manifest[filename] = hashlib.sha1(content).hexdigest() # noqa: SEC041 pass_files["manifest.json"] = json.dumps(manifest).encode("utf-8") # Sign the manifest try: signature = self._sign_manifest(pass_files["manifest.json"]) pass_files["signature"] = signature except (OSError, ValueError) as e: logger.error(f"Failed to sign pass: {e}") raise WalletIntegrationException("apple", f"Failed to sign pass: {e}") # Create ZIP file buffer = io.BytesIO() with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf: for filename, content in pass_files.items(): zf.writestr(filename, content) return buffer.getvalue() def _build_pass_json(self, card: LoyaltyCard, program: LoyaltyProgram) -> dict[str, Any]: """Build the pass.json structure for a loyalty card.""" pass_data = { "formatVersion": 1, "passTypeIdentifier": config.apple_pass_type_id, "serialNumber": card.apple_serial_number, "teamIdentifier": config.apple_team_id, "organizationName": program.display_name, "description": f"{program.display_name} Loyalty Card", "backgroundColor": self._hex_to_rgb(program.card_color), "foregroundColor": "rgb(255, 255, 255)", "labelColor": "rgb(255, 255, 255)", "authenticationToken": card.apple_auth_token, "webServiceURL": self._get_web_service_url(), "barcode": { "message": card.card_number.replace("-", ""), "format": "PKBarcodeFormatCode128", "messageEncoding": "iso-8859-1", "altText": card.card_number, }, "barcodes": [ { "message": card.card_number.replace("-", ""), "format": "PKBarcodeFormatCode128", "messageEncoding": "iso-8859-1", "altText": card.card_number, }, { "message": card.qr_code_data, "format": "PKBarcodeFormatQR", "messageEncoding": "iso-8859-1", }, ], } # Add loyalty-specific fields if program.is_stamps_enabled: pass_data["storeCard"] = { "headerFields": [ { "key": "stamps", "label": "STAMPS", "value": f"{card.stamp_count}/{program.stamps_target}", } ], "primaryFields": [ { "key": "reward", "label": "NEXT REWARD", "value": program.stamps_reward_description, } ], "secondaryFields": [ { "key": "progress", "label": "PROGRESS", "value": f"{card.stamp_count} stamps collected", } ], "backFields": [ { "key": "cardNumber", "label": "Card Number", "value": card.card_number, }, { "key": "totalStamps", "label": "Total Stamps Earned", "value": str(card.total_stamps_earned), }, { "key": "redemptions", "label": "Total Rewards", "value": str(card.stamps_redeemed), }, ], } elif program.is_points_enabled: pass_data["storeCard"] = { "headerFields": [ { "key": "points", "label": "POINTS", "value": str(card.points_balance), } ], "primaryFields": [ { "key": "balance", "label": "BALANCE", "value": f"{card.points_balance} points", } ], "backFields": [ { "key": "cardNumber", "label": "Card Number", "value": card.card_number, }, { "key": "totalPoints", "label": "Total Points Earned", "value": str(card.total_points_earned), }, { "key": "redeemed", "label": "Points Redeemed", "value": str(card.points_redeemed), }, ], } return pass_data def _get_icon_bytes(self, program: LoyaltyProgram, scale: int = 1) -> bytes: """ Generate icon image for Apple Wallet pass. Apple icon dimensions: 29x29 (@1x), 58x58 (@2x). Uses program logo if available, otherwise generates a colored square with the program initial. """ from PIL import Image, ImageDraw, ImageFont size = 29 * scale if program.logo_url: import httpx try: resp = httpx.get(program.logo_url, timeout=10, follow_redirects=True) resp.raise_for_status() img = Image.open(io.BytesIO(resp.content)) img = img.convert("RGBA") img = img.resize((size, size), Image.LANCZOS) buf = io.BytesIO() img.save(buf, format="PNG") return buf.getvalue() except (httpx.HTTPError, OSError, ValueError): logger.warning("Failed to fetch logo for icon, using fallback") # Fallback: colored square with initial hex_color = (program.card_color or "#4F46E5").lstrip("#") r, g, b = int(hex_color[0:2], 16), int(hex_color[2:4], 16), int(hex_color[4:6], 16) img = Image.new("RGBA", (size, size), (r, g, b, 255)) draw = ImageDraw.Draw(img) initial = (program.display_name or "L")[0].upper() try: font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", size // 2) except OSError: font = ImageFont.load_default() bbox = draw.textbbox((0, 0), initial, font=font) tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1] draw.text(((size - tw) / 2, (size - th) / 2 - bbox[1]), initial, fill=(255, 255, 255, 255), font=font) buf = io.BytesIO() img.save(buf, format="PNG") return buf.getvalue() def _get_logo_bytes(self, program: LoyaltyProgram, scale: int = 1) -> bytes: """ Generate logo image for Apple Wallet pass. Apple logo dimensions: 160x50 (@1x), 320x100 (@2x). Uses program logo if available, otherwise generates a colored rectangle with the program initial. """ from PIL import Image, ImageDraw, ImageFont width, height = 160 * scale, 50 * scale if program.logo_url: import httpx try: resp = httpx.get(program.logo_url, timeout=10, follow_redirects=True) resp.raise_for_status() img = Image.open(io.BytesIO(resp.content)) img = img.convert("RGBA") # Fit within dimensions preserving aspect ratio img.thumbnail((width, height), Image.LANCZOS) # Center on transparent canvas canvas = Image.new("RGBA", (width, height), (0, 0, 0, 0)) x = (width - img.width) // 2 y = (height - img.height) // 2 canvas.paste(img, (x, y)) buf = io.BytesIO() canvas.save(buf, format="PNG") return buf.getvalue() except (httpx.HTTPError, OSError, ValueError): logger.warning("Failed to fetch logo for pass logo, using fallback") # Fallback: colored rectangle with initial hex_color = (program.card_color or "#4F46E5").lstrip("#") r, g, b = int(hex_color[0:2], 16), int(hex_color[2:4], 16), int(hex_color[4:6], 16) img = Image.new("RGBA", (width, height), (0, 0, 0, 0)) draw = ImageDraw.Draw(img) initial = (program.display_name or "L")[0].upper() try: font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", height // 2) except OSError: font = ImageFont.load_default() bbox = draw.textbbox((0, 0), initial, font=font) tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1] # Draw initial centered draw.text(((width - tw) / 2, (height - th) / 2 - bbox[1]), initial, fill=(r, g, b, 255), font=font) buf = io.BytesIO() img.save(buf, format="PNG") return buf.getvalue() def _hex_to_rgb(self, hex_color: str) -> str: """Convert hex color to RGB format for Apple Wallet.""" hex_color = hex_color.lstrip("#") r = int(hex_color[0:2], 16) g = int(hex_color[2:4], 16) b = int(hex_color[4:6], 16) return f"rgb({r}, {g}, {b})" def _get_web_service_url(self) -> str: """Get the base URL for Apple Web Service endpoints.""" # This should be configured based on your deployment # For now, return a placeholder from app.core.config import settings base_url = getattr(settings, "BASE_URL", "https://api.example.com") return f"{base_url}/api/v1/loyalty/apple" def _sign_manifest(self, manifest_data: bytes) -> bytes: """ Sign the manifest using PKCS#7. This requires the Apple WWDR certificate and your pass signing certificate and key. """ try: from cryptography import x509 from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.serialization import pkcs7 # Load certificates with open(config.apple_wwdr_cert_path, "rb") as f: wwdr_cert = x509.load_pem_x509_certificate(f.read()) with open(config.apple_signer_cert_path, "rb") as f: signer_cert = x509.load_pem_x509_certificate(f.read()) with open(config.apple_signer_key_path, "rb") as f: signer_key = serialization.load_pem_private_key(f.read(), password=None) # Create PKCS#7 signature signature = ( pkcs7.PKCS7SignatureBuilder() .set_data(manifest_data) .add_signer(signer_cert, signer_key, hashes.SHA256()) .add_certificate(wwdr_cert) .sign(serialization.Encoding.DER, [pkcs7.PKCS7Options.DetachedSignature]) ) return signature except FileNotFoundError as e: raise WalletIntegrationException("apple", f"Certificate file not found: {e}") except (OSError, ValueError) as e: raise WalletIntegrationException("apple", f"Failed to sign manifest: {e}") # ========================================================================= # Pass URLs # ========================================================================= def get_pass_url(self, card: LoyaltyCard) -> str: """Get the URL to download the .pkpass file.""" from app.core.config import settings base_url = getattr(settings, "BASE_URL", "https://api.example.com") return f"{base_url}/api/v1/loyalty/passes/apple/{card.apple_serial_number}.pkpass" # ========================================================================= # Device Registration (Apple Web Service) # ========================================================================= def register_device( self, db: Session, card: LoyaltyCard, device_library_id: str, push_token: str, ) -> None: """ Register a device for push notifications. Called by Apple when user adds pass to their wallet. """ # Check if already registered existing = ( db.query(AppleDeviceRegistration) .filter( AppleDeviceRegistration.card_id == card.id, AppleDeviceRegistration.device_library_identifier == device_library_id, ) .first() ) if existing: # Update push token existing.push_token = push_token else: # Create new registration registration = AppleDeviceRegistration( card_id=card.id, device_library_identifier=device_library_id, push_token=push_token, ) db.add(registration) db.commit() logger.info(f"Registered device {device_library_id[:8]}... for card {card.id}") def unregister_device( self, db: Session, card: LoyaltyCard, device_library_id: str, ) -> None: """ Unregister a device. Called by Apple when user removes pass from their wallet. """ db.query(AppleDeviceRegistration).filter( AppleDeviceRegistration.card_id == card.id, AppleDeviceRegistration.device_library_identifier == device_library_id, ).delete() db.commit() logger.info(f"Unregistered device {device_library_id[:8]}... for card {card.id}") def send_push_updates(self, db: Session, card: LoyaltyCard) -> None: """ Send push notifications to all registered devices for a card. This tells Apple Wallet to fetch the updated pass. """ registrations = ( db.query(AppleDeviceRegistration) .filter(AppleDeviceRegistration.card_id == card.id) .all() ) if not registrations: return # Send push notification to each device for registration in registrations: try: self._send_push(registration.push_token) except Exception as e: # noqa: EXC003 logger.warning( f"Failed to send push to device {registration.device_library_identifier[:8]}...: {e}" ) def _send_push(self, push_token: str) -> None: """ Send an empty push notification to trigger pass update. Apple Wallet will call our web service to fetch the updated pass. Uses APNs HTTP/2 API with certificate-based authentication. """ if not self.is_configured: logger.debug("Apple Wallet not configured, skipping push") return import ssl import httpx # APNs endpoint (use sandbox for dev, production for prod) from app.core.config import is_production if is_production(): apns_host = "https://api.push.apple.com" else: apns_host = "https://api.sandbox.push.apple.com" url = f"{apns_host}/3/device/{push_token}" # Create SSL context with client certificate ssl_context = ssl.create_default_context() ssl_context.load_cert_chain( certfile=config.apple_signer_cert_path, keyfile=config.apple_signer_key_path, ) # APNs requires empty payload for pass updates headers = { "apns-topic": config.apple_pass_type_id, "apns-push-type": "background", "apns-priority": "5", } try: with httpx.Client( http2=True, verify=ssl_context, timeout=10.0, ) as client: response = client.post( url, headers=headers, content=b"{}", ) if response.status_code == 200: logger.debug( "APNs push sent to token %s...", push_token[:8] ) elif response.status_code == 410: logger.info( "APNs token %s... is no longer valid (device unregistered)", push_token[:8], ) else: logger.warning( "APNs push failed for token %s...: %s %s", push_token[:8], response.status_code, response.text, ) except (httpx.HTTPError, ssl.SSLError, OSError) as exc: logger.error("APNs push error for token %s...: %s", push_token[:8], exc) # Singleton instance apple_wallet_service = AppleWalletService()