Files
orion/app/modules/loyalty/tasks/point_expiration.py
Samir Boulahtit fde58bea06
Some checks failed
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / validate (push) Has been cancelled
CI / ruff (push) Successful in 12s
CI / dependency-scanning (push) Has been cancelled
CI / pytest (push) Has started running
perf(loyalty): Phase 3 — batched expiration + wallet sync backoff
Phase 3 of the production launch plan: task reliability improvements
to prevent DB lock issues at scale and handle transient wallet API
failures gracefully.

- 3.1 Batched point expiration: rewrite per-card Python loop to chunked
  processing (LIMIT 500 FOR UPDATE SKIP LOCKED). Each chunk commits
  independently, releasing row locks before processing the next batch.
  Notifications sent after commit (outside lock window). Warning emails
  also chunked with same pattern.
- 3.2 Wallet sync exponential backoff: replace time.sleep(2) single
  retry with 4 attempts using [1s, 4s, 16s] backoff delays. Per-card
  try/except ensures one failing card doesn't block the batch.
  Failed card IDs logged for observability.

342 tests pass.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 19:55:39 +02:00

330 lines
10 KiB
Python

# app/modules/loyalty/tasks/point_expiration.py
"""
Point expiration task.
Handles expiring points that are older than the configured
expiration period based on card inactivity.
Runs daily at 02:00 via the scheduled task configuration in definition.py.
Processing is chunked (LIMIT 500 + FOR UPDATE SKIP LOCKED) to avoid
holding long-running row locks on the loyalty_cards table.
"""
import logging
from datetime import UTC, datetime, timedelta
from celery import shared_task
from sqlalchemy import or_
from sqlalchemy.orm import Session
from app.core.database import SessionLocal
from app.modules.loyalty.models import LoyaltyCard, LoyaltyProgram, LoyaltyTransaction
from app.modules.loyalty.models.loyalty_transaction import TransactionType
logger = logging.getLogger(__name__)
CHUNK_SIZE = 500
@shared_task(name="loyalty.expire_points")
def expire_points() -> dict:
    """
    Expire points that are past their expiration date based on card inactivity.

    For each program with points_expiration_days configured:
    1. Send 14-day warning emails to cards approaching expiry
    2. Expire points in chunks of 500, committing after each chunk
    3. Send expired notifications

    Exceptions are not propagated: any failure is logged and reported
    through the returned summary with ``status == "error"``.

    Returns:
        Summary dict with the keys ``status``, ``programs_processed``,
        ``cards_processed``, ``points_expired`` and ``warnings_sent``.
        On failure an ``error`` key holds the exception text and all
        counters are reported as 0. Chunks that were committed before
        the failure stay committed, so the zeroed counters do not
        reflect partial progress.
    """
    logger.info("Starting point expiration task...")
    db: Session = SessionLocal()
    try:
        result = _process_point_expiration(db)
        logger.info(
            f"Point expiration complete: {result['cards_processed']} cards, "
            f"{result['points_expired']} points expired, "
            f"{result['warnings_sent']} warnings sent"
        )
        return result
    except Exception as e:
        # Only discards work that has not been committed yet; chunks
        # committed earlier in the run are unaffected.
        db.rollback()
        logger.error(f"Point expiration task failed: {e}", exc_info=True)
        # Same keys as the success payload (plus "error") so consumers
        # can read every counter without checking the status first.
        return {
            "status": "error",
            "error": str(e),
            "programs_processed": 0,
            "cards_processed": 0,
            "points_expired": 0,
            "warnings_sent": 0,
        }
    finally:
        db.close()
def _process_point_expiration(db: Session) -> dict:
    """Run warnings and expiration for every eligible program.

    A program is eligible when it is active and has a positive
    ``points_expiration_days``. Programs are handled one after another.

    Returns:
        Summary dict with ``status`` set to ``"success"`` and the
        aggregated ``programs_processed``, ``cards_processed``,
        ``points_expired`` and ``warnings_sent`` counters.
    """
    eligible_programs = db.query(LoyaltyProgram).filter(
        LoyaltyProgram.is_active == True,  # noqa: E712
        LoyaltyProgram.points_expiration_days.isnot(None),
        LoyaltyProgram.points_expiration_days > 0,
    )
    programs = eligible_programs.all()
    logger.info(f"Found {len(programs)} programs with point expiration configured")

    # Counters are accumulated directly in the payload that is returned.
    summary = {
        "status": "success",
        "programs_processed": 0,
        "cards_processed": 0,
        "points_expired": 0,
        "warnings_sent": 0,
    }
    for program in programs:
        expired_cards, expired_points, sent_warnings = _process_program(db, program)
        summary["cards_processed"] += expired_cards
        summary["points_expired"] += expired_points
        summary["warnings_sent"] += sent_warnings
        summary["programs_processed"] += 1

    return summary
