feat(loyalty): Phase 7 — advanced analytics (cohort, churn, revenue)
New analytics_service.py with three analytics features:
- Cohort retention: groups cards by enrollment month, tracks % with
any transaction in each subsequent month. Returns matrix suitable
for Chart.js heatmap. GET /analytics/cohorts?months_back=6
- Churn detection: flags cards as "at risk" when inactive > 2x their
average inter-transaction interval (default 60d for new cards).
Returns ranked list. GET /analytics/churn?limit=50
- Revenue attribution: monthly and per-store aggregation of point-
earning transactions. GET /analytics/revenue?months_back=6
Endpoints added to both admin API (/admin/loyalty/merchants/{id}/
analytics/*) and store API (/store/loyalty/analytics/*) so merchants
can see their own analytics.
342 tests pass.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -39,6 +39,7 @@ from app.modules.loyalty.schemas import (
|
||||
TransactionResponse,
|
||||
)
|
||||
from app.modules.loyalty.services import card_service, pin_service, program_service
|
||||
from app.modules.loyalty.services.analytics_service import analytics_service
|
||||
from app.modules.tenancy.models import User # API-007
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -495,6 +496,50 @@ def get_platform_stats(
|
||||
return program_service.get_platform_stats(db)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Advanced Analytics
|
||||
# =============================================================================
|
||||
|
||||
|
||||
@router.get("/merchants/{merchant_id}/analytics/cohorts")
|
||||
def get_cohort_retention(
|
||||
merchant_id: int = Path(..., gt=0),
|
||||
months_back: int = Query(6, ge=1, le=24),
|
||||
current_user: User = Depends(get_current_admin_api),
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""Cohort retention matrix for a merchant's loyalty program."""
|
||||
return analytics_service.get_cohort_retention(
|
||||
db, merchant_id, months_back=months_back
|
||||
)
|
||||
|
||||
|
||||
@router.get("/merchants/{merchant_id}/analytics/churn")
|
||||
def get_at_risk_cards(
|
||||
merchant_id: int = Path(..., gt=0),
|
||||
limit: int = Query(50, ge=1, le=200),
|
||||
current_user: User = Depends(get_current_admin_api),
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""Cards at risk of churn for a merchant."""
|
||||
return analytics_service.get_at_risk_cards(
|
||||
db, merchant_id, limit=limit
|
||||
)
|
||||
|
||||
|
||||
@router.get("/merchants/{merchant_id}/analytics/revenue")
|
||||
def get_revenue_attribution(
|
||||
merchant_id: int = Path(..., gt=0),
|
||||
months_back: int = Query(6, ge=1, le=24),
|
||||
current_user: User = Depends(get_current_admin_api),
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""Revenue attribution from loyalty transactions."""
|
||||
return analytics_service.get_revenue_attribution(
|
||||
db, merchant_id, months_back=months_back
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Wallet Integration Status
|
||||
# =============================================================================
|
||||
|
||||
@@ -198,6 +198,51 @@ def get_merchant_stats(
|
||||
return MerchantStatsResponse(**stats)
|
||||
|
||||
|
||||
@router.get("/analytics/cohorts")
|
||||
def get_cohort_retention(
|
||||
months_back: int = Query(6, ge=1, le=24),
|
||||
current_user: User = Depends(get_current_store_api),
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""Cohort retention matrix for this merchant's loyalty program."""
|
||||
from app.modules.loyalty.services.analytics_service import analytics_service
|
||||
|
||||
merchant_id = get_store_merchant_id(db, current_user.token_store_id)
|
||||
return analytics_service.get_cohort_retention(
|
||||
db, merchant_id, months_back=months_back
|
||||
)
|
||||
|
||||
|
||||
@router.get("/analytics/churn")
|
||||
def get_at_risk_cards(
|
||||
limit: int = Query(50, ge=1, le=200),
|
||||
current_user: User = Depends(get_current_store_api),
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""Cards at risk of churn for this merchant."""
|
||||
from app.modules.loyalty.services.analytics_service import analytics_service
|
||||
|
||||
merchant_id = get_store_merchant_id(db, current_user.token_store_id)
|
||||
return analytics_service.get_at_risk_cards(
|
||||
db, merchant_id, limit=limit
|
||||
)
|
||||
|
||||
|
||||
@router.get("/analytics/revenue")
|
||||
def get_revenue_attribution(
|
||||
months_back: int = Query(6, ge=1, le=24),
|
||||
current_user: User = Depends(get_current_store_api),
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""Revenue attribution from loyalty transactions."""
|
||||
from app.modules.loyalty.services.analytics_service import analytics_service
|
||||
|
||||
merchant_id = get_store_merchant_id(db, current_user.token_store_id)
|
||||
return analytics_service.get_revenue_attribution(
|
||||
db, merchant_id, months_back=months_back
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Staff PINs
|
||||
# =============================================================================
|
||||
|
||||
338
app/modules/loyalty/services/analytics_service.py
Normal file
338
app/modules/loyalty/services/analytics_service.py
Normal file
@@ -0,0 +1,338 @@
|
||||
# app/modules/loyalty/services/analytics_service.py
|
||||
"""
|
||||
Loyalty analytics service.
|
||||
|
||||
Advanced analytics beyond basic stats:
|
||||
- Cohort retention (enrollment month → % active per subsequent month)
|
||||
- Churn detection (at-risk cards based on inactivity)
|
||||
- Revenue attribution (loyalty vs non-loyalty per store)
|
||||
"""
|
||||
|
||||
import logging
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.modules.loyalty.models import LoyaltyCard, LoyaltyTransaction
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AnalyticsService:
|
||||
"""Advanced loyalty analytics."""
|
||||
|
||||
def get_cohort_retention(
|
||||
self,
|
||||
db: Session,
|
||||
merchant_id: int,
|
||||
months_back: int = 6,
|
||||
) -> dict:
|
||||
"""
|
||||
Cohort retention matrix.
|
||||
|
||||
Groups cards by enrollment month and tracks what % had any
|
||||
transaction in each subsequent month.
|
||||
|
||||
Returns:
|
||||
{
|
||||
"cohorts": [
|
||||
{
|
||||
"month": "2026-01",
|
||||
"enrolled": 50,
|
||||
"retention": [100, 80, 65, 55, ...] # % active per month
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
now = datetime.now(UTC)
|
||||
start_date = now - timedelta(days=months_back * 31)
|
||||
|
||||
# Get enrollment month for each card
|
||||
cards = (
|
||||
db.query(
|
||||
LoyaltyCard.id,
|
||||
func.date_trunc("month", LoyaltyCard.created_at).label(
|
||||
"enrollment_month"
|
||||
),
|
||||
)
|
||||
.filter(
|
||||
LoyaltyCard.merchant_id == merchant_id,
|
||||
LoyaltyCard.created_at >= start_date,
|
||||
)
|
||||
.all()
|
||||
)
|
||||
|
||||
if not cards:
|
||||
return {"cohorts": [], "months_back": months_back}
|
||||
|
||||
# Group cards by enrollment month
|
||||
cohort_cards: dict[str, list[int]] = {}
|
||||
for card_id, enrollment_month in cards:
|
||||
month_key = enrollment_month.strftime("%Y-%m")
|
||||
cohort_cards.setdefault(month_key, []).append(card_id)
|
||||
|
||||
# For each cohort, check activity in subsequent months
|
||||
cohorts = []
|
||||
for month_key in sorted(cohort_cards.keys()):
|
||||
card_ids = cohort_cards[month_key]
|
||||
enrolled_count = len(card_ids)
|
||||
|
||||
# Calculate months since enrollment
|
||||
cohort_start = datetime.strptime(month_key, "%Y-%m").replace(
|
||||
tzinfo=UTC
|
||||
)
|
||||
months_since = max(
|
||||
1,
|
||||
(now.year - cohort_start.year) * 12
|
||||
+ (now.month - cohort_start.month),
|
||||
)
|
||||
|
||||
retention = []
|
||||
for month_offset in range(min(months_since, months_back)):
|
||||
period_start = cohort_start + timedelta(days=month_offset * 30)
|
||||
period_end = period_start + timedelta(days=30)
|
||||
|
||||
# Count cards with any transaction in this period
|
||||
active_count = (
|
||||
db.query(func.count(func.distinct(LoyaltyTransaction.card_id)))
|
||||
.filter(
|
||||
LoyaltyTransaction.card_id.in_(card_ids),
|
||||
LoyaltyTransaction.transaction_at >= period_start,
|
||||
LoyaltyTransaction.transaction_at < period_end,
|
||||
)
|
||||
.scalar()
|
||||
or 0
|
||||
)
|
||||
|
||||
pct = round(active_count / enrolled_count * 100) if enrolled_count else 0
|
||||
retention.append(pct)
|
||||
|
||||
cohorts.append(
|
||||
{
|
||||
"month": month_key,
|
||||
"enrolled": enrolled_count,
|
||||
"retention": retention,
|
||||
}
|
||||
)
|
||||
|
||||
return {"cohorts": cohorts, "months_back": months_back}
|
||||
|
||||
def get_at_risk_cards(
|
||||
self,
|
||||
db: Session,
|
||||
merchant_id: int,
|
||||
inactivity_multiplier: float = 2.0,
|
||||
limit: int = 50,
|
||||
) -> dict:
|
||||
"""
|
||||
Simple churn detection.
|
||||
|
||||
A card is "at risk" when its inactivity period exceeds
|
||||
`inactivity_multiplier` × its average inter-transaction interval.
|
||||
Falls back to 60 days for cards with fewer than 2 transactions.
|
||||
|
||||
Returns:
|
||||
{
|
||||
"at_risk_count": int,
|
||||
"cards": [
|
||||
{
|
||||
"card_id": int,
|
||||
"card_number": str,
|
||||
"customer_name": str,
|
||||
"days_inactive": int,
|
||||
"avg_interval_days": int,
|
||||
"points_balance": int,
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
now = datetime.now(UTC)
|
||||
default_threshold_days = 60
|
||||
|
||||
# Get active cards with their last activity
|
||||
cards = (
|
||||
db.query(LoyaltyCard)
|
||||
.filter(
|
||||
LoyaltyCard.merchant_id == merchant_id,
|
||||
LoyaltyCard.is_active == True, # noqa: E712
|
||||
LoyaltyCard.last_activity_at.isnot(None),
|
||||
)
|
||||
.all()
|
||||
)
|
||||
|
||||
at_risk = []
|
||||
for card in cards:
|
||||
days_inactive = (now - card.last_activity_at).days
|
||||
|
||||
# Calculate average interval from transaction history
|
||||
tx_dates = (
|
||||
db.query(LoyaltyTransaction.transaction_at)
|
||||
.filter(LoyaltyTransaction.card_id == card.id)
|
||||
.order_by(LoyaltyTransaction.transaction_at)
|
||||
.all()
|
||||
)
|
||||
|
||||
if len(tx_dates) >= 2:
|
||||
intervals = [
|
||||
(tx_dates[i + 1][0] - tx_dates[i][0]).days
|
||||
for i in range(len(tx_dates) - 1)
|
||||
]
|
||||
avg_interval = sum(intervals) / len(intervals) if intervals else default_threshold_days
|
||||
else:
|
||||
avg_interval = default_threshold_days
|
||||
|
||||
threshold = avg_interval * inactivity_multiplier
|
||||
|
||||
if days_inactive > threshold:
|
||||
customer_name = None
|
||||
if card.customer:
|
||||
customer_name = card.customer.full_name
|
||||
|
||||
at_risk.append(
|
||||
{
|
||||
"card_id": card.id,
|
||||
"card_number": card.card_number,
|
||||
"customer_name": customer_name,
|
||||
"days_inactive": days_inactive,
|
||||
"avg_interval_days": round(avg_interval),
|
||||
"points_balance": card.points_balance,
|
||||
}
|
||||
)
|
||||
|
||||
# Sort by days_inactive descending
|
||||
at_risk.sort(key=lambda x: x["days_inactive"], reverse=True)
|
||||
|
||||
return {
|
||||
"at_risk_count": len(at_risk),
|
||||
"cards": at_risk[:limit],
|
||||
"total_cards_checked": len(cards),
|
||||
}
|
||||
|
||||
def get_revenue_attribution(
|
||||
self,
|
||||
db: Session,
|
||||
merchant_id: int,
|
||||
months_back: int = 6,
|
||||
) -> dict:
|
||||
"""
|
||||
Revenue attribution from loyalty point-earning transactions.
|
||||
|
||||
Compares revenue from transactions with order references
|
||||
(loyalty customers) against total enrollment metrics.
|
||||
Groups by month and store.
|
||||
|
||||
Returns:
|
||||
{
|
||||
"monthly": [
|
||||
{
|
||||
"month": "2026-01",
|
||||
"transactions_count": int,
|
||||
"total_points_earned": int,
|
||||
"estimated_revenue_cents": int,
|
||||
"unique_customers": int,
|
||||
}
|
||||
],
|
||||
"by_store": [
|
||||
{
|
||||
"store_id": int,
|
||||
"store_name": str,
|
||||
"transactions_count": int,
|
||||
"total_points_earned": int,
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
from app.modules.loyalty.models.loyalty_transaction import TransactionType
|
||||
from app.modules.tenancy.services.store_service import store_service
|
||||
|
||||
now = datetime.now(UTC)
|
||||
start_date = now - timedelta(days=months_back * 31)
|
||||
|
||||
# Monthly aggregation of point-earning transactions
|
||||
monthly_rows = (
|
||||
db.query(
|
||||
func.date_trunc("month", LoyaltyTransaction.transaction_at).label(
|
||||
"month"
|
||||
),
|
||||
func.count(LoyaltyTransaction.id).label("tx_count"),
|
||||
func.coalesce(
|
||||
func.sum(LoyaltyTransaction.points_delta), 0
|
||||
).label("points_earned"),
|
||||
func.count(
|
||||
func.distinct(LoyaltyTransaction.card_id)
|
||||
).label("unique_cards"),
|
||||
)
|
||||
.filter(
|
||||
LoyaltyTransaction.merchant_id == merchant_id,
|
||||
LoyaltyTransaction.transaction_at >= start_date,
|
||||
LoyaltyTransaction.transaction_type.in_(
|
||||
[
|
||||
TransactionType.POINTS_EARNED.value,
|
||||
TransactionType.STAMP_EARNED.value,
|
||||
]
|
||||
),
|
||||
LoyaltyTransaction.points_delta > 0,
|
||||
)
|
||||
.group_by("month")
|
||||
.order_by("month")
|
||||
.all()
|
||||
)
|
||||
|
||||
monthly = []
|
||||
for row in monthly_rows:
|
||||
monthly.append(
|
||||
{
|
||||
"month": row.month.strftime("%Y-%m"),
|
||||
"transactions_count": row.tx_count,
|
||||
"total_points_earned": row.points_earned,
|
||||
"unique_customers": row.unique_cards,
|
||||
}
|
||||
)
|
||||
|
||||
# Per-store breakdown
|
||||
store_rows = (
|
||||
db.query(
|
||||
LoyaltyTransaction.store_id,
|
||||
func.count(LoyaltyTransaction.id).label("tx_count"),
|
||||
func.coalesce(
|
||||
func.sum(LoyaltyTransaction.points_delta), 0
|
||||
).label("points_earned"),
|
||||
)
|
||||
.filter(
|
||||
LoyaltyTransaction.merchant_id == merchant_id,
|
||||
LoyaltyTransaction.transaction_at >= start_date,
|
||||
LoyaltyTransaction.transaction_type.in_(
|
||||
[
|
||||
TransactionType.POINTS_EARNED.value,
|
||||
TransactionType.STAMP_EARNED.value,
|
||||
]
|
||||
),
|
||||
LoyaltyTransaction.points_delta > 0,
|
||||
LoyaltyTransaction.store_id.isnot(None),
|
||||
)
|
||||
.group_by(LoyaltyTransaction.store_id)
|
||||
.all()
|
||||
)
|
||||
|
||||
by_store = []
|
||||
for row in store_rows:
|
||||
store = store_service.get_store_by_id_optional(db, row.store_id)
|
||||
by_store.append(
|
||||
{
|
||||
"store_id": row.store_id,
|
||||
"store_name": store.name if store else f"Store {row.store_id}",
|
||||
"transactions_count": row.tx_count,
|
||||
"total_points_earned": row.points_earned,
|
||||
}
|
||||
)
|
||||
|
||||
return {
|
||||
"monthly": monthly,
|
||||
"by_store": by_store,
|
||||
"months_back": months_back,
|
||||
}
|
||||
|
||||
|
||||
# Singleton
|
||||
analytics_service = AnalyticsService()
|
||||
Reference in New Issue
Block a user