Files
orion/app/modules/billing/services/admin_subscription_service.py
Samir Boulahtit 7c43d6f4a2 refactor: fix all architecture validator findings (202 → 0)
Eliminate all 103 errors and 96 warnings from the architecture validator:

Phase 1 - Validator rules & YAML:
- Add NAM-001/NAM-002 exceptions for module-scoped router/service files
- Fix API-004 to detect # public comments on decorator lines
- Add module-specific exception bases to EXC-004 valid_bases
- Exclude storefront files from AUTH-004 store context check
- Add SVC-006 exceptions for loyalty service atomic commits
- Fix _get_rule() to search naming_rules and auth_rules categories
- Use plain # CODE comments instead of # noqa: CODE for custom rules

Phase 2 - Billing module (5 route files):
- Move _resolve_store_to_merchant to subscription_service
- Move tier/feature queries to feature_service, admin_subscription_service
- Extract 22 inline Pydantic schemas to billing/schemas/billing.py
- Replace all HTTPException with domain exceptions

Phase 3 - Loyalty module (4 routes + points_service):
- Add 7 domain exceptions (Apple auth, enrollment, device registration)
- Add service methods to card_service, program_service, apple_wallet_service
- Move all db.query() from routes to service layer
- Fix SVC-001: replace HTTPException in points_service with domain exception

Phase 4 - Remaining modules:
- tenancy: move store stats queries to admin_service
- cms: move platform resolution to content_page_service, add NoPlatformSubscriptionException
- messaging: move user/customer lookups to messaging_service
- Add ConfigDict(from_attributes=True) to ContentPageResponse

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 18:49:24 +01:00

377 lines
12 KiB
Python