def _process_program(
    db: Session, program: LoyaltyProgram
) -> tuple[int, int, int]:
    """Process warnings + expiration for a single program.

    The warning phase runs first, then the expiration phase. Both use
    the same ``now`` so their inactivity windows line up.

    The warning phase is skipped when the program's expiration period
    is shorter than the 14-day warning lead time. In that case the
    warning threshold would lie in the future: every recently active
    card would match, and a card stamped with ``now`` would still be
    older than the threshold, so the chunked warning loop would keep
    re-selecting the same cards.

    Returns (cards_expired, points_expired, warnings_sent).
    """
    expiration_days = program.points_expiration_days
    if not expiration_days:
        return 0, 0, 0

    now = datetime.now(UTC)
    expiration_threshold = now - timedelta(days=expiration_days)

    # --- Phase 1: Send 14-day warning emails (chunked) ---
    warning_days = 14
    warnings_sent = 0
    if expiration_days >= warning_days:
        # Cards inactive since before this instant are within
        # `warning_days` of reaching the expiration threshold.
        warning_threshold = now - timedelta(days=expiration_days - warning_days)
        warnings_sent = _send_expiration_warnings_chunked(
            db, program, warning_threshold, expiration_threshold, warning_days, now
        )
    else:
        logger.info(
            "Program %s: expiration period (%s days) is shorter than the "
            "%s-day warning lead time; skipping expiration warnings",
            program.id,
            expiration_days,
            warning_days,
        )

    # --- Phase 2: Expire points (chunked) ---
    cards_expired, points_expired = _expire_points_chunked(
        db, program, expiration_threshold, now
    )
    return cards_expired, points_expired, warnings_sent
# =========================================================================
# Chunked expiration
# =========================================================================
def _expire_points_chunked(
    db: Session,
    program: LoyaltyProgram,
    expiration_threshold: datetime,
    now: datetime,
) -> tuple[int, int]:
    """Zero out the balance of inactive cards, one chunk at a time.

    A card qualifies when it belongs to the program's merchant, is
    active, has a positive points balance and its ``last_activity_at``
    is older than ``expiration_threshold``.

    Per chunk:
    1. Select up to CHUNK_SIZE qualifying cards FOR UPDATE SKIP LOCKED
    2. Add one POINTS_EXPIRED transaction per card
    3. Set the card's balance to 0 and add the old balance to
       total_points_voided
    4. Commit, then send the "points expired" notifications

    The loop ends when the select returns no rows. Notification
    failures are logged and do not stop processing.

    Returns (total_cards, total_points).
    """
    cards_done = 0
    points_done = 0

    while True:
        # Locked rows are skipped rather than waited on, so concurrent
        # workers do not block each other on the same cards.
        locked_rows = (
            db.query(
                LoyaltyCard.id,
                LoyaltyCard.points_balance,
                LoyaltyCard.stamp_count,
            )
            .filter(
                LoyaltyCard.merchant_id == program.merchant_id,
                LoyaltyCard.points_balance > 0,
                LoyaltyCard.last_activity_at < expiration_threshold,
                LoyaltyCard.is_active == True,  # noqa: E712
            )
            .limit(CHUNK_SIZE)
            .with_for_update(skip_locked=True)
            .all()
        )
        if not locked_rows:
            break

        # (card_id, balance) pairs actually expired in this chunk, in
        # the order they were selected.
        expired_in_chunk = []
        for card_id, balance, stamp_count in locked_rows:
            if balance <= 0:
                continue

            # Ledger entry recording the expiration
            expiry_entry = LoyaltyTransaction(
                card_id=card_id,
                merchant_id=program.merchant_id,
                store_id=None,
                transaction_type=TransactionType.POINTS_EXPIRED.value,
                points_delta=-balance,
                points_balance_after=0,
                stamps_delta=0,
                stamps_balance_after=stamp_count,
                notes=(
                    f"Points expired after {program.points_expiration_days} "
                    f"days of inactivity"
                ),
                transaction_at=now,
            )
            db.add(expiry_entry)

            # Card update issued in the same database transaction
            card_update = {
                LoyaltyCard.points_balance: 0,
                LoyaltyCard.total_points_voided: (
                    LoyaltyCard.total_points_voided + balance
                ),
            }
            db.query(LoyaltyCard).filter(LoyaltyCard.id == card_id).update(
                card_update,
                synchronize_session=False,
            )
            expired_in_chunk.append((card_id, balance))

        chunk_cards = len(expired_in_chunk)
        chunk_points = sum(balance for _card_id, balance in expired_in_chunk)

        # Ends the chunk's transaction and with it the row locks
        db.commit()

        # Notifications go out only after the commit, i.e. outside the
        # lock window.
        for card_id, balance in expired_in_chunk:
            try:
                card = db.query(LoyaltyCard).get(card_id)
                if not card:
                    continue
                from app.modules.loyalty.services.notification_service import (
                    notification_service,
                )

                notification_service.send_points_expired(db, card, balance)
            except Exception:
                logger.warning(
                    f"Failed to queue expiration notification for card {card_id}",
                    exc_info=True,
                )

        cards_done += chunk_cards
        points_done += chunk_points
        logger.info(
            f"Program {program.id}: expired chunk of {chunk_cards} cards "
            f"({chunk_points} pts), total so far: {cards_done} cards"
        )

    return cards_done, points_done
