Files
orion/app/modules/loyalty/services/card_service.py
Samir Boulahtit c9b2ecbdff
Some checks failed
CI / ruff (push) Successful in 10s
CI / validate (push) Has been cancelled
CI / dependency-scanning (push) Has been cancelled
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / pytest (push) Has started running
fix: use SQL func.replace instead of Python str.replace on column
LoyaltyCard.card_number is a SQLAlchemy column, not a string —
cannot call .replace() on it. Use func.replace() for the SQL query.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 22:12:49 +01:00

720 lines
23 KiB
Python

# app/modules/loyalty/services/card_service.py
"""
Loyalty card service.
Merchant-based card operations:
- Cards belong to a merchant's loyalty program
- One card per customer per merchant
- Can be used at any store within the merchant
Handles card operations including:
- Customer enrollment (with welcome bonus)
- Card lookup (by ID, QR code, card number, email, phone)
- Card management (activation, deactivation)
"""
import logging
from datetime import UTC, datetime
from sqlalchemy.orm import Session, joinedload
from app.modules.loyalty.exceptions import (
CustomerIdentifierRequiredException,
CustomerNotFoundByEmailException,
LoyaltyCardAlreadyExistsException,
LoyaltyCardNotFoundException,
LoyaltyProgramInactiveException,
LoyaltyProgramNotFoundException,
)
from app.modules.loyalty.models import (
LoyaltyCard,
LoyaltyProgram,
LoyaltyTransaction,
TransactionType,
)
logger = logging.getLogger(__name__)
class CardService:
"""Service for loyalty card operations."""
# =========================================================================
# Read Operations
# =========================================================================
def get_card(self, db: Session, card_id: int) -> LoyaltyCard | None:
"""Get a loyalty card by ID."""
return (
db.query(LoyaltyCard)
.options(joinedload(LoyaltyCard.program))
.filter(LoyaltyCard.id == card_id)
.first()
)
def get_card_by_qr_code(self, db: Session, qr_code: str) -> LoyaltyCard | None:
"""Get a loyalty card by QR code data."""
return (
db.query(LoyaltyCard)
.options(joinedload(LoyaltyCard.program))
.filter(LoyaltyCard.qr_code_data == qr_code)
.first()
)
def get_card_by_number(self, db: Session, card_number: str) -> LoyaltyCard | None:
"""Get a loyalty card by card number."""
from sqlalchemy import func
# Normalize card number (remove dashes/spaces)
normalized = card_number.replace("-", "").replace(" ", "")
return (
db.query(LoyaltyCard)
.options(joinedload(LoyaltyCard.program))
.filter(
func.replace(func.replace(LoyaltyCard.card_number, "-", ""), " ", "") == normalized
)
.first()
)
def get_card_by_customer_and_merchant(
self,
db: Session,
customer_id: int,
merchant_id: int,
) -> LoyaltyCard | None:
"""Get a customer's card for a merchant's program."""
return (
db.query(LoyaltyCard)
.options(joinedload(LoyaltyCard.program))
.filter(
LoyaltyCard.customer_id == customer_id,
LoyaltyCard.merchant_id == merchant_id,
)
.first()
)
def get_card_by_customer_and_program(
self,
db: Session,
customer_id: int,
program_id: int,
) -> LoyaltyCard | None:
"""Get a customer's card for a specific program."""
return (
db.query(LoyaltyCard)
.options(joinedload(LoyaltyCard.program))
.filter(
LoyaltyCard.customer_id == customer_id,
LoyaltyCard.program_id == program_id,
)
.first()
)
def get_card_by_serial_number(self, db: Session, serial_number: str) -> LoyaltyCard | None:
"""Get a loyalty card by Apple serial number."""
return (
db.query(LoyaltyCard)
.options(joinedload(LoyaltyCard.program))
.filter(LoyaltyCard.apple_serial_number == serial_number)
.first()
)
def require_card(self, db: Session, card_id: int) -> LoyaltyCard:
"""Get a card or raise exception if not found."""
card = self.get_card(db, card_id)
if not card:
raise LoyaltyCardNotFoundException(str(card_id))
return card
def require_card_by_serial_number(self, db: Session, serial_number: str) -> LoyaltyCard:
"""Get a card by Apple serial number or raise exception if not found."""
card = self.get_card_by_serial_number(db, serial_number)
if not card:
raise LoyaltyCardNotFoundException(serial_number)
return card
def resolve_customer_id(
self,
db: Session,
*,
customer_id: int | None,
email: str | None,
store_id: int,
create_if_missing: bool = False,
customer_name: str | None = None,
customer_phone: str | None = None,
customer_birthday: str | None = None,
) -> int:
"""
Resolve a customer ID from either a direct ID or email lookup.
Args:
db: Database session
customer_id: Direct customer ID (used if provided)
email: Customer email to look up
store_id: Store ID for scoping the email lookup
create_if_missing: If True, create customer when email not found
(used for self-enrollment)
customer_name: Full name for customer creation
customer_phone: Phone for customer creation
customer_birthday: Birthday (YYYY-MM-DD) for customer creation
Returns:
Resolved customer ID
Raises:
CustomerIdentifierRequiredException: If neither customer_id nor email provided
CustomerNotFoundByEmailException: If email lookup fails and create_if_missing is False
"""
if customer_id:
return customer_id
if email:
from app.modules.customers.models.customer import Customer
customer = (
db.query(Customer)
.filter(Customer.email == email, Customer.store_id == store_id)
.first()
)
if customer:
return customer.id
if create_if_missing:
import secrets
from app.modules.customers.services.customer_service import (
customer_service,
)
from app.modules.tenancy.models.store import Store
store = db.query(Store).filter(Store.id == store_id).first()
store_code = store.store_code if store else "STORE"
# Parse name into first/last
first_name = customer_name or ""
last_name = ""
if customer_name and " " in customer_name:
parts = customer_name.split(" ", 1)
first_name = parts[0]
last_name = parts[1]
# Generate unusable password hash and unique customer number
unusable_hash = f"!loyalty-enroll!{secrets.token_hex(32)}"
cust_number = customer_service._generate_customer_number(
db, store_id, store_code
)
customer = Customer(
email=email,
first_name=first_name,
last_name=last_name,
phone=customer_phone,
hashed_password=unusable_hash,
customer_number=cust_number,
store_id=store_id,
is_active=True,
)
db.add(customer)
db.flush()
logger.info(
f"Created customer {customer.id} ({email}) "
f"number={cust_number} for self-enrollment"
)
return customer.id
raise CustomerNotFoundByEmailException(email)
raise CustomerIdentifierRequiredException()
def lookup_card(
self,
db: Session,
*,
card_id: int | None = None,
qr_code: str | None = None,
card_number: str | None = None,
merchant_id: int | None = None,
) -> LoyaltyCard:
"""
Look up a card by any identifier.
Args:
db: Database session
card_id: Card ID
qr_code: QR code data
card_number: Card number (with or without dashes)
merchant_id: Optional merchant filter
Returns:
Found card
Raises:
LoyaltyCardNotFoundException: If no card found
"""
card = None
if card_id:
card = self.get_card(db, card_id)
elif qr_code:
card = self.get_card_by_qr_code(db, qr_code)
elif card_number:
card = self.get_card_by_number(db, card_number)
if not card:
identifier = card_id or qr_code or card_number or "unknown"
raise LoyaltyCardNotFoundException(str(identifier))
# Filter by merchant if specified
if merchant_id and card.merchant_id != merchant_id:
raise LoyaltyCardNotFoundException(str(card_id or qr_code or card_number))
return card
def lookup_card_for_store(
self,
db: Session,
store_id: int,
*,
card_id: int | None = None,
qr_code: str | None = None,
card_number: str | None = None,
) -> LoyaltyCard:
"""
Look up a card for a specific store (must be in same merchant).
Args:
db: Database session
store_id: Store ID (to get merchant context)
card_id: Card ID
qr_code: QR code data
card_number: Card number
Returns:
Found card (verified to be in store's merchant)
Raises:
LoyaltyCardNotFoundException: If no card found or wrong merchant
"""
from app.modules.tenancy.models import Store
store = db.query(Store).filter(Store.id == store_id).first()
if not store:
raise LoyaltyCardNotFoundException("store not found")
return self.lookup_card(
db,
card_id=card_id,
qr_code=qr_code,
card_number=card_number,
merchant_id=store.merchant_id,
)
def search_card_for_store(
self,
db: Session,
store_id: int,
query: str,
) -> LoyaltyCard | None:
"""
Search for a card by free-text query (card number or customer email).
Args:
db: Database session
store_id: Store ID (to scope merchant and customer lookup)
query: Search string — card number or customer email
Returns:
Found card or None
"""
from app.modules.customers.models import Customer
from app.modules.tenancy.models import Store
store = db.query(Store).filter(Store.id == store_id).first()
if not store:
return None
merchant_id = store.merchant_id
# Try card number
card = self.get_card_by_number(db, query)
if card and card.merchant_id == merchant_id:
return card
# Try customer email
customer = (
db.query(Customer)
.filter(Customer.email == query, Customer.store_id == store_id)
.first()
)
if customer:
card = self.get_card_by_customer_and_merchant(db, customer.id, merchant_id)
if card:
return card
return None
def list_cards(
self,
db: Session,
merchant_id: int,
*,
store_id: int | None = None,
skip: int = 0,
limit: int = 50,
is_active: bool | None = None,
search: str | None = None,
) -> tuple[list[LoyaltyCard], int]:
"""
List loyalty cards for a merchant.
Args:
db: Database session
merchant_id: Merchant ID
store_id: Optional filter by enrolled store
skip: Pagination offset
limit: Pagination limit
is_active: Filter by active status
search: Search by card number, email, or name
Returns:
(cards, total_count)
"""
from app.modules.customers.models.customer import Customer
query = (
db.query(LoyaltyCard)
.options(joinedload(LoyaltyCard.customer))
.filter(LoyaltyCard.merchant_id == merchant_id)
)
if store_id:
query = query.filter(LoyaltyCard.enrolled_at_store_id == store_id)
if is_active is not None:
query = query.filter(LoyaltyCard.is_active == is_active)
if search:
# Normalize search term for card number matching
search_normalized = search.replace("-", "").replace(" ", "")
query = query.join(Customer).filter(
(LoyaltyCard.card_number.replace("-", "").ilike(f"%{search_normalized}%"))
| (Customer.email.ilike(f"%{search}%"))
| (Customer.first_name.ilike(f"%{search}%"))
| (Customer.last_name.ilike(f"%{search}%"))
| (Customer.phone.ilike(f"%{search}%"))
)
total = query.count()
cards = (
query.order_by(LoyaltyCard.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
return cards, total
def list_customer_cards(
self,
db: Session,
customer_id: int,
) -> list[LoyaltyCard]:
"""List all loyalty cards for a customer."""
return (
db.query(LoyaltyCard)
.options(joinedload(LoyaltyCard.program), joinedload(LoyaltyCard.merchant))
.filter(LoyaltyCard.customer_id == customer_id)
.all()
)
# =========================================================================
# Write Operations
# =========================================================================
def enroll_customer(
self,
db: Session,
customer_id: int,
merchant_id: int,
*,
enrolled_at_store_id: int | None = None,
) -> LoyaltyCard:
"""
Enroll a customer in a merchant's loyalty program.
Args:
db: Database session
customer_id: Customer ID
merchant_id: Merchant ID
enrolled_at_store_id: Store where customer enrolled (for analytics)
Returns:
Created loyalty card
Raises:
LoyaltyProgramNotFoundException: If no program exists
LoyaltyProgramInactiveException: If program is inactive
LoyaltyCardAlreadyExistsException: If customer already enrolled
"""
# Get the program
program = (
db.query(LoyaltyProgram)
.filter(LoyaltyProgram.merchant_id == merchant_id)
.first()
)
if not program:
raise LoyaltyProgramNotFoundException(f"merchant:{merchant_id}")
if not program.is_active:
raise LoyaltyProgramInactiveException(program.id)
# Check if customer already has a card
existing = self.get_card_by_customer_and_merchant(db, customer_id, merchant_id)
if existing:
raise LoyaltyCardAlreadyExistsException(customer_id, program.id)
# Create the card
card = LoyaltyCard(
merchant_id=merchant_id,
customer_id=customer_id,
program_id=program.id,
enrolled_at_store_id=enrolled_at_store_id,
)
db.add(card)
db.flush() # Get the card ID
# Create enrollment transaction
transaction = LoyaltyTransaction(
merchant_id=merchant_id,
card_id=card.id,
store_id=enrolled_at_store_id,
transaction_type=TransactionType.CARD_CREATED.value,
transaction_at=datetime.now(UTC),
)
db.add(transaction)
# Award welcome bonus if configured
if program.welcome_bonus_points > 0:
card.add_points(program.welcome_bonus_points)
bonus_transaction = LoyaltyTransaction(
merchant_id=merchant_id,
card_id=card.id,
store_id=enrolled_at_store_id,
transaction_type=TransactionType.WELCOME_BONUS.value,
points_delta=program.welcome_bonus_points,
points_balance_after=card.points_balance,
notes="Welcome bonus on enrollment",
transaction_at=datetime.now(UTC),
)
db.add(bonus_transaction)
db.commit()
db.refresh(card)
# Create wallet objects (Google Wallet, Apple Wallet)
# Lazy import to avoid circular imports; exception-safe (logs but doesn't raise)
from app.modules.loyalty.services.wallet_service import wallet_service
wallet_service.create_wallet_objects(db, card)
logger.info(
f"Enrolled customer {customer_id} in merchant {merchant_id} loyalty program "
f"(card: {card.card_number}, bonus: {program.welcome_bonus_points} pts)"
)
return card
def enroll_customer_for_store(
self,
db: Session,
customer_id: int,
store_id: int,
) -> LoyaltyCard:
"""
Enroll a customer through a specific store.
Looks up the store's merchant and enrolls in the merchant's program.
Args:
db: Database session
customer_id: Customer ID
store_id: Store ID
Returns:
Created loyalty card
"""
from app.modules.tenancy.models import Store
store = db.query(Store).filter(Store.id == store_id).first()
if not store:
raise LoyaltyProgramNotFoundException(f"store:{store_id}")
return self.enroll_customer(
db,
customer_id,
store.merchant_id,
enrolled_at_store_id=store_id,
)
def deactivate_card(
self,
db: Session,
card_id: int,
*,
store_id: int | None = None,
) -> LoyaltyCard:
"""Deactivate a loyalty card."""
card = self.require_card(db, card_id)
card.is_active = False
# Create deactivation transaction
transaction = LoyaltyTransaction(
merchant_id=card.merchant_id,
card_id=card.id,
store_id=store_id,
transaction_type=TransactionType.CARD_DEACTIVATED.value,
transaction_at=datetime.now(UTC),
)
db.add(transaction)
db.commit()
db.refresh(card)
logger.info(f"Deactivated loyalty card {card_id}")
return card
def reactivate_card(self, db: Session, card_id: int) -> LoyaltyCard:
"""Reactivate a deactivated loyalty card."""
card = self.require_card(db, card_id)
card.is_active = True
db.commit()
db.refresh(card)
logger.info(f"Reactivated loyalty card {card_id}")
return card
# =========================================================================
# Helpers
# =========================================================================
def get_stamps_today(self, db: Session, card_id: int) -> int:
"""Get number of stamps earned today for a card."""
from sqlalchemy import func
today_start = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0)
count = (
db.query(func.count(LoyaltyTransaction.id))
.filter(
LoyaltyTransaction.card_id == card_id,
LoyaltyTransaction.transaction_type == TransactionType.STAMP_EARNED.value,
LoyaltyTransaction.transaction_at >= today_start,
)
.scalar()
)
return count or 0
def get_card_transactions(
self,
db: Session,
card_id: int,
*,
skip: int = 0,
limit: int = 50,
) -> tuple[list[LoyaltyTransaction], int]:
"""Get transaction history for a card."""
query = (
db.query(LoyaltyTransaction)
.options(joinedload(LoyaltyTransaction.store))
.filter(LoyaltyTransaction.card_id == card_id)
.order_by(LoyaltyTransaction.transaction_at.desc())
)
total = query.count()
transactions = query.offset(skip).limit(limit).all()
return transactions, total
def get_store_transactions(
self,
db: Session,
merchant_id: int,
*,
store_id: int | None = None,
skip: int = 0,
limit: int = 10,
) -> tuple[list[LoyaltyTransaction], int]:
"""Get recent transactions for a merchant (optionally filtered by store)."""
query = (
db.query(LoyaltyTransaction)
.join(LoyaltyCard, LoyaltyTransaction.card_id == LoyaltyCard.id)
.options(joinedload(LoyaltyTransaction.store))
.filter(LoyaltyCard.merchant_id == merchant_id)
)
if store_id:
query = query.filter(LoyaltyTransaction.store_id == store_id)
query = query.order_by(LoyaltyTransaction.transaction_at.desc())
total = query.count()
transactions = query.offset(skip).limit(limit).all()
return transactions, total
def get_customer_transactions_with_store_names(
self,
db: Session,
card_id: int,
*,
skip: int = 0,
limit: int = 20,
) -> tuple[list[dict], int]:
"""
Get transaction history for a card with store names resolved.
Returns a list of dicts with transaction data including store_name.
"""
from app.modules.tenancy.models import Store as StoreModel
query = (
db.query(LoyaltyTransaction)
.filter(LoyaltyTransaction.card_id == card_id)
.order_by(LoyaltyTransaction.transaction_at.desc())
)
total = query.count()
transactions = query.offset(skip).limit(limit).all()
tx_responses = []
for tx in transactions:
tx_data = {
"id": tx.id,
"transaction_type": tx.transaction_type.value if hasattr(tx.transaction_type, "value") else str(tx.transaction_type),
"points_delta": tx.points_delta,
"stamps_delta": tx.stamps_delta,
"points_balance_after": tx.points_balance_after,
"stamps_balance_after": tx.stamps_balance_after,
"transaction_at": tx.transaction_at.isoformat() if tx.transaction_at else None,
"notes": tx.notes,
"store_name": None,
}
if tx.store_id:
store_obj = db.query(StoreModel).filter(StoreModel.id == tx.store_id).first()
if store_obj:
tx_data["store_name"] = store_obj.name
tx_responses.append(tx_data)
return tx_responses, total
# Singleton instance
card_service = CardService()