Files
orion/app/modules/loyalty/services/apple_wallet_service.py
Samir Boulahtit 7c43d6f4a2 refactor: fix all architecture validator findings (202 → 0)
Eliminate all 103 errors and 96 warnings from the architecture validator:

Phase 1 - Validator rules & YAML:
- Add NAM-001/NAM-002 exceptions for module-scoped router/service files
- Fix API-004 to detect # public comments on decorator lines
- Add module-specific exception bases to EXC-004 valid_bases
- Exclude storefront files from AUTH-004 store context check
- Add SVC-006 exceptions for loyalty service atomic commits
- Fix _get_rule() to search naming_rules and auth_rules categories
- Use plain # CODE comments instead of # noqa: CODE for custom rules

Phase 2 - Billing module (5 route files):
- Move _resolve_store_to_merchant to subscription_service
- Move tier/feature queries to feature_service, admin_subscription_service
- Extract 22 inline Pydantic schemas to billing/schemas/billing.py
- Replace all HTTPException with domain exceptions

Phase 3 - Loyalty module (4 routes + points_service):
- Add 7 domain exceptions (Apple auth, enrollment, device registration)
- Add service methods to card_service, program_service, apple_wallet_service
- Move all db.query() from routes to service layer
- Fix SVC-001: replace HTTPException in points_service with domain exception

Phase 4 - Remaining modules:
- tenancy: move store stats queries to admin_service
- cms: move platform resolution to content_page_service, add NoPlatformSubscriptionException
- messaging: move user/customer lookups to messaging_service
- Add ConfigDict(from_attributes=True) to ContentPageResponse

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 18:49:24 +01:00

549 lines
18 KiB
Python

