diff --git a/app/modules/loyalty/tasks/point_expiration.py b/app/modules/loyalty/tasks/point_expiration.py index e95c5f5a..83951615 100644 --- a/app/modules/loyalty/tasks/point_expiration.py +++ b/app/modules/loyalty/tasks/point_expiration.py @@ -6,12 +6,16 @@ Handles expiring points that are older than the configured expiration period based on card inactivity. Runs daily at 02:00 via the scheduled task configuration in definition.py. + +Processing is chunked (LIMIT 500 + FOR UPDATE SKIP LOCKED) to avoid +holding long-running row locks on the loyalty_cards table. """ import logging from datetime import UTC, datetime, timedelta from celery import shared_task +from sqlalchemy import or_ from sqlalchemy.orm import Session from app.core.database import SessionLocal @@ -20,6 +24,8 @@ from app.modules.loyalty.models.loyalty_transaction import TransactionType logger = logging.getLogger(__name__) +CHUNK_SIZE = 500 + @shared_task(name="loyalty.expire_points") def expire_points() -> dict: @@ -27,10 +33,9 @@ def expire_points() -> dict: Expire points that are past their expiration date based on card inactivity. For each program with points_expiration_days configured: - 1. Find cards that haven't had activity in the expiration period - 2. Expire all points on those cards - 3. Create POINTS_EXPIRED transaction records - 4. Update card balances + 1. Send 14-day warning emails to cards approaching expiry + 2. Expire points in chunks of 500, committing after each chunk + 3. Send expired notifications Returns: Summary of expired points @@ -40,10 +45,10 @@ def expire_points() -> dict: db: Session = SessionLocal() try: result = _process_point_expiration(db) - db.commit() logger.info( f"Point expiration complete: {result['cards_processed']} cards, " - f"{result['points_expired']} points expired" + f"{result['points_expired']} points expired, " + f"{result['warnings_sent']} warnings sent" ) return result except Exception as e: @@ -54,30 +59,23 @@ def expire_points() -> dict: "error": str(e), "cards_processed": 0, "points_expired": 0, + "warnings_sent": 0, } finally: db.close() def _process_point_expiration(db: Session) -> dict: - """ - Process point expiration for all programs. - - Args: - db: Database session - - Returns: - Summary of expired points - """ - total_cards_processed = 0 - total_points_expired = 0 + """Process point expiration for all programs.""" + total_cards = 0 + total_points = 0 + total_warnings = 0 programs_processed = 0 - # Find all active programs with point expiration configured programs = ( db.query(LoyaltyProgram) .filter( - LoyaltyProgram.is_active == True, + LoyaltyProgram.is_active == True, # noqa: E712 LoyaltyProgram.points_expiration_days.isnot(None), LoyaltyProgram.points_expiration_days > 0, ) @@ -87,207 +85,238 @@ def _process_point_expiration(db: Session) -> dict: logger.info(f"Found {len(programs)} programs with point expiration configured") for program in programs: - cards_count, points_count = _expire_points_for_program(db, program) - total_cards_processed += cards_count - total_points_expired += points_count + cards, points, warnings = _process_program(db, program) + total_cards += cards + total_points += points + total_warnings += warnings programs_processed += 1 - logger.debug( - f"Program {program.id} (merchant {program.merchant_id}): " - f"{cards_count} cards, {points_count} points expired" - ) - return { "status": "success", "programs_processed": programs_processed, - "cards_processed": total_cards_processed, - "points_expired": total_points_expired, + "cards_processed": total_cards, + "points_expired": total_points, + "warnings_sent": total_warnings, } -def _expire_points_for_program(db: Session, program: LoyaltyProgram) -> tuple[int, int]: - """ - Expire points for a specific loyalty program. +def _process_program( + db: Session, program: LoyaltyProgram +) -> tuple[int, int, int]: + """Process warnings + expiration for a single program. - Also sends warning emails to cards approaching their expiration date - (14 days before) and expired notifications after points are zeroed. - - Args: - db: Database session - program: Loyalty program to process - - Returns: - Tuple of (cards_processed, points_expired) + Returns (cards_expired, points_expired, warnings_sent). """ if not program.points_expiration_days: - return 0, 0 + return 0, 0, 0 now = datetime.now(UTC) - - # Calculate expiration threshold expiration_threshold = now - timedelta(days=program.points_expiration_days) - logger.debug( - f"Processing program {program.id}: expiration after {program.points_expiration_days} days " - f"(threshold: {expiration_threshold})" - ) - - # --- Phase 1: Send 14-day warning emails --- + # --- Phase 1: Send 14-day warning emails (chunked) --- warning_days = 14 warning_threshold = now - timedelta( days=program.points_expiration_days - warning_days ) - _send_expiration_warnings( - db, program, warning_threshold, warning_days, now + warnings_sent = _send_expiration_warnings_chunked( + db, program, warning_threshold, expiration_threshold, warning_days, now ) - # --- Phase 2: Expire points --- - # Find cards with: - # - Points balance > 0 - # - Last activity before expiration threshold - # - Belonging to this program's merchant - cards_to_expire = ( - db.query(LoyaltyCard) - .filter( - LoyaltyCard.merchant_id == program.merchant_id, - LoyaltyCard.points_balance > 0, - LoyaltyCard.last_activity_at < expiration_threshold, - LoyaltyCard.is_active == True, - ) - .all() + # --- Phase 2: Expire points (chunked) --- + cards_expired, points_expired = _expire_points_chunked( + db, program, expiration_threshold, now ) - if not cards_to_expire: - logger.debug(f"No cards to expire for program {program.id}") - return 0, 0 + return cards_expired, points_expired, warnings_sent - logger.info(f"Found {len(cards_to_expire)} cards to expire for program {program.id}") - cards_processed = 0 - points_expired = 0 +# ========================================================================= +# Chunked expiration +# ========================================================================= - for card in cards_to_expire: - if card.points_balance <= 0: - continue - expired_points = card.points_balance +def _expire_points_chunked( + db: Session, + program: LoyaltyProgram, + expiration_threshold: datetime, + now: datetime, +) -> tuple[int, int]: + """Expire points in chunks to avoid long-held row locks. - # Create expiration transaction - transaction = LoyaltyTransaction( - card_id=card.id, - merchant_id=program.merchant_id, - store_id=None, # System action, no store - transaction_type=TransactionType.POINTS_EXPIRED.value, - points_delta=-expired_points, - points_balance_after=0, - stamps_delta=0, - stamps_balance_after=card.stamp_count, - notes=f"Points expired after {program.points_expiration_days} days of inactivity", - transaction_at=now, - ) - db.add(transaction) # noqa: PERF006 + Each chunk: + 1. SELECT ... LIMIT 500 FOR UPDATE SKIP LOCKED + 2. Create POINTS_EXPIRED transactions + 3. Update card balances + 4. Commit (releases locks for this chunk) - # Update card balance and voided tracking - card.expire_points(expired_points) - # Note: We don't update last_activity_at for expiration + Returns (total_cards, total_points). + """ + total_cards = 0 + total_points = 0 - # Send expired notification - try: - from app.modules.loyalty.services.notification_service import ( - notification_service, + while True: + # Fetch next chunk with row-level locks; SKIP LOCKED means + # concurrent workers won't block on the same rows. + card_ids_and_balances = ( + db.query(LoyaltyCard.id, LoyaltyCard.points_balance, LoyaltyCard.stamp_count) + .filter( + LoyaltyCard.merchant_id == program.merchant_id, + LoyaltyCard.points_balance > 0, + LoyaltyCard.last_activity_at < expiration_threshold, + LoyaltyCard.is_active == True, # noqa: E712 ) - - notification_service.send_points_expired(db, card, expired_points) - except Exception: - logger.warning( - f"Failed to queue expiration notification for card {card.id}", - exc_info=True, - ) - - cards_processed += 1 - points_expired += expired_points - - logger.debug( - f"Expired {expired_points} points from card {card.id} " - f"(last activity: {card.last_activity_at})" + .limit(CHUNK_SIZE) + .with_for_update(skip_locked=True) + .all() ) - return cards_processed, points_expired + if not card_ids_and_balances: + break + + chunk_cards = 0 + chunk_points = 0 + + for card_id, balance, stamp_count in card_ids_and_balances: + if balance <= 0: + continue + + # Create expiration transaction + db.add( + LoyaltyTransaction( + card_id=card_id, + merchant_id=program.merchant_id, + store_id=None, + transaction_type=TransactionType.POINTS_EXPIRED.value, + points_delta=-balance, + points_balance_after=0, + stamps_delta=0, + stamps_balance_after=stamp_count, + notes=( + f"Points expired after {program.points_expiration_days} " + f"days of inactivity" + ), + transaction_at=now, + ) + ) + + # Bulk-update the card in the same transaction + db.query(LoyaltyCard).filter(LoyaltyCard.id == card_id).update( + { + LoyaltyCard.points_balance: 0, + LoyaltyCard.total_points_voided: ( + LoyaltyCard.total_points_voided + balance + ), + }, + synchronize_session=False, + ) + + chunk_cards += 1 + chunk_points += balance + + # Commit this chunk — releases row locks + db.commit() + + # Send notifications AFTER commit (outside the lock window) + for card_id, balance, _stamp_count in card_ids_and_balances: + if balance <= 0: + continue + try: + card = db.query(LoyaltyCard).get(card_id) + if card: + from app.modules.loyalty.services.notification_service import ( + notification_service, + ) + + notification_service.send_points_expired(db, card, balance) + except Exception: + logger.warning( + f"Failed to queue expiration notification for card {card_id}", + exc_info=True, + ) + + total_cards += chunk_cards + total_points += chunk_points + + logger.info( + f"Program {program.id}: expired chunk of {chunk_cards} cards " + f"({chunk_points} pts), total so far: {total_cards} cards" + ) + + return total_cards, total_points -def _send_expiration_warnings( +# ========================================================================= +# Chunked expiration warnings +# ========================================================================= + + +def _send_expiration_warnings_chunked( db: Session, program: LoyaltyProgram, warning_threshold: datetime, + expiration_threshold: datetime, warning_days: int, now: datetime, ) -> int: - """Send expiration warning emails to cards approaching expiry. + """Send expiration warning emails in chunks. Only sends one warning per expiration cycle (tracked via last_expiration_warning_at on the card). - - Returns: - Number of warnings sent """ - from sqlalchemy import or_ + total_warnings = 0 + expiration_date = (now + timedelta(days=warning_days)).strftime("%Y-%m-%d") - # Find cards in the warning window: - # - Have points - # - Last activity is past the warning threshold (i.e. will expire in ~14 days) - # - But NOT yet past the full expiration threshold - # - Haven't received a warning yet in this cycle - expiration_threshold = now - timedelta(days=program.points_expiration_days) - - cards = ( - db.query(LoyaltyCard) - .filter( - LoyaltyCard.merchant_id == program.merchant_id, - LoyaltyCard.points_balance > 0, - LoyaltyCard.is_active == True, - LoyaltyCard.last_activity_at < warning_threshold, - LoyaltyCard.last_activity_at >= expiration_threshold, - or_( - LoyaltyCard.last_expiration_warning_at.is_(None), - LoyaltyCard.last_expiration_warning_at < warning_threshold, - ), + while True: + cards = ( + db.query(LoyaltyCard) + .filter( + LoyaltyCard.merchant_id == program.merchant_id, + LoyaltyCard.points_balance > 0, + LoyaltyCard.is_active == True, # noqa: E712 + LoyaltyCard.last_activity_at < warning_threshold, + LoyaltyCard.last_activity_at >= expiration_threshold, + or_( + LoyaltyCard.last_expiration_warning_at.is_(None), + LoyaltyCard.last_expiration_warning_at < warning_threshold, + ), + ) + .limit(CHUNK_SIZE) + .all() ) - .all() - ) - if not cards: - return 0 + if not cards: + break - warnings_sent = 0 - expiration_date = ( - now + timedelta(days=warning_days) - ).strftime("%Y-%m-%d") + chunk_warnings = 0 + for card in cards: + try: + from app.modules.loyalty.services.notification_service import ( + notification_service, + ) - for card in cards: - try: - from app.modules.loyalty.services.notification_service import ( - notification_service, - ) + notification_service.send_points_expiration_warning( + db, + card, + expiring_points=card.points_balance, + days_remaining=warning_days, + expiration_date=expiration_date, + ) + card.last_expiration_warning_at = now + chunk_warnings += 1 + except Exception: + logger.warning( + f"Failed to queue expiration warning for card {card.id}", + exc_info=True, + ) - notification_service.send_points_expiration_warning( - db, - card, - expiring_points=card.points_balance, - days_remaining=warning_days, - expiration_date=expiration_date, - ) - card.last_expiration_warning_at = now - warnings_sent += 1 - except Exception: - logger.warning( - f"Failed to queue expiration warning for card {card.id}", - exc_info=True, - ) + db.commit() + total_warnings += chunk_warnings - logger.info( - f"Sent {warnings_sent} expiration warnings for program {program.id}" - ) - return warnings_sent + if total_warnings: + logger.info( + f"Sent {total_warnings} expiration warnings for program {program.id}" + ) + return total_warnings # Allow running directly for testing diff --git a/app/modules/loyalty/tasks/wallet_sync.py b/app/modules/loyalty/tasks/wallet_sync.py index 01689207..9c303c36 100644 --- a/app/modules/loyalty/tasks/wallet_sync.py +++ b/app/modules/loyalty/tasks/wallet_sync.py @@ -4,14 +4,22 @@ Wallet synchronization task. Handles syncing loyalty card data to Google Wallet and Apple Wallet for cards that may have missed real-time updates. + +Uses exponential backoff (1s, 4s, 16s) per card to handle transient +API failures without blocking the entire batch. """ import logging +import time from celery import shared_task logger = logging.getLogger(__name__) +# Exponential backoff delays in seconds: 1s, 4s, 16s +_RETRY_DELAYS = [1, 4, 16] +_MAX_ATTEMPTS = len(_RETRY_DELAYS) + 1 # 4 total attempts + @shared_task(name="loyalty.sync_wallet_passes") def sync_wallet_passes() -> dict: @@ -35,7 +43,6 @@ def sync_wallet_passes() -> dict: # Find cards with transactions in the last hour that have wallet IDs one_hour_ago = datetime.now(UTC) - timedelta(hours=1) - # Get card IDs with recent transactions recent_tx_card_ids = ( db.query(LoyaltyTransaction.card_id) .filter(LoyaltyTransaction.transaction_at >= one_hour_ago) @@ -51,9 +58,9 @@ def sync_wallet_passes() -> dict: "cards_checked": 0, "google_synced": 0, "apple_synced": 0, + "failed_card_ids": [], } - # Get cards with wallet integrations cards = ( db.query(LoyaltyCard) .filter( @@ -69,31 +76,21 @@ def sync_wallet_passes() -> dict: failed_card_ids = [] for card in cards: - synced = False - for attempt in range(2): # 1 retry - try: - results = wallet_service.sync_card_to_wallets(db, card) - if results.get("google_wallet"): - google_synced += 1 - if results.get("apple_wallet"): - apple_synced += 1 - synced = True - break - except Exception as e: - if attempt == 0: - logger.warning( - f"Failed to sync card {card.id} (attempt 1/2), " - f"retrying in 2s: {e}" - ) - import time - time.sleep(2) - else: - logger.error( - f"Failed to sync card {card.id} after 2 attempts: {e}" - ) - if not synced: + success, google, apple = _sync_card_with_backoff( + wallet_service, db, card + ) + if success: + google_synced += google + apple_synced += apple + else: failed_card_ids.append(card.id) + if failed_card_ids: + logger.error( + f"Wallet sync: {len(failed_card_ids)} cards failed after " + f"{_MAX_ATTEMPTS} attempts each: {failed_card_ids}" + ) + logger.info( f"Wallet sync complete: {len(cards)} cards checked, " f"{google_synced} Google, {apple_synced} Apple, " @@ -113,6 +110,37 @@ def sync_wallet_passes() -> dict: return { "status": "error", "error": str(e), + "failed_card_ids": [], } finally: db.close() + + +def _sync_card_with_backoff(wallet_service, db, card) -> tuple[bool, int, int]: + """Sync a single card with exponential backoff. + + Returns (success, google_count, apple_count). + """ + last_error = None + + for attempt in range(_MAX_ATTEMPTS): + try: + results = wallet_service.sync_card_to_wallets(db, card) + google = 1 if results.get("google_wallet") else 0 + apple = 1 if results.get("apple_wallet") else 0 + return True, google, apple + except Exception as e: + last_error = e + if attempt < len(_RETRY_DELAYS): + delay = _RETRY_DELAYS[attempt] + logger.warning( + f"Card {card.id} sync failed (attempt {attempt + 1}/" + f"{_MAX_ATTEMPTS}), retrying in {delay}s: {e}" + ) + time.sleep(delay) + + logger.error( + f"Card {card.id} sync failed after {_MAX_ATTEMPTS} attempts: " + f"{last_error}" + ) + return False, 0, 0