# app/modules/loyalty/services/google_wallet_service.py """ Google Wallet service. Handles Google Wallet integration including: - Creating LoyaltyClass for programs - Creating LoyaltyObject for cards - Updating objects on balance changes - Generating "Add to Wallet" URLs - Startup config validation - Retry logic for transient API failures """ import json import logging import time from datetime import UTC, datetime, timedelta from typing import Any import requests from sqlalchemy.orm import Session from app.modules.loyalty.config import config from app.modules.loyalty.exceptions import ( GoogleWalletNotConfiguredException, WalletIntegrationException, ) from app.modules.loyalty.models import LoyaltyCard, LoyaltyProgram logger = logging.getLogger(__name__) # Retry configuration MAX_RETRIES = 3 RETRY_BACKOFF_BASE = 1 # seconds def _retry_on_failure(func): """Decorator that retries Google Wallet API calls on transient failures. Only retries on 5xx/network errors. 4xx errors (bad request, not found) are not retryable and fail immediately. """ def wrapper(*args, **kwargs): last_exception = None for attempt in range(MAX_RETRIES): try: return func(*args, **kwargs) except WalletIntegrationException as exc: last_exception = exc # Don't retry client errors (400, 401, 403, 404, 409) exc_msg = str(exc) if any(f":{code}" in exc_msg or f" {code} " in exc_msg for code in ("400", "401", "403", "404", "409")): logger.error("Google Wallet API client error (not retryable): %s", exc) break if attempt < MAX_RETRIES - 1: wait = RETRY_BACKOFF_BASE * (2**attempt) logger.warning( "Google Wallet API failed (attempt %d/%d), retrying in %ds: %s", attempt + 1, MAX_RETRIES, wait, exc, ) # Safe: all loyalty routes are sync def, so FastAPI runs them # in a threadpool. time.sleep() only blocks the worker thread, # not the async event loop. time.sleep(wait) else: logger.error( "Google Wallet API failed after %d attempts: %s", MAX_RETRIES, exc, ) raise last_exception # type: ignore[misc] return wrapper class GoogleWalletService: """Service for Google Wallet integration.""" def __init__(self): """Initialize the Google Wallet service.""" self._credentials = None self._http_client = None self._signer = None self._private_key: str | None = None @property def is_configured(self) -> bool: """Check if Google Wallet is configured.""" return bool( config.google_issuer_id and config.google_service_account_json ) def validate_config(self) -> dict[str, Any]: """ Validate Google Wallet configuration at startup. Returns: Dict with validation results including any errors found. """ import os result: dict[str, Any] = { "configured": self.is_configured, "issuer_id": config.google_issuer_id, "service_account_path": config.google_service_account_json, "credentials_valid": False, "errors": [], } if not self.is_configured: return result sa_path = config.google_service_account_json if not os.path.isfile(sa_path): result["errors"].append(f"Service account file not found: {sa_path}") return result try: with open(sa_path) as f: sa_data = json.load(f) required_fields = ["type", "project_id", "private_key", "client_email"] for field in required_fields: if field not in sa_data: result["errors"].append( f"Missing field in service account JSON: {field}" ) if sa_data.get("type") != "service_account": result["errors"].append( f"Invalid credential type: {sa_data.get('type')} " f"(expected 'service_account')" ) if not result["errors"]: self._get_credentials() result["credentials_valid"] = True result["service_account_email"] = sa_data.get("client_email") result["project_id"] = sa_data.get("project_id") except json.JSONDecodeError as exc: result["errors"].append(f"Invalid JSON in service account file: {exc}") except (OSError, ValueError) as exc: result["errors"].append(f"Failed to load credentials: {exc}") # Check logo URL reachability (warning only, not a blocking error) logo_url = config.default_logo_url if logo_url: result["warnings"] = result.get("warnings", []) try: resp = requests.head(logo_url, timeout=5, allow_redirects=True) if resp.status_code >= 400: result["warnings"].append( f"Default logo URL returned HTTP {resp.status_code}: {logo_url}" ) except requests.RequestException as exc: result["warnings"].append( f"Default logo URL unreachable: {logo_url} ({exc})" ) return result def _get_credentials(self): """Get Google service account credentials.""" if self._credentials: return self._credentials if not config.google_service_account_json: raise GoogleWalletNotConfiguredException() try: from google.oauth2 import service_account scopes = ["https://www.googleapis.com/auth/wallet_object.issuer"] self._credentials = ( service_account.Credentials.from_service_account_file( config.google_service_account_json, scopes=scopes, ) ) return self._credentials except (ValueError, OSError) as exc: logger.error("Failed to load Google credentials: %s", exc) raise WalletIntegrationException("google", str(exc)) def _get_signer(self): """Get RSA signer from service account for JWT signing.""" if self._signer: return self._signer try: from google.auth.crypt import RSASigner self._signer = RSASigner.from_service_account_file( config.google_service_account_json, ) return self._signer except (ValueError, OSError, KeyError) as exc: logger.error("Failed to create RSA signer: %s", exc) raise WalletIntegrationException("google", str(exc)) from exc def _get_private_key(self) -> str: """Get the private key from the service account JSON, with caching.""" if self._private_key: return self._private_key with open(config.google_service_account_json) as f: sa_data = json.load(f) self._private_key = sa_data["private_key"] return self._private_key def _get_http_client(self): """Get authenticated HTTP client.""" if self._http_client: return self._http_client try: from google.auth.transport.requests import AuthorizedSession credentials = self._get_credentials() self._http_client = AuthorizedSession(credentials) return self._http_client except (ValueError, TypeError, AttributeError) as exc: logger.error("Failed to create Google HTTP client: %s", exc) raise WalletIntegrationException("google", str(exc)) from exc # ========================================================================= # LoyaltyClass Operations (Program-level) # ========================================================================= @_retry_on_failure def create_class(self, db: Session, program: LoyaltyProgram) -> str: """ Create a LoyaltyClass for a loyalty program. Args: db: Database session program: Loyalty program Returns: Google Wallet class ID """ if not self.is_configured: raise GoogleWalletNotConfiguredException() issuer_id = config.google_issuer_id class_id = f"{issuer_id}.loyalty_program_{program.id}" # issuerName is required by Google Wallet API issuer_name = program.merchant.name if program.merchant else program.display_name class_data = { "id": class_id, "issuerId": issuer_id, "issuerName": issuer_name, "reviewStatus": "DRAFT", "programName": program.display_name, "hexBackgroundColor": program.card_color or "#4285F4", "localizedProgramName": { "defaultValue": { "language": "en", "value": program.display_name, }, }, } # programLogo is required by Google Wallet API # Google must be able to fetch the image, so it needs a public URL logo_url = program.logo_url if not logo_url: logo_url = config.default_logo_url class_data["programLogo"] = { "sourceUri": {"uri": logo_url}, } # Add hero image if configured if program.hero_image_url: class_data["heroImage"] = { "sourceUri": {"uri": program.hero_image_url}, } try: http = self._get_http_client() response = http.post( "https://walletobjects.googleapis.com/walletobjects/v1/loyaltyClass", json=class_data, ) if response.status_code in (200, 201): program.google_class_id = class_id db.commit() logger.info( "Created Google Wallet class %s for program %s", class_id, program.id, ) return class_id if response.status_code == 409: program.google_class_id = class_id db.commit() return class_id error = response.json() if response.text else {} raise WalletIntegrationException( "google", f"Failed to create class: {response.status_code} - {error}", ) except WalletIntegrationException: raise except (requests.RequestException, ValueError, AttributeError) as exc: logger.error("Failed to create Google Wallet class: %s", exc) raise WalletIntegrationException("google", str(exc)) from exc @_retry_on_failure def update_class(self, db: Session, program: LoyaltyProgram) -> bool: """Update a LoyaltyClass when program settings change. Returns: True if update succeeded, False otherwise. """ if not program.google_class_id: return False class_data = { "programName": program.display_name, "hexBackgroundColor": program.card_color, } if program.logo_url: class_data["programLogo"] = { "sourceUri": {"uri": program.logo_url}, } try: http = self._get_http_client() response = http.patch( "https://walletobjects.googleapis.com/walletobjects/v1/loyaltyClass/" f"{program.google_class_id}", json=class_data, ) if response.status_code in (200, 201): return True logger.error( "Failed to update Google Wallet class %s: %s", program.google_class_id, response.status_code, ) return False except (requests.RequestException, ValueError, AttributeError) as exc: logger.error("Failed to update Google Wallet class: %s", exc) return False # ========================================================================= # LoyaltyObject Operations (Card-level) # ========================================================================= @_retry_on_failure def create_object(self, db: Session, card: LoyaltyCard) -> str: """ Create a LoyaltyObject for a loyalty card. Args: db: Database session card: Loyalty card Returns: Google Wallet object ID """ if not self.is_configured: raise GoogleWalletNotConfiguredException() program = card.program if not program.google_class_id: self.create_class(db, program) issuer_id = config.google_issuer_id object_id = f"{issuer_id}.loyalty_card_{card.id}" object_data = self._build_object_data(card, object_id) try: http = self._get_http_client() response = http.post( "https://walletobjects.googleapis.com/walletobjects/v1/loyaltyObject", json=object_data, ) if response.status_code in (200, 201): card.google_object_id = object_id db.commit() logger.info( "Created Google Wallet object %s for card %s", object_id, card.id, ) return object_id if response.status_code == 409: card.google_object_id = object_id db.commit() return object_id error = response.json() if response.text else {} raise WalletIntegrationException( "google", f"Failed to create object: {response.status_code} - {error}", ) except WalletIntegrationException: raise except (requests.RequestException, ValueError, AttributeError) as exc: logger.error("Failed to create Google Wallet object: %s", exc) raise WalletIntegrationException("google", str(exc)) from exc @_retry_on_failure def update_object(self, db: Session, card: LoyaltyCard) -> None: """Update a LoyaltyObject when card balance changes.""" if not card.google_object_id: return object_data = self._build_object_data(card, card.google_object_id) try: http = self._get_http_client() response = http.patch( "https://walletobjects.googleapis.com/walletobjects/v1/loyaltyObject/" f"{card.google_object_id}", json=object_data, ) if response.status_code in (200, 201): logger.debug( "Updated Google Wallet object for card %s", card.id ) else: logger.warning( "Failed to update Google Wallet object %s: %s", card.google_object_id, response.status_code, ) except (requests.RequestException, ValueError, AttributeError) as exc: logger.error("Failed to update Google Wallet object: %s", exc) def _build_object_data( self, card: LoyaltyCard, object_id: str ) -> dict[str, Any]: """Build the LoyaltyObject data structure.""" program = card.program object_data: dict[str, Any] = { "id": object_id, "classId": program.google_class_id, "state": "ACTIVE" if card.is_active else "INACTIVE", "accountId": card.card_number, "accountName": card.card_number, "barcode": { "type": "CODE_128", "value": card.card_number.replace("-", ""), "alternateText": card.card_number, }, } if program.is_stamps_enabled: object_data["loyaltyPoints"] = { "label": "Stamps", "balance": { "int": card.stamp_count, }, } object_data["secondaryLoyaltyPoints"] = { "label": f"of {program.stamps_target}", "balance": { "int": program.stamps_target, }, } elif program.is_points_enabled: object_data["loyaltyPoints"] = { "label": "Points", "balance": { "int": card.points_balance, }, } return object_data # ========================================================================= # Save URL Generation # ========================================================================= def get_save_url(self, db: Session, card: LoyaltyCard) -> str: """ Get the "Add to Google Wallet" URL for a card. Uses google.auth.crypt.RSASigner (public API) for JWT signing instead of accessing private signer internals. Args: db: Database session card: Loyalty card Returns: URL for adding pass to Google Wallet """ if not self.is_configured: raise GoogleWalletNotConfiguredException() # Try to create the object via API. If it fails (e.g. demo mode # where DRAFT classes reject object creation), fall back to # embedding the full object data in the JWT ("fat JWT"). if not card.google_object_id: try: self.create_object(db, card) except WalletIntegrationException: logger.info( "Object creation failed for card %s, using fat JWT", card.id, ) try: import jwt credentials = self._get_credentials() now = datetime.now(tz=UTC) origins = config.google_wallet_origins or [] issuer_id = config.google_issuer_id object_id = card.google_object_id or f"{issuer_id}.loyalty_card_{card.id}" if card.google_object_id: # Object exists in Google — reference by ID only payload = { "loyaltyObjects": [{"id": card.google_object_id}], } else: # Object not created — embed full object data in JWT object_data = self._build_object_data(card, object_id) payload = { "loyaltyObjects": [object_data], } claims = { "iss": credentials.service_account_email, "aud": "google", "origins": origins, "typ": "savetowallet", "payload": payload, "iat": now, "exp": now + timedelta(hours=1), } # PyJWT needs the PEM string; cached after first load private_key = self._get_private_key() token = jwt.encode( claims, private_key, algorithm="RS256", ) card.google_object_jwt = token db.commit() return f"https://pay.google.com/gp/v/save/{token}" except (AttributeError, ValueError, KeyError, OSError) as exc: logger.error( "Failed to generate Google Wallet save URL: %s", exc ) raise WalletIntegrationException("google", str(exc)) from exc # ========================================================================= # Class Approval # ========================================================================= def get_class_status(self, class_id: str) -> dict[str, Any] | None: """ Check the review status of a LoyaltyClass. Args: class_id: Google Wallet class ID Returns: Dict with class status info or None if not found. """ if not self.is_configured: return None try: http = self._get_http_client() response = http.get( "https://walletobjects.googleapis.com/walletobjects/v1/loyaltyClass/" f"{class_id}", ) if response.status_code == 200: data = response.json() return { "class_id": class_id, "review_status": data.get("reviewStatus"), "program_name": data.get("programName"), } return None except (requests.RequestException, ValueError, AttributeError) as exc: logger.error( "Failed to get Google Wallet class status: %s", exc ) return None # Singleton instance google_wallet_service = GoogleWalletService()