# app/modules/loyalty/services/apple_wallet_service.py
"""
Apple Wallet service.
Handles Apple Wallet integration including:
- Generating .pkpass files
- Apple Web Service for device registration
- Push notifications for pass updates
"""
import hashlib
import io
import json
import logging
import zipfile
from typing import Any
from sqlalchemy.orm import Session
from app.modules.loyalty.config import config
from app.modules.loyalty.exceptions import (
ApplePassGenerationException,
AppleWalletNotConfiguredException,
DeviceRegistrationException,
InvalidAppleAuthTokenException,
WalletIntegrationException,
)
from app.modules.loyalty.models import (
AppleDeviceRegistration,
LoyaltyCard,
LoyaltyProgram,
)
logger = logging.getLogger(__name__)
class AppleWalletService:
"""Service for Apple Wallet integration."""
@property
def is_configured(self) -> bool:
"""Check if Apple Wallet is configured."""
return bool(
config.apple_pass_type_id
and config.apple_team_id
and config.apple_wwdr_cert_path
and config.apple_signer_cert_path
and config.apple_signer_key_path
)
# =========================================================================
# Auth Verification
# =========================================================================
def verify_auth_token(self, card: LoyaltyCard, authorization: str | None) -> None:
"""
Verify the Apple Wallet authorization token for a card.
Args:
card: Loyalty card
authorization: Authorization header value (e.g. "ApplePass <token>")
Raises:
InvalidAppleAuthTokenException: If token is missing or invalid
"""
auth_token = None
if authorization and authorization.startswith("ApplePass "):
auth_token = authorization.split(" ", 1)[1]
if not auth_token or auth_token != card.apple_auth_token:
raise InvalidAppleAuthTokenException()
def generate_pass_safe(self, db: Session, card: LoyaltyCard) -> bytes:
"""
Generate an Apple Wallet pass, wrapping LoyaltyException into
ApplePassGenerationException.
Args:
db: Database session
card: Loyalty card
Returns:
Bytes of the .pkpass file
Raises:
ApplePassGenerationException: If pass generation fails
"""
from app.modules.loyalty.exceptions import LoyaltyException
try:
return self.generate_pass(db, card)
except LoyaltyException as e:
logger.error(f"Failed to generate Apple pass for card {card.id}: {e}")
raise ApplePassGenerationException(card.id)
def get_device_registrations(self, db: Session, device_id: str) -> list:
"""
Get all device registrations for a device library identifier.
Args:
db: Database session
device_id: Device library identifier
Returns:
List of AppleDeviceRegistration objects
"""
return (
db.query(AppleDeviceRegistration)
.filter(AppleDeviceRegistration.device_library_identifier == device_id)
.all()
)
def get_updated_cards_for_device(
self,
db: Session,
device_id: str,
updated_since: str | None = None,
) -> list[LoyaltyCard] | None:
"""
Get cards registered to a device, optionally filtered by update time.
Args:
db: Database session
device_id: Device library identifier
updated_since: ISO timestamp to filter by
Returns:
List of LoyaltyCard objects, or None if no registrations found
"""
from datetime import datetime
registrations = self.get_device_registrations(db, device_id)
if not registrations:
return None
card_ids = [r.card_id for r in registrations]
query = db.query(LoyaltyCard).filter(LoyaltyCard.id.in_(card_ids))
if updated_since:
try:
since = datetime.fromisoformat(updated_since.replace("Z", "+00:00"))
query = query.filter(LoyaltyCard.updated_at > since)
except ValueError:
pass
cards = query.all()
return cards if cards else None
def register_device_safe(
self,
db: Session,
card: LoyaltyCard,
device_id: str,
push_token: str,
) -> None:
"""
Register a device, wrapping exceptions into DeviceRegistrationException.
Args:
db: Database session
card: Loyalty card
device_id: Device library identifier
push_token: Push token
Raises:
DeviceRegistrationException: If registration fails
"""
try:
self.register_device(db, card, device_id, push_token)
except Exception as e:
logger.error(f"Failed to register device: {e}")
raise DeviceRegistrationException(device_id, "register")
def unregister_device_safe(
self,
db: Session,
card: LoyaltyCard,
device_id: str,
) -> None:
"""
Unregister a device, wrapping exceptions into DeviceRegistrationException.
Args:
db: Database session
card: Loyalty card
device_id: Device library identifier
Raises:
DeviceRegistrationException: If unregistration fails
"""
try:
self.unregister_device(db, card, device_id)
except Exception as e:
logger.error(f"Failed to unregister device: {e}")
raise DeviceRegistrationException(device_id, "unregister")
# =========================================================================
# Pass Generation
# =========================================================================
def generate_pass(self, db: Session, card: LoyaltyCard) -> bytes:
"""
Generate a .pkpass file for a loyalty card.
The .pkpass is a ZIP file containing:
- pass.json: Pass configuration
- icon.png, icon@2x.png: App icon
- logo.png, logo@2x.png: Logo on pass
- manifest.json: SHA-1 hashes of all files
- signature: PKCS#7 signature
Args:
db: Database session
card: Loyalty card
Returns:
Bytes of the .pkpass file
"""
if not self.is_configured:
raise AppleWalletNotConfiguredException()
program = card.program
# Ensure serial number is set
if not card.apple_serial_number:
card.apple_serial_number = f"card_{card.id}_{card.qr_code_data[:8]}"
db.commit()
# Build pass.json
pass_data = self._build_pass_json(card, program)
# Create the pass package
pass_files = {
"pass.json": json.dumps(pass_data).encode("utf-8"),
}
# Add placeholder images (in production, these would be actual images)
# For now, we'll skip images and use the pass.json only
# pass_files["icon.png"] = self._get_icon_bytes(program)
# pass_files["icon@2x.png"] = self._get_icon_bytes(program, scale=2)
# pass_files["logo.png"] = self._get_logo_bytes(program)
# pass_files["logo@2x.png"] = self._get_logo_bytes(program, scale=2)
# Create manifest
manifest = {}
for filename, content in pass_files.items():
manifest[filename] = hashlib.sha1(content).hexdigest()
pass_files["manifest.json"] = json.dumps(manifest).encode("utf-8")
# Sign the manifest
try:
signature = self._sign_manifest(pass_files["manifest.json"])
pass_files["signature"] = signature
except Exception as e:
logger.error(f"Failed to sign pass: {e}")
raise WalletIntegrationException("apple", f"Failed to sign pass: {e}")
# Create ZIP file
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
for filename, content in pass_files.items():
zf.writestr(filename, content)
return buffer.getvalue()
def _build_pass_json(self, card: LoyaltyCard, program: LoyaltyProgram) -> dict[str, Any]:
"""Build the pass.json structure for a loyalty card."""
pass_data = {
"formatVersion": 1,
"passTypeIdentifier": config.apple_pass_type_id,
"serialNumber": card.apple_serial_number,
"teamIdentifier": config.apple_team_id,
"organizationName": program.display_name,
"description": f"{program.display_name} Loyalty Card",
"backgroundColor": self._hex_to_rgb(program.card_color),
"foregroundColor": "rgb(255, 255, 255)",
"labelColor": "rgb(255, 255, 255)",
"authenticationToken": card.apple_auth_token,
"webServiceURL": self._get_web_service_url(),
"barcode": {
"message": card.card_number.replace("-", ""),
"format": "PKBarcodeFormatCode128",
"messageEncoding": "iso-8859-1",
"altText": card.card_number,
},
"barcodes": [
{
"message": card.card_number.replace("-", ""),
"format": "PKBarcodeFormatCode128",
"messageEncoding": "iso-8859-1",
"altText": card.card_number,
},
{
"message": card.qr_code_data,
"format": "PKBarcodeFormatQR",
"messageEncoding": "iso-8859-1",
},
],
}
# Add loyalty-specific fields
if program.is_stamps_enabled:
pass_data["storeCard"] = {
"headerFields": [
{
"key": "stamps",
"label": "STAMPS",
"value": f"{card.stamp_count}/{program.stamps_target}",
}
],
"primaryFields": [
{
"key": "reward",
"label": "NEXT REWARD",
"value": program.stamps_reward_description,
}
],
"secondaryFields": [
{
"key": "progress",
"label": "PROGRESS",
"value": f"{card.stamp_count} stamps collected",
}
],
"backFields": [
{
"key": "cardNumber",
"label": "Card Number",
"value": card.card_number,
},
{
"key": "totalStamps",
"label": "Total Stamps Earned",
"value": str(card.total_stamps_earned),
},
{
"key": "redemptions",
"label": "Total Rewards",
"value": str(card.stamps_redeemed),
},
],
}
elif program.is_points_enabled:
pass_data["storeCard"] = {
"headerFields": [
{
"key": "points",
"label": "POINTS",
"value": str(card.points_balance),
}
],
"primaryFields": [
{
"key": "balance",
"label": "BALANCE",
"value": f"{card.points_balance} points",
}
],
"backFields": [
{
"key": "cardNumber",
"label": "Card Number",
"value": card.card_number,
},
{
"key": "totalPoints",
"label": "Total Points Earned",
"value": str(card.total_points_earned),
},
{
"key": "redeemed",
"label": "Points Redeemed",
"value": str(card.points_redeemed),
},
],
}
return pass_data
def _hex_to_rgb(self, hex_color: str) -> str:
"""Convert hex color to RGB format for Apple Wallet."""
hex_color = hex_color.lstrip("#")
r = int(hex_color[0:2], 16)
g = int(hex_color[2:4], 16)
b = int(hex_color[4:6], 16)
return f"rgb({r}, {g}, {b})"
def _get_web_service_url(self) -> str:
"""Get the base URL for Apple Web Service endpoints."""
# This should be configured based on your deployment
# For now, return a placeholder
from app.core.config import settings
base_url = getattr(settings, "BASE_URL", "https://api.example.com")
return f"{base_url}/api/v1/loyalty/apple"
def _sign_manifest(self, manifest_data: bytes) -> bytes:
"""
Sign the manifest using PKCS#7.
This requires the Apple WWDR certificate and your
pass signing certificate and key.
"""
try:
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.serialization import pkcs7
# Load certificates
with open(config.apple_wwdr_cert_path, "rb") as f:
wwdr_cert = x509.load_pem_x509_certificate(f.read())
with open(config.apple_signer_cert_path, "rb") as f:
signer_cert = x509.load_pem_x509_certificate(f.read())
with open(config.apple_signer_key_path, "rb") as f:
signer_key = serialization.load_pem_private_key(f.read(), password=None)
# Create PKCS#7 signature
signature = (
pkcs7.PKCS7SignatureBuilder()
.set_data(manifest_data)
.add_signer(signer_cert, signer_key, hashes.SHA256())
.add_certificate(wwdr_cert)
.sign(serialization.Encoding.DER, [pkcs7.PKCS7Options.DetachedSignature])
)
return signature
except FileNotFoundError as e:
raise WalletIntegrationException("apple", f"Certificate file not found: {e}")
except Exception as e:
raise WalletIntegrationException("apple", f"Failed to sign manifest: {e}")
# =========================================================================
# Pass URLs
# =========================================================================
def get_pass_url(self, card: LoyaltyCard) -> str:
"""Get the URL to download the .pkpass file."""
from app.core.config import settings
base_url = getattr(settings, "BASE_URL", "https://api.example.com")
return f"{base_url}/api/v1/loyalty/passes/apple/{card.apple_serial_number}.pkpass"
# =========================================================================
# Device Registration (Apple Web Service)
# =========================================================================
def register_device(
self,
db: Session,
card: LoyaltyCard,
device_library_id: str,
push_token: str,
) -> None:
"""
Register a device for push notifications.
Called by Apple when user adds pass to their wallet.
"""
# Check if already registered
existing = (
db.query(AppleDeviceRegistration)
.filter(
AppleDeviceRegistration.card_id == card.id,
AppleDeviceRegistration.device_library_identifier == device_library_id,
)
.first()
)
if existing:
# Update push token
existing.push_token = push_token
else:
# Create new registration
registration = AppleDeviceRegistration(
card_id=card.id,
device_library_identifier=device_library_id,
push_token=push_token,
)
db.add(registration)
db.commit()
logger.info(f"Registered device {device_library_id[:8]}... for card {card.id}")
def unregister_device(
self,
db: Session,
card: LoyaltyCard,
device_library_id: str,
) -> None:
"""
Unregister a device.
Called by Apple when user removes pass from their wallet.
"""
db.query(AppleDeviceRegistration).filter(
AppleDeviceRegistration.card_id == card.id,
AppleDeviceRegistration.device_library_identifier == device_library_id,
).delete()
db.commit()
logger.info(f"Unregistered device {device_library_id[:8]}... for card {card.id}")
def send_push_updates(self, db: Session, card: LoyaltyCard) -> None:
"""
Send push notifications to all registered devices for a card.
This tells Apple Wallet to fetch the updated pass.
"""
registrations = (
db.query(AppleDeviceRegistration)
.filter(AppleDeviceRegistration.card_id == card.id)
.all()
)
if not registrations:
return
# Send push notification to each device
for registration in registrations:
try:
self._send_push(registration.push_token)
except Exception as e:
logger.warning(
f"Failed to send push to device {registration.device_library_identifier[:8]}...: {e}"
)
def _send_push(self, push_token: str) -> None:
"""
Send an empty push notification to trigger pass update.
Apple Wallet will then call our web service to fetch the updated pass.
"""
# This would use APNs to send the push notification
# For now, we'll log and skip the actual push
logger.debug(f"Would send push to token {push_token[:8]}...")
# In production, you would use something like:
# from apns2.client import APNsClient
# from apns2.payload import Payload
# client = APNsClient(config.apple_signer_cert_path, use_sandbox=True)
# payload = Payload()
# client.send_notification(push_token, payload, "pass.com.example.loyalty")
# Singleton instance
apple_wallet_service = AppleWalletService()