# app/modules/billing/services/admin_subscription_service.py
"""
Admin Subscription Service.
Handles subscription management operations for platform administrators:
- Subscription tier CRUD
- Merchant subscription management
- Billing history queries
- Subscription analytics
"""
import logging
from math import ceil
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.exceptions import (
BusinessLogicException,
ConflictException,
ResourceNotFoundException,
)
from app.modules.billing.exceptions import TierNotFoundException
from app.modules.billing.models import (
BillingHistory,
MerchantSubscription,
SubscriptionStatus,
SubscriptionTier,
)
from app.modules.tenancy.models import Merchant
logger = logging.getLogger(__name__)
class AdminSubscriptionService:
"""Service for admin subscription management operations."""
# =========================================================================
# Subscription Tiers
# =========================================================================
def get_tiers(
self, db: Session, include_inactive: bool = False, platform_id: int | None = None
) -> list[SubscriptionTier]:
"""Get all subscription tiers, optionally filtered by platform."""
query = db.query(SubscriptionTier)
if not include_inactive:
query = query.filter(SubscriptionTier.is_active == True) # noqa: E712
if platform_id is not None:
query = query.filter(
(SubscriptionTier.platform_id == platform_id)
| (SubscriptionTier.platform_id.is_(None))
)
return query.order_by(SubscriptionTier.display_order).all()
def get_tier_by_code(self, db: Session, tier_code: str) -> SubscriptionTier:
"""Get a subscription tier by code."""
tier = (
db.query(SubscriptionTier)
.filter(SubscriptionTier.code == tier_code)
.first()
)
if not tier:
raise TierNotFoundException(tier_code)
return tier
def create_tier(self, db: Session, tier_data: dict) -> SubscriptionTier:
"""Create a new subscription tier."""
# Check for duplicate code
existing = (
db.query(SubscriptionTier)
.filter(SubscriptionTier.code == tier_data["code"])
.first()
)
if existing:
raise ConflictException(
f"Tier with code '{tier_data['code']}' already exists"
)
tier = SubscriptionTier(**tier_data)
db.add(tier)
logger.info(f"Created subscription tier: {tier.code}")
return tier
def update_tier(
self, db: Session, tier_code: str, update_data: dict
) -> SubscriptionTier:
"""Update a subscription tier."""
tier = self.get_tier_by_code(db, tier_code)
for field, value in update_data.items():
setattr(tier, field, value)
logger.info(f"Updated subscription tier: {tier.code}")
return tier
def deactivate_tier(self, db: Session, tier_code: str) -> None:
"""Soft-delete a subscription tier."""
tier = self.get_tier_by_code(db, tier_code)
# Check if any active subscriptions use this tier (by tier_id FK)
active_subs = (
db.query(MerchantSubscription)
.filter(
MerchantSubscription.tier_id == tier.id,
MerchantSubscription.status.in_([
SubscriptionStatus.ACTIVE.value,
SubscriptionStatus.TRIAL.value,
]),
)
.count()
)
if active_subs > 0:
raise BusinessLogicException(
f"Cannot delete tier: {active_subs} active subscriptions are using it",
"TIER_HAS_ACTIVE_SUBSCRIPTIONS",
)
tier.is_active = False
logger.info(f"Soft-deleted subscription tier: {tier.code}")
# =========================================================================
# Merchant Subscriptions
# =========================================================================
def list_subscriptions(
self,
db: Session,
page: int = 1,
per_page: int = 20,
status: str | None = None,
tier: str | None = None,
search: str | None = None,
) -> dict:
"""List merchant subscriptions with filtering and pagination."""
query = (
db.query(MerchantSubscription, Merchant)
.join(Merchant, MerchantSubscription.merchant_id == Merchant.id)
)
# Apply filters
if status:
query = query.filter(MerchantSubscription.status == status)
if tier:
query = query.join(
SubscriptionTier, MerchantSubscription.tier_id == SubscriptionTier.id
).filter(SubscriptionTier.code == tier)
if search:
query = query.filter(Merchant.name.ilike(f"%{search}%"))
# Count total
total = query.count()
# Paginate
offset = (page - 1) * per_page
results = (
query.order_by(MerchantSubscription.created_at.desc())
.offset(offset)
.limit(per_page)
.all()
)
return {
"results": results,
"total": total,
"page": page,
"per_page": per_page,
"pages": ceil(total / per_page) if total > 0 else 0,
}
def get_subscription(
self, db: Session, merchant_id: int, platform_id: int
) -> tuple:
"""Get subscription for a specific merchant on a platform."""
result = (
db.query(MerchantSubscription, Merchant)
.join(Merchant, MerchantSubscription.merchant_id == Merchant.id)
.filter(
MerchantSubscription.merchant_id == merchant_id,
MerchantSubscription.platform_id == platform_id,
)
.first()
)
if not result:
raise ResourceNotFoundException(
"Subscription",
f"merchant_id={merchant_id}, platform_id={platform_id}",
)
return result
def update_subscription(
self, db: Session, merchant_id: int, platform_id: int, update_data: dict
) -> tuple:
"""Update a merchant's subscription."""
result = self.get_subscription(db, merchant_id, platform_id)
sub, merchant = result
# Handle tier_code separately: resolve to tier_id
tier_code = update_data.pop("tier_code", None)
if tier_code is not None:
if sub.stripe_subscription_id:
from app.modules.billing.services.billing_service import billing_service
billing_service.change_tier(
db, merchant_id, platform_id, tier_code, sub.is_annual
)
else:
tier = self.get_tier_by_code(db, tier_code)
sub.tier_id = tier.id
for field, value in update_data.items():
setattr(sub, field, value)
logger.info(
f"Admin updated subscription for merchant {merchant_id} "
f"on platform {platform_id}: {list(update_data.keys())}"
+ (f", tier_code={tier_code}" if tier_code else "")
)
return sub, merchant
# =========================================================================
# Billing History
# =========================================================================
def list_billing_history(
self,
db: Session,
page: int = 1,
per_page: int = 20,
merchant_id: int | None = None,
status: str | None = None,
) -> dict:
"""List billing history across all merchants."""
query = (
db.query(BillingHistory, Merchant)
.join(Merchant, BillingHistory.merchant_id == Merchant.id)
)
if merchant_id:
query = query.filter(BillingHistory.merchant_id == merchant_id)
if status:
query = query.filter(BillingHistory.status == status)
total = query.count()
offset = (page - 1) * per_page
results = (
query.order_by(BillingHistory.invoice_date.desc())
.offset(offset)
.limit(per_page)
.all()
)
return {
"results": results,
"total": total,
"page": page,
"per_page": per_page,
"pages": ceil(total / per_page) if total > 0 else 0,
}
# =========================================================================
# Platform Helpers
# =========================================================================
def get_platform_names_map(self, db: Session) -> dict[int, str]:
"""Get mapping of platform_id -> platform_name."""
from app.modules.tenancy.models import Platform
return {p.id: p.name for p in db.query(Platform).all()}
def get_platform_name(self, db: Session, platform_id: int) -> str | None:
"""Get platform name by ID."""
from app.modules.tenancy.models import Platform
p = db.query(Platform).filter(Platform.id == platform_id).first()
return p.name if p else None
# =========================================================================
# Statistics
# =========================================================================
def get_stats(self, db: Session) -> dict:
"""Get subscription statistics for admin dashboard."""
# Count by status
status_counts = (
db.query(
MerchantSubscription.status,
func.count(MerchantSubscription.id),
)
.group_by(MerchantSubscription.status)
.all()
)
stats = {
"total_subscriptions": 0,
"active_count": 0,
"trial_count": 0,
"past_due_count": 0,
"cancelled_count": 0,
"expired_count": 0,
}
for sub_status, count in status_counts:
stats["total_subscriptions"] += count
if sub_status == SubscriptionStatus.ACTIVE.value:
stats["active_count"] = count
elif sub_status == SubscriptionStatus.TRIAL.value:
stats["trial_count"] = count
elif sub_status == SubscriptionStatus.PAST_DUE.value:
stats["past_due_count"] = count
elif sub_status == SubscriptionStatus.CANCELLED.value:
stats["cancelled_count"] = count
elif sub_status == SubscriptionStatus.EXPIRED.value:
stats["expired_count"] = count
# Count by tier (join with SubscriptionTier to get tier name)
tier_counts = (
db.query(SubscriptionTier.name, func.count(MerchantSubscription.id))
.join(
SubscriptionTier,
MerchantSubscription.tier_id == SubscriptionTier.id,
)
.filter(
MerchantSubscription.status.in_([
SubscriptionStatus.ACTIVE.value,
SubscriptionStatus.TRIAL.value,
])
)
.group_by(SubscriptionTier.name)
.all()
)
tier_distribution = dict(tier_counts)
# Calculate MRR (Monthly Recurring Revenue)
mrr_cents = 0
arr_cents = 0
active_subs = (
db.query(MerchantSubscription, SubscriptionTier)
.join(
SubscriptionTier,
MerchantSubscription.tier_id == SubscriptionTier.id,
)
.filter(MerchantSubscription.status == SubscriptionStatus.ACTIVE.value)
.all()
)
for sub, sub_tier in active_subs:
if sub.is_annual and sub_tier.price_annual_cents:
mrr_cents += sub_tier.price_annual_cents // 12
arr_cents += sub_tier.price_annual_cents
else:
mrr_cents += sub_tier.price_monthly_cents
arr_cents += sub_tier.price_monthly_cents * 12
stats["tier_distribution"] = tier_distribution
stats["mrr_cents"] = mrr_cents
stats["arr_cents"] = arr_cents
return stats
# Singleton instance
admin_subscription_service = AdminSubscriptionService()