# =========================================================================
# Chunked expiration warnings
# =========================================================================
def _send_expiration_warnings_chunked(
    db: Session,
    program: LoyaltyProgram,
    warning_threshold: datetime,
    expiration_threshold: datetime,
    warning_days: int,
    now: datetime,
) -> int:
    """Send expiration warning emails in chunks.

    A card is warned when it belongs to the program's merchant, is
    active, has a positive balance, its ``last_activity_at`` lies in
    ``[expiration_threshold, warning_threshold)`` and its
    ``last_expiration_warning_at`` is unset or older than
    ``warning_threshold``. After a successful send the card is stamped
    with ``last_expiration_warning_at = now``; each chunk is committed
    separately.

    Chunks are walked in ascending card-id order with a keyset cursor,
    so every card is attempted at most once per run. Without the
    cursor, a card whose notification raises (and is therefore never
    stamped) would be selected again on every iteration and the loop
    would not terminate. Failed cards are logged and left unstamped.

    NOTE(review): the dedup filter compares against warning_threshold,
    not last_activity_at. For programs with points_expiration_days
    below 2 * warning_days a card can match again on a later run within
    the same inactivity window — confirm this is intended.

    Returns the number of warnings sent.
    """
    total_warnings = 0
    expiration_date = (now + timedelta(days=warning_days)).strftime("%Y-%m-%d")
    last_seen_id = None  # keyset cursor: highest card id already attempted

    while True:
        query = db.query(LoyaltyCard).filter(
            LoyaltyCard.merchant_id == program.merchant_id,
            LoyaltyCard.points_balance > 0,
            LoyaltyCard.is_active == True,  # noqa: E712
            LoyaltyCard.last_activity_at < warning_threshold,
            LoyaltyCard.last_activity_at >= expiration_threshold,
            or_(
                LoyaltyCard.last_expiration_warning_at.is_(None),
                LoyaltyCard.last_expiration_warning_at < warning_threshold,
            ),
        )
        if last_seen_id is not None:
            query = query.filter(LoyaltyCard.id > last_seen_id)
        cards = query.order_by(LoyaltyCard.id).limit(CHUNK_SIZE).all()
        if not cards:
            break

        # Advance the cursor before committing so it does not depend on
        # the state of the loaded objects after the commit.
        last_seen_id = cards[-1].id

        chunk_warnings = 0
        for card in cards:
            try:
                from app.modules.loyalty.services.notification_service import (
                    notification_service,
                )

                notification_service.send_points_expiration_warning(
                    db,
                    card,
                    expiring_points=card.points_balance,
                    days_remaining=warning_days,
                    expiration_date=expiration_date,
                )
                # Stamp only after a successful send so a failed card
                # stays eligible for a later run.
                card.last_expiration_warning_at = now
                chunk_warnings += 1
            except Exception:
                logger.warning(
                    f"Failed to queue expiration warning for card {card.id}",
                    exc_info=True,
                )

        db.commit()
        total_warnings += chunk_warnings

    if total_warnings:
        logger.info(
            f"Sent {total_warnings} expiration warnings for program {program.id}"
        )
    return total_warnings
# Manual entry point: run the task once in-process and report the
# outcome through the exit code (0 on success, 1 otherwise).
if __name__ == "__main__":
    import sys

    logging.basicConfig(level=logging.DEBUG)
    result = expire_points()
    print(f"Result: {result}")
    exit_code = 0 if result["status"] == "success" else 1
    sys.exit(exit_code)