# app/modules/loyalty/services/card_service.py """ Loyalty card service. Merchant-based card operations: - Cards belong to a merchant's loyalty program - One card per customer per merchant - Can be used at any store within the merchant Handles card operations including: - Customer enrollment (with welcome bonus) - Card lookup (by ID, QR code, card number, email, phone) - Card management (activation, deactivation) """ import logging from datetime import UTC, date, datetime from sqlalchemy.orm import Session, joinedload from app.modules.loyalty.exceptions import ( CustomerIdentifierRequiredException, CustomerNotFoundByEmailException, LoyaltyCardAlreadyExistsException, LoyaltyCardNotFoundException, LoyaltyProgramInactiveException, LoyaltyProgramNotFoundException, ) from app.modules.loyalty.models import ( LoyaltyCard, LoyaltyProgram, LoyaltyTransaction, TransactionType, ) logger = logging.getLogger(__name__) class CardService: """Service for loyalty card operations.""" # ========================================================================= # Read Operations # ========================================================================= def get_card(self, db: Session, card_id: int) -> LoyaltyCard | None: """Get a loyalty card by ID.""" return ( db.query(LoyaltyCard) .options(joinedload(LoyaltyCard.program)) .filter(LoyaltyCard.id == card_id) .first() ) def get_recent_cards(self, db: Session, limit: int = 20) -> list[LoyaltyCard]: """Get the most recently created cards with program and customer loaded.""" return ( db.query(LoyaltyCard) .options( joinedload(LoyaltyCard.customer), joinedload(LoyaltyCard.program), ) .order_by(LoyaltyCard.created_at.desc()) .limit(limit) .all() ) def get_card_for_update(self, db: Session, card_id: int) -> LoyaltyCard | None: """Get a loyalty card by ID with a row-level lock (SELECT ... FOR UPDATE). Note: Does not use joinedload to avoid LEFT OUTER JOIN which is incompatible with FOR UPDATE in PostgreSQL. """ return ( db.query(LoyaltyCard) .filter(LoyaltyCard.id == card_id) .with_for_update() .first() ) def get_card_by_qr_code(self, db: Session, qr_code: str) -> LoyaltyCard | None: """Get a loyalty card by QR code data.""" return ( db.query(LoyaltyCard) .options(joinedload(LoyaltyCard.program)) .filter(LoyaltyCard.qr_code_data == qr_code) .first() ) def get_card_by_number(self, db: Session, card_number: str) -> LoyaltyCard | None: """Get a loyalty card by card number.""" from sqlalchemy import func # Normalize card number (remove dashes/spaces) normalized = card_number.replace("-", "").replace(" ", "") return ( db.query(LoyaltyCard) .options(joinedload(LoyaltyCard.program)) .filter( func.replace(func.replace(LoyaltyCard.card_number, "-", ""), " ", "") == normalized ) .first() ) def get_card_by_customer_and_merchant( self, db: Session, customer_id: int, merchant_id: int, ) -> LoyaltyCard | None: """Get a customer's card for a merchant's program.""" return ( db.query(LoyaltyCard) .options(joinedload(LoyaltyCard.program)) .filter( LoyaltyCard.customer_id == customer_id, LoyaltyCard.merchant_id == merchant_id, ) .first() ) def get_card_by_customer_and_store( self, db: Session, customer_id: int, store_id: int, ) -> LoyaltyCard | None: """Get a customer's card for a specific store.""" return ( db.query(LoyaltyCard) .options(joinedload(LoyaltyCard.program)) .filter( LoyaltyCard.customer_id == customer_id, LoyaltyCard.enrolled_at_store_id == store_id, ) .first() ) def get_card_by_customer_and_program( self, db: Session, customer_id: int, program_id: int, ) -> LoyaltyCard | None: """Get a customer's card for a specific program.""" return ( db.query(LoyaltyCard) .options(joinedload(LoyaltyCard.program)) .filter( LoyaltyCard.customer_id == customer_id, LoyaltyCard.program_id == program_id, ) .first() ) def get_card_by_serial_number(self, db: Session, serial_number: str) -> LoyaltyCard | None: """Get a loyalty card by Apple serial number.""" return ( db.query(LoyaltyCard) .options(joinedload(LoyaltyCard.program)) .filter(LoyaltyCard.apple_serial_number == serial_number) .first() ) def require_card(self, db: Session, card_id: int) -> LoyaltyCard: """Get a card or raise exception if not found.""" card = self.get_card(db, card_id) if not card: raise LoyaltyCardNotFoundException(str(card_id)) return card def require_card_by_serial_number(self, db: Session, serial_number: str) -> LoyaltyCard: """Get a card by Apple serial number or raise exception if not found.""" card = self.get_card_by_serial_number(db, serial_number) if not card: raise LoyaltyCardNotFoundException(serial_number) return card def resolve_customer_id( self, db: Session, *, customer_id: int | None, email: str | None, store_id: int, merchant_id: int | None = None, create_if_missing: bool = False, customer_name: str | None = None, customer_phone: str | None = None, customer_birthday: date | None = None, ) -> int: """ Resolve a customer ID from either a direct ID or email lookup. Args: db: Database session customer_id: Direct customer ID (used if provided) email: Customer email to look up store_id: Store ID for scoping the email lookup merchant_id: Merchant ID for cross-store loyalty card lookup create_if_missing: If True, create customer when email not found (used for self-enrollment) customer_name: Full name for customer creation customer_phone: Phone for customer creation customer_birthday: Date of birth for customer creation Returns: Resolved customer ID Raises: CustomerIdentifierRequiredException: If neither customer_id nor email provided CustomerNotFoundByEmailException: If email lookup fails and create_if_missing is False """ if customer_id: return customer_id if email: from app.modules.customers.models.customer import ( Customer as CustomerModel, ) from app.modules.customers.services.customer_service import ( customer_service, ) customer = customer_service.get_customer_by_email(db, store_id, email) if customer: # Backfill birthday on existing customer if they didn't have # one before — keeps the enrollment form useful for returning # customers who never previously provided a birthday. if customer_birthday and not customer.birth_date: customer.birth_date = customer_birthday db.flush() return customer.id # Customers are store-scoped, but loyalty cards are merchant-scoped. # Check if this email already has a card under the same merchant at # a different store — if so, reuse that customer_id so the duplicate # check in enroll_customer() fires correctly. if merchant_id: existing_cardholder = ( db.query(CustomerModel) .join( LoyaltyCard, CustomerModel.id == LoyaltyCard.customer_id, ) .filter( CustomerModel.email == email.lower(), LoyaltyCard.merchant_id == merchant_id, ) .first() ) if existing_cardholder: if customer_birthday and not existing_cardholder.birth_date: existing_cardholder.birth_date = customer_birthday db.flush() return existing_cardholder.id if create_if_missing: # Parse name into first/last first_name = customer_name or "" last_name = "" if customer_name and " " in customer_name: parts = customer_name.split(" ", 1) first_name = parts[0] last_name = parts[1] customer = customer_service.create_customer_for_enrollment( db, store_id=store_id, email=email, first_name=first_name, last_name=last_name, phone=customer_phone, birth_date=customer_birthday, ) logger.info( f"Created customer {customer.id} ({email}) " f"for self-enrollment" ) return customer.id raise CustomerNotFoundByEmailException(email) raise CustomerIdentifierRequiredException() def lookup_card( self, db: Session, *, card_id: int | None = None, qr_code: str | None = None, card_number: str | None = None, merchant_id: int | None = None, ) -> LoyaltyCard: """ Look up a card by any identifier. Args: db: Database session card_id: Card ID qr_code: QR code data card_number: Card number (with or without dashes) merchant_id: Optional merchant filter Returns: Found card Raises: LoyaltyCardNotFoundException: If no card found """ card = None if card_id: card = self.get_card(db, card_id) elif qr_code: card = self.get_card_by_qr_code(db, qr_code) elif card_number: card = self.get_card_by_number(db, card_number) if not card: identifier = card_id or qr_code or card_number or "unknown" raise LoyaltyCardNotFoundException(str(identifier)) # Filter by merchant if specified if merchant_id and card.merchant_id != merchant_id: raise LoyaltyCardNotFoundException(str(card_id or qr_code or card_number)) return card def lookup_card_for_store( self, db: Session, store_id: int, *, card_id: int | None = None, qr_code: str | None = None, card_number: str | None = None, ) -> LoyaltyCard: """ Look up a card for a specific store (must be in same merchant). Args: db: Database session store_id: Store ID (to get merchant context) card_id: Card ID qr_code: QR code data card_number: Card number Returns: Found card (verified to be in store's merchant) Raises: LoyaltyCardNotFoundException: If no card found or wrong merchant """ from app.modules.tenancy.services.store_service import store_service store = store_service.get_store_by_id_optional(db, store_id) if not store: raise LoyaltyCardNotFoundException("store not found") return self.lookup_card( db, card_id=card_id, qr_code=qr_code, card_number=card_number, merchant_id=store.merchant_id, ) def search_card_for_store( self, db: Session, store_id: int, query: str, ) -> LoyaltyCard | None: """ Search for a card by free-text query (card number or customer email). Args: db: Database session store_id: Store ID (to scope merchant and customer lookup) query: Search string — card number or customer email Returns: Found card or None """ from app.modules.customers.services.customer_service import customer_service from app.modules.tenancy.services.store_service import store_service store = store_service.get_store_by_id_optional(db, store_id) if not store: return None merchant_id = store.merchant_id # Try card number — always merchant-scoped card = self.get_card_by_number(db, query) if card and card.merchant_id == merchant_id: return card # Try customer email — first at this store customer = customer_service.get_customer_by_email(db, store_id, query) if customer: card = self.get_card_by_customer_and_merchant(db, customer.id, merchant_id) if card: return card # Cross-store email search: the customer may have enrolled at a # different store under the same merchant. Only search when # cross-location redemption is enabled. from app.modules.customers.models.customer import Customer as CustomerModel from app.modules.loyalty.services.program_service import program_service settings = program_service.get_merchant_settings(db, merchant_id) cross_location_enabled = ( settings.allow_cross_location_redemption if settings else True ) if cross_location_enabled: cross_store_customer = ( db.query(CustomerModel) .join(LoyaltyCard, CustomerModel.id == LoyaltyCard.customer_id) .filter( CustomerModel.email == query.lower(), LoyaltyCard.merchant_id == merchant_id, ) .first() ) if cross_store_customer: card = self.get_card_by_customer_and_merchant( db, cross_store_customer.id, merchant_id ) if card: return card return None def list_cards( self, db: Session, merchant_id: int, *, store_id: int | None = None, skip: int = 0, limit: int = 50, is_active: bool | None = None, search: str | None = None, ) -> tuple[list[LoyaltyCard], int]: """ List loyalty cards for a merchant. Args: db: Database session merchant_id: Merchant ID store_id: Optional filter by enrolled store skip: Pagination offset limit: Pagination limit is_active: Filter by active status search: Search by card number, email, or name Returns: (cards, total_count) """ query = ( db.query(LoyaltyCard) .options(joinedload(LoyaltyCard.customer)) .filter(LoyaltyCard.merchant_id == merchant_id) ) if store_id: query = query.filter(LoyaltyCard.enrolled_at_store_id == store_id) if is_active is not None: query = query.filter(LoyaltyCard.is_active == is_active) if search: from sqlalchemy import func # Normalize search term for card number matching search_normalized = search.replace("-", "").replace(" ", "") # Use relationship-based join to avoid direct Customer model import CustomerModel = LoyaltyCard.customer.property.mapper.class_ query = query.join(LoyaltyCard.customer).filter( (func.replace(LoyaltyCard.card_number, "-", "").ilike(f"%{search_normalized}%")) | (CustomerModel.email.ilike(f"%{search}%")) | (CustomerModel.first_name.ilike(f"%{search}%")) | (CustomerModel.last_name.ilike(f"%{search}%")) | (CustomerModel.phone.ilike(f"%{search}%")) ) total = query.count() cards = ( query.order_by(LoyaltyCard.created_at.desc()) .offset(skip) .limit(limit) .all() ) return cards, total def list_customer_cards( self, db: Session, customer_id: int, ) -> list[LoyaltyCard]: """List all loyalty cards for a customer.""" return ( db.query(LoyaltyCard) .options(joinedload(LoyaltyCard.program), joinedload(LoyaltyCard.merchant)) .filter(LoyaltyCard.customer_id == customer_id) .all() ) # ========================================================================= # Write Operations # ========================================================================= def enroll_customer( self, db: Session, customer_id: int, merchant_id: int, *, enrolled_at_store_id: int | None = None, ) -> LoyaltyCard: """ Enroll a customer in a merchant's loyalty program. Args: db: Database session customer_id: Customer ID merchant_id: Merchant ID enrolled_at_store_id: Store where customer enrolled (for analytics) Returns: Created loyalty card Raises: LoyaltyProgramNotFoundException: If no program exists LoyaltyProgramInactiveException: If program is inactive LoyaltyCardAlreadyExistsException: If customer already enrolled """ # Get the program program = ( db.query(LoyaltyProgram) .filter(LoyaltyProgram.merchant_id == merchant_id) .first() ) if not program: raise LoyaltyProgramNotFoundException(f"merchant:{merchant_id}") if not program.is_active: raise LoyaltyProgramInactiveException(program.id) # Check for duplicate enrollment — the scope depends on whether # cross-location redemption is enabled for this merchant. from app.modules.loyalty.services.program_service import program_service settings = program_service.get_merchant_settings(db, merchant_id) if settings and not settings.allow_cross_location_redemption: # Per-store cards: only block if the customer already has a card # at THIS specific store. Cards at other stores are allowed. if enrolled_at_store_id: existing = ( db.query(LoyaltyCard) .filter( LoyaltyCard.customer_id == customer_id, LoyaltyCard.enrolled_at_store_id == enrolled_at_store_id, ) .first() ) if existing: raise LoyaltyCardAlreadyExistsException(customer_id, program.id) else: # Cross-location enabled (default): one card per merchant existing = self.get_card_by_customer_and_merchant( db, customer_id, merchant_id ) if existing: raise LoyaltyCardAlreadyExistsException(customer_id, program.id) # Create the card card = LoyaltyCard( merchant_id=merchant_id, customer_id=customer_id, program_id=program.id, enrolled_at_store_id=enrolled_at_store_id, ) db.add(card) db.flush() # Get the card ID # Create enrollment transaction transaction = LoyaltyTransaction( merchant_id=merchant_id, card_id=card.id, store_id=enrolled_at_store_id, transaction_type=TransactionType.CARD_CREATED.value, transaction_at=datetime.now(UTC), ) db.add(transaction) # Award welcome bonus if configured if program.welcome_bonus_points > 0: card.add_points(program.welcome_bonus_points) bonus_transaction = LoyaltyTransaction( merchant_id=merchant_id, card_id=card.id, store_id=enrolled_at_store_id, transaction_type=TransactionType.WELCOME_BONUS.value, points_delta=program.welcome_bonus_points, points_balance_after=card.points_balance, notes="Welcome bonus on enrollment", transaction_at=datetime.now(UTC), ) db.add(bonus_transaction) db.commit() db.refresh(card) # Create wallet objects (Google Wallet, Apple Wallet) # Lazy import to avoid circular imports; exception-safe (logs but doesn't raise) from app.modules.loyalty.services.wallet_service import wallet_service wallet_service.create_wallet_objects(db, card) # Send notification emails (async via Celery) try: from app.modules.loyalty.services.notification_service import ( notification_service, ) notification_service.send_enrollment_confirmation(db, card) if program.welcome_bonus_points > 0: notification_service.send_welcome_bonus( db, card, program.welcome_bonus_points ) except Exception: logger.warning( f"Failed to queue enrollment notification for card {card.id}", exc_info=True, ) logger.info( f"Enrolled customer {customer_id} in merchant {merchant_id} loyalty program " f"(card: {card.card_number}, bonus: {program.welcome_bonus_points} pts)" ) return card def enroll_customer_for_store( self, db: Session, customer_id: int, store_id: int, ) -> LoyaltyCard: """ Enroll a customer through a specific store. Looks up the store's merchant and enrolls in the merchant's program. Args: db: Database session customer_id: Customer ID store_id: Store ID Returns: Created loyalty card """ from app.modules.tenancy.services.store_service import store_service store = store_service.get_store_by_id_optional(db, store_id) if not store: raise LoyaltyProgramNotFoundException(f"store:{store_id}") return self.enroll_customer( db, customer_id, store.merchant_id, enrolled_at_store_id=store_id, ) def deactivate_card( self, db: Session, card_id: int, *, store_id: int | None = None, ) -> LoyaltyCard: """Deactivate a loyalty card.""" card = self.require_card(db, card_id) card.is_active = False # Create deactivation transaction transaction = LoyaltyTransaction( merchant_id=card.merchant_id, card_id=card.id, store_id=store_id, transaction_type=TransactionType.CARD_DEACTIVATED.value, transaction_at=datetime.now(UTC), ) db.add(transaction) db.commit() db.refresh(card) logger.info(f"Deactivated loyalty card {card_id}") return card def reactivate_card(self, db: Session, card_id: int) -> LoyaltyCard: """Reactivate a deactivated loyalty card.""" card = self.require_card(db, card_id) card.is_active = True # Create reactivation transaction for audit trail transaction = LoyaltyTransaction( merchant_id=card.merchant_id, card_id=card.id, transaction_type=TransactionType.CARD_REACTIVATED.value, transaction_at=datetime.now(UTC), ) db.add(transaction) db.commit() db.refresh(card) logger.info(f"Reactivated loyalty card {card_id}") return card # ========================================================================= # Helpers # ========================================================================= def get_stamps_today(self, db: Session, card_id: int) -> int: """Get number of stamps earned today for a card.""" from sqlalchemy import func today_start = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0) count = ( db.query(func.count(LoyaltyTransaction.id)) .filter( LoyaltyTransaction.card_id == card_id, LoyaltyTransaction.transaction_type == TransactionType.STAMP_EARNED.value, LoyaltyTransaction.transaction_at >= today_start, ) .scalar() ) return count or 0 def get_card_transactions( self, db: Session, card_id: int, *, skip: int = 0, limit: int = 50, ) -> tuple[list[LoyaltyTransaction], int]: """Get transaction history for a card.""" query = ( db.query(LoyaltyTransaction) .options(joinedload(LoyaltyTransaction.store)) .filter(LoyaltyTransaction.card_id == card_id) .order_by(LoyaltyTransaction.transaction_at.desc()) ) total = query.count() transactions = query.offset(skip).limit(limit).all() return transactions, total def get_store_transactions( self, db: Session, merchant_id: int, *, store_id: int | None = None, skip: int = 0, limit: int = 10, ) -> tuple[list[LoyaltyTransaction], int]: """Get recent transactions for a merchant (optionally filtered by store).""" query = ( db.query(LoyaltyTransaction) .join(LoyaltyCard, LoyaltyTransaction.card_id == LoyaltyCard.id) .options( joinedload(LoyaltyTransaction.store), joinedload(LoyaltyTransaction.card).joinedload(LoyaltyCard.customer), ) .filter(LoyaltyCard.merchant_id == merchant_id) ) if store_id: query = query.filter(LoyaltyTransaction.store_id == store_id) query = query.order_by(LoyaltyTransaction.transaction_at.desc()) total = query.count() transactions = query.offset(skip).limit(limit).all() return transactions, total def get_customer_transactions_with_store_names( self, db: Session, card_id: int, *, skip: int = 0, limit: int = 20, ) -> tuple[list[dict], int]: """ Get transaction history for a card with store names resolved. Returns a list of dicts with transaction data including store_name. """ from app.modules.tenancy.services.store_service import store_service query = ( db.query(LoyaltyTransaction) .filter(LoyaltyTransaction.card_id == card_id) .order_by(LoyaltyTransaction.transaction_at.desc()) ) total = query.count() transactions = query.offset(skip).limit(limit).all() tx_responses = [] for tx in transactions: tx_data = { "id": tx.id, "transaction_type": tx.transaction_type.value if hasattr(tx.transaction_type, "value") else str(tx.transaction_type), "points_delta": tx.points_delta, "stamps_delta": tx.stamps_delta, "points_balance_after": tx.points_balance_after, "stamps_balance_after": tx.stamps_balance_after, "transaction_at": tx.transaction_at.isoformat() if tx.transaction_at else None, "notes": tx.notes, "store_name": None, } if tx.store_id: store_obj = store_service.get_store_by_id_optional(db, tx.store_id) if store_obj: tx_data["store_name"] = store_obj.name tx_responses.append(tx_data) return tx_responses, total # Singleton instance card_service = CardService()