feat: complete analytics module self-containment

Migrate analytics module to fully self-contained structure:

- routes/api/vendor.py - API endpoints
- routes/pages/vendor.py - Page routes with full implementation
- services/stats_service.py - Business logic (moved from app/services)
- services/usage_service.py - Usage tracking (moved from app/services)
- schemas/stats.py - Pydantic schemas (moved from models/schema)
- models/__init__.py - Model exports
- templates/analytics/vendor/ - Templates (moved from app/templates)
- static/vendor/js/ - JavaScript (moved from static/vendor)
- locales/ - Translations (en, de, fr, lu)
- exceptions.py - Module exceptions

Removed legacy files:
- app/modules/analytics/routes/vendor.py (replaced by routes/pages/)
- static/admin/js/analytics.js (unused)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-28 22:21:21 +01:00
parent 2466dfd7ed
commit bd2c99a775
22 changed files with 1870 additions and 50 deletions

View File

@@ -2,21 +2,34 @@
"""
Analytics Module - Reporting and analytics.
This module provides:
This is a self-contained module providing:
- Dashboard analytics
- Custom reports
- Data exports
- Performance metrics
Routes:
- Vendor: /api/v1/vendor/analytics/*
- (Admin uses dashboard for analytics)
Menu Items:
- Admin: (uses dashboard)
- Vendor: analytics
Module Structure:
- models/ - Database models (none - uses data from other modules)
- services/ - Business logic (StatsService, UsageService)
- schemas/ - Pydantic DTOs
- routes/ - API routes
- exceptions.py - Module-specific exceptions
"""
from app.modules.analytics.definition import analytics_module
# Use lazy imports to avoid circular import issues
__all__ = ["analytics_module"]
def __getattr__(name: str):
"""Lazy import module components to avoid circular imports."""
if name == "analytics_module":
from app.modules.analytics.definition import analytics_module
return analytics_module
elif name == "get_analytics_module_with_routers":
from app.modules.analytics.definition import get_analytics_module_with_routers
return get_analytics_module_with_routers
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
__all__ = ["analytics_module", "get_analytics_module_with_routers"]

View File

@@ -3,18 +3,25 @@
Analytics module definition.
Defines the analytics module including its features, menu items,
and route configurations.
route configurations, and self-contained module settings.
"""
from app.modules.base import ModuleDefinition
from models.database.admin_menu_config import FrontendType
def _get_vendor_router():
"""Lazy import of vendor router to avoid circular imports."""
from app.modules.analytics.routes.vendor import vendor_router
def _get_vendor_api_router():
"""Lazy import of vendor API router to avoid circular imports."""
from app.modules.analytics.routes.api.vendor import router
return vendor_router
return router
def _get_vendor_page_router():
"""Lazy import of vendor page router to avoid circular imports."""
from app.modules.analytics.routes.pages.vendor import router
return router
# Analytics module definition
@@ -22,11 +29,13 @@ analytics_module = ModuleDefinition(
code="analytics",
name="Analytics & Reporting",
description="Dashboard analytics, custom reports, and data exports.",
version="1.0.0",
features=[
"basic_reports", # Basic reporting
"analytics_dashboard", # Analytics dashboard
"custom_reports", # Custom report builder
"export_reports", # Export to CSV/Excel
"usage_metrics", # Usage and performance metrics
],
menu_items={
FrontendType.ADMIN: [
@@ -37,6 +46,18 @@ analytics_module = ModuleDefinition(
],
},
is_core=False,
# =========================================================================
# Self-Contained Module Configuration
# =========================================================================
is_self_contained=True,
services_path="app.modules.analytics.services",
models_path="app.modules.analytics.models",
schemas_path="app.modules.analytics.schemas",
exceptions_path="app.modules.analytics.exceptions",
# Module templates (namespaced as analytics/admin/*.html and analytics/vendor/*.html)
templates_path="templates",
# Module-specific translations (accessible via analytics.* keys)
locales_path="locales",
)
@@ -46,8 +67,13 @@ def get_analytics_module_with_routers() -> ModuleDefinition:
This function attaches the routers lazily to avoid circular imports
during module initialization.
Routers:
- vendor_api_router: API endpoints for vendor analytics
- vendor_page_router: Page routes for vendor analytics dashboard
"""
analytics_module.vendor_router = _get_vendor_router()
analytics_module.vendor_api_router = _get_vendor_api_router()
analytics_module.vendor_page_router = _get_vendor_page_router()
return analytics_module

View File

@@ -0,0 +1,39 @@
# app/modules/analytics/exceptions.py
"""
Analytics module exceptions.
Module-specific exceptions for analytics functionality.
"""
from app.exceptions.base import (
BusinessLogicException,
ValidationException,
)
class ReportGenerationException(BusinessLogicException):
"""Raised when report generation fails."""
def __init__(self, report_type: str, reason: str):
super().__init__(
message=f"Failed to generate {report_type} report: {reason}",
error_code="REPORT_GENERATION_FAILED",
)
class InvalidDateRangeException(ValidationException):
"""Raised when an invalid date range is provided."""
def __init__(self, start_date: str, end_date: str):
super().__init__(
message=f"Invalid date range: {start_date} to {end_date}",
field="date_range",
value=f"{start_date} - {end_date}",
)
self.error_code = "INVALID_DATE_RANGE"
__all__ = [
"ReportGenerationException",
"InvalidDateRangeException",
]

View File

@@ -0,0 +1,17 @@
{
"analytics": {
"page_title": "Analysen",
"dashboard_title": "Analyse-Dashboard",
"dashboard_subtitle": "Sehen Sie Ihre Shop-Leistungskennzahlen und Einblicke",
"period_7d": "Letzte 7 Tage",
"period_30d": "Letzte 30 Tage",
"period_90d": "Letzte 90 Tage",
"period_1y": "Letztes Jahr",
"imports_count": "Importe",
"products_added": "Hinzugefügte Produkte",
"inventory_locations": "Lagerstandorte",
"data_since": "Daten seit",
"loading": "Analysen werden geladen...",
"error_loading": "Analysedaten konnten nicht geladen werden"
}
}

View File

@@ -0,0 +1,17 @@
{
"analytics": {
"page_title": "Analytics",
"dashboard_title": "Analytics Dashboard",
"dashboard_subtitle": "View your store performance metrics and insights",
"period_7d": "Last 7 days",
"period_30d": "Last 30 days",
"period_90d": "Last 90 days",
"period_1y": "Last year",
"imports_count": "Imports",
"products_added": "Products Added",
"inventory_locations": "Inventory Locations",
"data_since": "Data since",
"loading": "Loading analytics...",
"error_loading": "Failed to load analytics data"
}
}

View File

@@ -0,0 +1,17 @@
{
"analytics": {
"page_title": "Analytique",
"dashboard_title": "Tableau de bord analytique",
"dashboard_subtitle": "Consultez les indicateurs de performance de votre boutique",
"period_7d": "7 derniers jours",
"period_30d": "30 derniers jours",
"period_90d": "90 derniers jours",
"period_1y": "Dernière année",
"imports_count": "Importations",
"products_added": "Produits ajoutés",
"inventory_locations": "Emplacements d'inventaire",
"data_since": "Données depuis",
"loading": "Chargement des analyses...",
"error_loading": "Impossible de charger les données analytiques"
}
}

View File

@@ -0,0 +1,17 @@
{
"analytics": {
"page_title": "Analysen",
"dashboard_title": "Analyse-Dashboard",
"dashboard_subtitle": "Kuckt Är Buttek Leeschtungsmetriken an Abléck",
"period_7d": "Lescht 7 Deeg",
"period_30d": "Lescht 30 Deeg",
"period_90d": "Lescht 90 Deeg",
"period_1y": "Lescht Joer",
"imports_count": "Importer",
"products_added": "Produkter bäigesat",
"inventory_locations": "Lagerplazen",
"data_since": "Donnéeë vun",
"loading": "Analysen ginn gelueden...",
"error_loading": "Analysedonnéeën konnten net geluede ginn"
}
}

View File

@@ -0,0 +1,9 @@
# app/modules/analytics/models/__init__.py
"""
Analytics module database models.
Analytics primarily uses data from other modules for reporting.
No dedicated analytics models are defined.
"""
__all__ = []

View File

@@ -6,8 +6,9 @@ This module provides functions to register analytics routes
with module-based access control.
NOTE: Routers are NOT auto-imported to avoid circular dependencies.
Import directly from vendor.py as needed:
from app.modules.analytics.routes.vendor import vendor_router
Import directly from api/ or pages/ as needed:
from app.modules.analytics.routes.api import vendor_router as vendor_api_router
from app.modules.analytics.routes.pages import vendor_router as vendor_page_router
Note: Analytics module has no admin routes - admin uses dashboard.
"""
@@ -15,12 +16,15 @@ Note: Analytics module has no admin routes - admin uses dashboard.
# Routers are imported on-demand to avoid circular dependencies
# Do NOT add auto-imports here
__all__ = ["vendor_router"]
__all__ = ["vendor_api_router", "vendor_page_router"]
def __getattr__(name: str):
"""Lazy import routers to avoid circular dependencies."""
if name == "vendor_router":
from app.modules.analytics.routes.vendor import vendor_router
if name == "vendor_api_router":
from app.modules.analytics.routes.api import vendor_router
return vendor_router
elif name == "vendor_page_router":
from app.modules.analytics.routes.pages import vendor_router
return vendor_router
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

View File

@@ -0,0 +1,11 @@
# app/modules/analytics/routes/api/__init__.py
"""
Analytics module API routes.
Provides REST API endpoints for analytics and reporting:
- Vendor API: Vendor-scoped analytics data
"""
from app.modules.analytics.routes.api.vendor import router as vendor_router
__all__ = ["vendor_router"]

View File

@@ -0,0 +1,56 @@
# app/modules/analytics/routes/api/vendor.py
"""
Vendor Analytics API
Vendor Context: Uses token_vendor_id from JWT token (authenticated vendor API pattern).
The get_current_vendor_api dependency guarantees token_vendor_id is present.
Feature Requirements:
- basic_reports: Basic analytics (Essential tier)
- analytics_dashboard: Advanced analytics (Business tier)
"""
import logging
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from app.api.deps import get_current_vendor_api, get_db, require_module_access
from app.core.feature_gate import RequireFeature
from app.modules.analytics.services import stats_service
from app.modules.analytics.schemas import (
VendorAnalyticsCatalog,
VendorAnalyticsImports,
VendorAnalyticsInventory,
VendorAnalyticsResponse,
)
from models.database.feature import FeatureCode
from models.database.user import User
router = APIRouter(
dependencies=[Depends(require_module_access("analytics"))],
)
logger = logging.getLogger(__name__)
@router.get("", response_model=VendorAnalyticsResponse)
def get_vendor_analytics(
period: str = Query("30d", description="Time period: 7d, 30d, 90d, 1y"),
current_user: User = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
_: None = Depends(RequireFeature(FeatureCode.BASIC_REPORTS, FeatureCode.ANALYTICS_DASHBOARD)),
):
"""Get vendor analytics data for specified time period."""
data = stats_service.get_vendor_analytics(db, current_user.token_vendor_id, period)
return VendorAnalyticsResponse(
period=data["period"],
start_date=data["start_date"],
imports=VendorAnalyticsImports(count=data["imports"]["count"]),
catalog=VendorAnalyticsCatalog(
products_added=data["catalog"]["products_added"]
),
inventory=VendorAnalyticsInventory(
total_locations=data["inventory"]["total_locations"]
),
)

View File

@@ -0,0 +1,13 @@
# app/modules/analytics/routes/pages/__init__.py
"""
Analytics module page routes.
Provides HTML page endpoints for analytics views:
- Vendor pages: Analytics dashboard for vendors
"""
from app.modules.analytics.routes.pages.vendor import router as vendor_router
# Note: Analytics has no admin pages - admin uses the main dashboard
__all__ = ["vendor_router"]

View File

@@ -0,0 +1,90 @@
# app/modules/analytics/routes/pages/vendor.py
"""
Analytics Vendor Page Routes (HTML rendering).
Vendor pages for analytics dashboard.
"""
import logging
from fastapi import APIRouter, Depends, Path, Request
from fastapi.responses import HTMLResponse
from sqlalchemy.orm import Session
from app.api.deps import get_current_vendor_from_cookie_or_header, get_db
from app.services.platform_settings_service import platform_settings_service
from app.templates_config import templates
from models.database.user import User
from models.database.vendor import Vendor
logger = logging.getLogger(__name__)
router = APIRouter()
# ============================================================================
# HELPER: Build Vendor Dashboard Context
# ============================================================================
def get_vendor_context(
request: Request,
db: Session,
current_user: User,
vendor_code: str,
**extra_context,
) -> dict:
"""
Build template context for vendor dashboard pages.
Resolves locale/currency using the platform settings service with
vendor override support.
"""
# Load vendor from database
vendor = db.query(Vendor).filter(Vendor.subdomain == vendor_code).first()
# Get platform defaults
platform_config = platform_settings_service.get_storefront_config(db)
# Resolve with vendor override
storefront_locale = platform_config["locale"]
storefront_currency = platform_config["currency"]
if vendor and vendor.storefront_locale:
storefront_locale = vendor.storefront_locale
context = {
"request": request,
"user": current_user,
"vendor": vendor,
"vendor_code": vendor_code,
"storefront_locale": storefront_locale,
"storefront_currency": storefront_currency,
**extra_context,
}
return context
# ============================================================================
# ANALYTICS PAGE
# ============================================================================
@router.get(
"/{vendor_code}/analytics", response_class=HTMLResponse, include_in_schema=False
)
async def vendor_analytics_page(
request: Request,
vendor_code: str = Path(..., description="Vendor code"),
current_user: User = Depends(get_current_vendor_from_cookie_or_header),
db: Session = Depends(get_db),
):
"""
Render analytics and reports page.
JavaScript loads analytics data via API.
"""
return templates.TemplateResponse(
"analytics/vendor/analytics.html",
get_vendor_context(request, db, current_user, vendor_code),
)

View File

@@ -1,25 +0,0 @@
# app/modules/analytics/routes/vendor.py
"""
Analytics module vendor routes.
This module wraps the existing vendor analytics routes and adds
module-based access control. Routes are re-exported from the
original location with the module access dependency.
"""
from fastapi import APIRouter, Depends
from app.api.deps import require_module_access
# Import original router (direct import to avoid circular dependency)
from app.api.v1.vendor.analytics import router as original_router
# Create module-aware router
vendor_router = APIRouter(
prefix="/analytics",
dependencies=[Depends(require_module_access("analytics"))],
)
# Re-export all routes from the original module with module access control
for route in original_router.routes:
vendor_router.routes.append(route)

View File

@@ -0,0 +1,58 @@
# app/modules/analytics/schemas/__init__.py
"""
Analytics module Pydantic schemas.
This is the canonical location for analytics schemas.
"""
from app.modules.analytics.schemas.stats import (
StatsResponse,
MarketplaceStatsResponse,
ImportStatsResponse,
UserStatsResponse,
VendorStatsResponse,
ProductStatsResponse,
PlatformStatsResponse,
OrderStatsBasicResponse,
AdminDashboardResponse,
VendorProductStats,
VendorOrderStats,
VendorCustomerStats,
VendorRevenueStats,
VendorInfo,
VendorDashboardStatsResponse,
VendorAnalyticsImports,
VendorAnalyticsCatalog,
VendorAnalyticsInventory,
VendorAnalyticsResponse,
ValidatorStats,
CodeQualityDashboardStatsResponse,
CustomerStatsResponse,
OrderStatsResponse,
)
__all__ = [
"StatsResponse",
"MarketplaceStatsResponse",
"ImportStatsResponse",
"UserStatsResponse",
"VendorStatsResponse",
"ProductStatsResponse",
"PlatformStatsResponse",
"OrderStatsBasicResponse",
"AdminDashboardResponse",
"VendorProductStats",
"VendorOrderStats",
"VendorCustomerStats",
"VendorRevenueStats",
"VendorInfo",
"VendorDashboardStatsResponse",
"VendorAnalyticsImports",
"VendorAnalyticsCatalog",
"VendorAnalyticsInventory",
"VendorAnalyticsResponse",
"ValidatorStats",
"CodeQualityDashboardStatsResponse",
"CustomerStatsResponse",
"OrderStatsResponse",
]

View File

@@ -0,0 +1,353 @@
# app/modules/analytics/schemas/stats.py
"""
Analytics module schemas for statistics and reporting.
This is the canonical location for stats schemas.
"""
from datetime import datetime
from decimal import Decimal
from typing import Any
from pydantic import BaseModel, Field
class StatsResponse(BaseModel):
"""Comprehensive platform statistics response schema."""
total_products: int
unique_brands: int
unique_categories: int
unique_marketplaces: int = 0
unique_vendors: int = 0
total_inventory_entries: int = 0
total_inventory_quantity: int = 0
class MarketplaceStatsResponse(BaseModel):
"""Statistics per marketplace response schema."""
marketplace: str
total_products: int
unique_vendors: int
unique_brands: int
# ============================================================================
# Import Statistics
# ============================================================================
class ImportStatsResponse(BaseModel):
"""Import job statistics response schema.
Used by: GET /api/v1/admin/marketplace-import-jobs/stats
"""
total: int = Field(..., description="Total number of import jobs")
pending: int = Field(..., description="Jobs waiting to start")
processing: int = Field(..., description="Jobs currently running")
completed: int = Field(..., description="Successfully completed jobs")
failed: int = Field(..., description="Failed jobs")
success_rate: float = Field(..., description="Percentage of successful imports")
# ============================================================================
# User Statistics
# ============================================================================
class UserStatsResponse(BaseModel):
"""User statistics response schema.
Used by: Platform statistics endpoints
"""
total_users: int = Field(..., description="Total number of users")
active_users: int = Field(..., description="Number of active users")
inactive_users: int = Field(..., description="Number of inactive users")
admin_users: int = Field(..., description="Number of admin users")
activation_rate: float = Field(..., description="Percentage of active users")
# ============================================================================
# Vendor Statistics (Admin)
# ============================================================================
class VendorStatsResponse(BaseModel):
"""Vendor statistics response schema for admin dashboard.
Used by: GET /api/v1/admin/vendors/stats
"""
total: int = Field(..., description="Total number of vendors")
verified: int = Field(..., description="Number of verified vendors")
pending: int = Field(..., description="Number of pending verification vendors")
inactive: int = Field(..., description="Number of inactive vendors")
# ============================================================================
# Product Statistics
# ============================================================================
class ProductStatsResponse(BaseModel):
"""Product statistics response schema.
Used by: Platform statistics endpoints
"""
total_products: int = Field(0, description="Total number of products")
active_products: int = Field(0, description="Number of active products")
out_of_stock: int = Field(0, description="Number of out-of-stock products")
# ============================================================================
# Platform Statistics (Combined)
# ============================================================================
class OrderStatsBasicResponse(BaseModel):
"""Basic order statistics (stub until Order model is fully implemented).
Used by: Platform statistics endpoints
"""
total_orders: int = Field(0, description="Total number of orders")
pending_orders: int = Field(0, description="Number of pending orders")
completed_orders: int = Field(0, description="Number of completed orders")
class PlatformStatsResponse(BaseModel):
"""Combined platform statistics response schema.
Used by: GET /api/v1/admin/dashboard/stats/platform
"""
users: UserStatsResponse
vendors: VendorStatsResponse
products: ProductStatsResponse
orders: OrderStatsBasicResponse
imports: ImportStatsResponse
# ============================================================================
# Admin Dashboard Response
# ============================================================================
class AdminDashboardResponse(BaseModel):
"""Admin dashboard response schema.
Used by: GET /api/v1/admin/dashboard
"""
platform: dict[str, Any] = Field(..., description="Platform information")
users: UserStatsResponse
vendors: VendorStatsResponse
recent_vendors: list[dict[str, Any]] = Field(
default_factory=list, description="Recent vendors"
)
recent_imports: list[dict[str, Any]] = Field(
default_factory=list, description="Recent import jobs"
)
# ============================================================================
# Vendor Dashboard Statistics
# ============================================================================
class VendorProductStats(BaseModel):
"""Vendor product statistics."""
total: int = Field(0, description="Total products in catalog")
active: int = Field(0, description="Active products")
class VendorOrderStats(BaseModel):
"""Vendor order statistics."""
total: int = Field(0, description="Total orders")
pending: int = Field(0, description="Pending orders")
completed: int = Field(0, description="Completed orders")
class VendorCustomerStats(BaseModel):
"""Vendor customer statistics."""
total: int = Field(0, description="Total customers")
active: int = Field(0, description="Active customers")
class VendorRevenueStats(BaseModel):
"""Vendor revenue statistics."""
total: float = Field(0, description="Total revenue")
this_month: float = Field(0, description="Revenue this month")
class VendorInfo(BaseModel):
"""Vendor basic info for dashboard."""
id: int
name: str
vendor_code: str
class VendorDashboardStatsResponse(BaseModel):
"""Vendor dashboard statistics response schema.
Used by: GET /api/v1/vendor/dashboard/stats
"""
vendor: VendorInfo
products: VendorProductStats
orders: VendorOrderStats
customers: VendorCustomerStats
revenue: VendorRevenueStats
# ============================================================================
# Vendor Analytics
# ============================================================================
class VendorAnalyticsImports(BaseModel):
"""Vendor import analytics."""
count: int = Field(0, description="Number of imports in period")
class VendorAnalyticsCatalog(BaseModel):
"""Vendor catalog analytics."""
products_added: int = Field(0, description="Products added in period")
class VendorAnalyticsInventory(BaseModel):
"""Vendor inventory analytics."""
total_locations: int = Field(0, description="Total inventory locations")
class VendorAnalyticsResponse(BaseModel):
"""Vendor analytics response schema.
Used by: GET /api/v1/vendor/analytics
"""
period: str = Field(..., description="Analytics period (e.g., '30d')")
start_date: str = Field(..., description="Period start date")
imports: VendorAnalyticsImports
catalog: VendorAnalyticsCatalog
inventory: VendorAnalyticsInventory
# ============================================================================
# Code Quality Dashboard Statistics
# ============================================================================
class ValidatorStats(BaseModel):
"""Statistics for a single validator type."""
total_violations: int = 0
errors: int = 0
warnings: int = 0
last_scan: str | None = None
class CodeQualityDashboardStatsResponse(BaseModel):
"""Code quality dashboard statistics response schema.
Used by: GET /api/v1/admin/code-quality/stats
Supports multiple validator types: architecture, security, performance.
When validator_type is specified, returns stats for that type only.
When not specified, returns combined stats with per-validator breakdown.
"""
total_violations: int
errors: int
warnings: int
info: int = 0
open: int
assigned: int
resolved: int
ignored: int
technical_debt_score: int
trend: list[dict[str, Any]] = Field(default_factory=list)
by_severity: dict[str, Any] = Field(default_factory=dict)
by_rule: dict[str, Any] = Field(default_factory=dict)
by_module: dict[str, Any] = Field(default_factory=dict)
top_files: list[dict[str, Any]] = Field(default_factory=list)
last_scan: str | None = None
validator_type: str | None = None # Set when filtering by type
by_validator: dict[str, ValidatorStats] = Field(
default_factory=dict,
description="Per-validator breakdown (architecture, security, performance)",
)
# ============================================================================
# Customer Statistics (Coming Soon)
# ============================================================================
class CustomerStatsResponse(BaseModel):
"""Schema for customer statistics."""
customer_id: int
total_orders: int
total_spent: Decimal
average_order_value: Decimal
last_order_date: datetime | None
first_order_date: datetime | None
lifetime_value: Decimal
# ============================================================================
# Order Statistics (Coming Soon)
# ============================================================================
class OrderStatsResponse(BaseModel):
"""Schema for order statistics."""
total_orders: int
pending_orders: int
processing_orders: int
shipped_orders: int
delivered_orders: int
cancelled_orders: int
total_revenue: Decimal
average_order_value: Decimal
__all__ = [
"StatsResponse",
"MarketplaceStatsResponse",
"ImportStatsResponse",
"UserStatsResponse",
"VendorStatsResponse",
"ProductStatsResponse",
"PlatformStatsResponse",
"OrderStatsBasicResponse",
"AdminDashboardResponse",
"VendorProductStats",
"VendorOrderStats",
"VendorCustomerStats",
"VendorRevenueStats",
"VendorInfo",
"VendorDashboardStatsResponse",
"VendorAnalyticsImports",
"VendorAnalyticsCatalog",
"VendorAnalyticsInventory",
"VendorAnalyticsResponse",
"ValidatorStats",
"CodeQualityDashboardStatsResponse",
"CustomerStatsResponse",
"OrderStatsResponse",
]

View File

@@ -0,0 +1,34 @@
# app/modules/analytics/services/__init__.py
"""
Analytics module services.
This is the canonical location for analytics services.
"""
from app.modules.analytics.services.stats_service import (
stats_service,
StatsService,
)
from app.modules.analytics.services.usage_service import (
usage_service,
UsageService,
UsageData,
UsageMetricData,
TierInfoData,
UpgradeTierData,
LimitCheckData,
)
__all__ = [
# Stats service
"stats_service",
"StatsService",
# Usage service
"usage_service",
"UsageService",
"UsageData",
"UsageMetricData",
"TierInfoData",
"UpgradeTierData",
"LimitCheckData",
]

View File

@@ -0,0 +1,625 @@
# app/modules/analytics/services/stats_service.py
"""
Statistics service for generating system analytics and metrics.
This is the canonical location for the stats service.
This module provides:
- System-wide statistics (admin)
- Vendor-specific statistics
- Marketplace analytics
- Performance metrics
"""
import logging
from datetime import datetime, timedelta
from typing import Any
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.exceptions import AdminOperationException, VendorNotFoundException
from models.database.customer import Customer
from models.database.inventory import Inventory
from models.database.marketplace_import_job import MarketplaceImportJob
from models.database.marketplace_product import MarketplaceProduct
from models.database.order import Order
from models.database.product import Product
from models.database.user import User
from models.database.vendor import Vendor
logger = logging.getLogger(__name__)
class StatsService:
"""Service for statistics operations."""
# ========================================================================
# VENDOR-SPECIFIC STATISTICS
# ========================================================================
def get_vendor_stats(self, db: Session, vendor_id: int) -> dict[str, Any]:
"""
Get statistics for a specific vendor.
Args:
db: Database session
vendor_id: Vendor ID
Returns:
Dictionary with vendor statistics
Raises:
VendorNotFoundException: If vendor doesn't exist
AdminOperationException: If database query fails
"""
# Verify vendor exists
vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first()
if not vendor:
raise VendorNotFoundException(str(vendor_id), identifier_type="id")
try:
# Catalog statistics
total_catalog_products = (
db.query(Product)
.filter(Product.vendor_id == vendor_id, Product.is_active == True)
.count()
)
featured_products = (
db.query(Product)
.filter(
Product.vendor_id == vendor_id,
Product.is_featured == True,
Product.is_active == True,
)
.count()
)
# Staging statistics
# TODO: This is fragile - MarketplaceProduct uses vendor_name (string) not vendor_id
# Should add vendor_id foreign key to MarketplaceProduct for robust querying
# For now, matching by vendor name which could fail if names don't match exactly
staging_products = (
db.query(MarketplaceProduct)
.filter(MarketplaceProduct.vendor_name == vendor.name)
.count()
)
# Inventory statistics
total_inventory = (
db.query(func.sum(Inventory.quantity))
.filter(Inventory.vendor_id == vendor_id)
.scalar()
or 0
)
reserved_inventory = (
db.query(func.sum(Inventory.reserved_quantity))
.filter(Inventory.vendor_id == vendor_id)
.scalar()
or 0
)
inventory_locations = (
db.query(func.count(func.distinct(Inventory.location)))
.filter(Inventory.vendor_id == vendor_id)
.scalar()
or 0
)
# Import statistics
total_imports = (
db.query(MarketplaceImportJob)
.filter(MarketplaceImportJob.vendor_id == vendor_id)
.count()
)
successful_imports = (
db.query(MarketplaceImportJob)
.filter(
MarketplaceImportJob.vendor_id == vendor_id,
MarketplaceImportJob.status == "completed",
)
.count()
)
# Orders
total_orders = db.query(Order).filter(Order.vendor_id == vendor_id).count()
# Customers
total_customers = (
db.query(Customer).filter(Customer.vendor_id == vendor_id).count()
)
# Return flat structure compatible with VendorDashboardStatsResponse schema
# The endpoint will restructure this into nested format
return {
# Product stats
"total_products": total_catalog_products,
"active_products": total_catalog_products,
"featured_products": featured_products,
# Order stats (TODO: implement when Order model has status field)
"total_orders": total_orders,
"pending_orders": 0, # TODO: filter by status
"completed_orders": 0, # TODO: filter by status
# Customer stats
"total_customers": total_customers,
"active_customers": 0, # TODO: implement active customer logic
# Revenue stats (TODO: implement when Order model has amount field)
"total_revenue": 0,
"revenue_this_month": 0,
# Import stats
"total_imports": total_imports,
"successful_imports": successful_imports,
"import_success_rate": (
(successful_imports / total_imports * 100)
if total_imports > 0
else 0
),
# Staging stats
"imported_products": staging_products,
# Inventory stats
"total_inventory_quantity": int(total_inventory),
"reserved_inventory_quantity": int(reserved_inventory),
"available_inventory_quantity": int(
total_inventory - reserved_inventory
),
"inventory_locations_count": inventory_locations,
}
except VendorNotFoundException:
raise
except Exception as e:
logger.error(
f"Failed to retrieve vendor statistics for vendor {vendor_id}: {str(e)}"
)
raise AdminOperationException(
operation="get_vendor_stats",
reason=f"Database query failed: {str(e)}",
target_type="vendor",
target_id=str(vendor_id),
)
def get_vendor_analytics(
self, db: Session, vendor_id: int, period: str = "30d"
) -> dict[str, Any]:
"""
Get a specific vendor analytics for a time period.
Args:
db: Database session
vendor_id: Vendor ID
period: Time period (7d, 30d, 90d, 1y)
Returns:
Analytics data
Raises:
VendorNotFoundException: If vendor doesn't exist
AdminOperationException: If database query fails
"""
# Verify vendor exists
vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first()
if not vendor:
raise VendorNotFoundException(str(vendor_id), identifier_type="id")
try:
# Parse period
days = self._parse_period(period)
start_date = datetime.utcnow() - timedelta(days=days)
# Import activity
recent_imports = (
db.query(MarketplaceImportJob)
.filter(
MarketplaceImportJob.vendor_id == vendor_id,
MarketplaceImportJob.created_at >= start_date,
)
.count()
)
# Products added to catalog
products_added = (
db.query(Product)
.filter(
Product.vendor_id == vendor_id, Product.created_at >= start_date
)
.count()
)
# Inventory changes
inventory_entries = (
db.query(Inventory).filter(Inventory.vendor_id == vendor_id).count()
)
return {
"period": period,
"start_date": start_date.isoformat(),
"imports": {
"count": recent_imports,
},
"catalog": {
"products_added": products_added,
},
"inventory": {
"total_locations": inventory_entries,
},
}
except VendorNotFoundException:
raise
except Exception as e:
logger.error(
f"Failed to retrieve vendor analytics for vendor {vendor_id}: {str(e)}"
)
raise AdminOperationException(
operation="get_vendor_analytics",
reason=f"Database query failed: {str(e)}",
target_type="vendor",
target_id=str(vendor_id),
)
def get_vendor_statistics(self, db: Session) -> dict:
"""Get vendor statistics for admin dashboard.
Returns dict compatible with VendorStatsResponse schema.
Keys: total, verified, pending, inactive (mapped from internal names)
"""
try:
total_vendors = db.query(Vendor).count()
active_vendors = db.query(Vendor).filter(Vendor.is_active == True).count()
verified_vendors = (
db.query(Vendor).filter(Vendor.is_verified == True).count()
)
inactive_vendors = total_vendors - active_vendors
# Pending = active but not yet verified
pending_vendors = (
db.query(Vendor)
.filter(Vendor.is_active == True, Vendor.is_verified == False)
.count()
)
return {
# Schema-compatible fields (VendorStatsResponse)
"total": total_vendors,
"verified": verified_vendors,
"pending": pending_vendors,
"inactive": inactive_vendors,
# Legacy fields for backward compatibility
"total_vendors": total_vendors,
"active_vendors": active_vendors,
"inactive_vendors": inactive_vendors,
"verified_vendors": verified_vendors,
"pending_vendors": pending_vendors,
"verification_rate": (
(verified_vendors / total_vendors * 100) if total_vendors > 0 else 0
),
}
except Exception as e:
logger.error(f"Failed to get vendor statistics: {str(e)}")
raise AdminOperationException(
operation="get_vendor_statistics", reason="Database query failed"
)
# ========================================================================
# SYSTEM-WIDE STATISTICS (ADMIN)
# ========================================================================
def get_comprehensive_stats(self, db: Session) -> dict[str, Any]:
"""
Get comprehensive system statistics for admin dashboard.
Args:
db: Database session
Returns:
Dictionary with comprehensive statistics
Raises:
AdminOperationException: If database query fails
"""
try:
# Vendors
total_vendors = db.query(Vendor).filter(Vendor.is_active == True).count()
# Products
total_catalog_products = db.query(Product).count()
unique_brands = self._get_unique_brands_count(db)
unique_categories = self._get_unique_categories_count(db)
# Marketplaces
unique_marketplaces = (
db.query(MarketplaceProduct.marketplace)
.filter(MarketplaceProduct.marketplace.isnot(None))
.distinct()
.count()
)
# Inventory
inventory_stats = self._get_inventory_statistics(db)
return {
"total_products": total_catalog_products,
"unique_brands": unique_brands,
"unique_categories": unique_categories,
"unique_marketplaces": unique_marketplaces,
"unique_vendors": total_vendors,
"total_inventory_entries": inventory_stats.get("total_entries", 0),
"total_inventory_quantity": inventory_stats.get("total_quantity", 0),
}
except Exception as e:
logger.error(f"Failed to retrieve comprehensive statistics: {str(e)}")
raise AdminOperationException(
operation="get_comprehensive_stats",
reason=f"Database query failed: {str(e)}",
)
def get_marketplace_breakdown_stats(self, db: Session) -> list[dict[str, Any]]:
"""
Get statistics broken down by marketplace.
Args:
db: Database session
Returns:
List of marketplace statistics
Raises:
AdminOperationException: If database query fails
"""
try:
marketplace_stats = (
db.query(
MarketplaceProduct.marketplace,
func.count(MarketplaceProduct.id).label("total_products"),
func.count(func.distinct(MarketplaceProduct.vendor_name)).label(
"unique_vendors"
),
func.count(func.distinct(MarketplaceProduct.brand)).label(
"unique_brands"
),
)
.filter(MarketplaceProduct.marketplace.isnot(None))
.group_by(MarketplaceProduct.marketplace)
.all()
)
return [
{
"marketplace": stat.marketplace,
"total_products": stat.total_products,
"unique_vendors": stat.unique_vendors,
"unique_brands": stat.unique_brands,
}
for stat in marketplace_stats
]
except Exception as e:
logger.error(
f"Failed to retrieve marketplace breakdown statistics: {str(e)}"
)
raise AdminOperationException(
operation="get_marketplace_breakdown_stats",
reason=f"Database query failed: {str(e)}",
)
def get_user_statistics(self, db: Session) -> dict[str, Any]:
"""
Get user statistics for admin dashboard.
Args:
db: Database session
Returns:
Dictionary with user statistics
Raises:
AdminOperationException: If database query fails
"""
try:
total_users = db.query(User).count()
active_users = db.query(User).filter(User.is_active == True).count()
inactive_users = total_users - active_users
admin_users = db.query(User).filter(User.role == "admin").count()
return {
"total_users": total_users,
"active_users": active_users,
"inactive_users": inactive_users,
"admin_users": admin_users,
"activation_rate": (
(active_users / total_users * 100) if total_users > 0 else 0
),
}
except Exception as e:
logger.error(f"Failed to get user statistics: {str(e)}")
raise AdminOperationException(
operation="get_user_statistics", reason="Database query failed"
)
def get_import_statistics(self, db: Session) -> dict[str, Any]:
"""
Get import job statistics.
Args:
db: Database session
Returns:
Dictionary with import statistics
Raises:
AdminOperationException: If database query fails
"""
try:
total = db.query(MarketplaceImportJob).count()
pending = (
db.query(MarketplaceImportJob)
.filter(MarketplaceImportJob.status == "pending")
.count()
)
processing = (
db.query(MarketplaceImportJob)
.filter(MarketplaceImportJob.status == "processing")
.count()
)
completed = (
db.query(MarketplaceImportJob)
.filter(
MarketplaceImportJob.status.in_(
["completed", "completed_with_errors"]
)
)
.count()
)
failed = (
db.query(MarketplaceImportJob)
.filter(MarketplaceImportJob.status == "failed")
.count()
)
return {
# Frontend-expected fields
"total": total,
"pending": pending,
"processing": processing,
"completed": completed,
"failed": failed,
# Legacy fields for backward compatibility
"total_imports": total,
"completed_imports": completed,
"failed_imports": failed,
"success_rate": (completed / total * 100) if total > 0 else 0,
}
except Exception as e:
logger.error(f"Failed to get import statistics: {str(e)}")
return {
"total": 0,
"pending": 0,
"processing": 0,
"completed": 0,
"failed": 0,
"total_imports": 0,
"completed_imports": 0,
"failed_imports": 0,
"success_rate": 0,
}
def get_order_statistics(self, db: Session) -> dict[str, Any]:
"""
Get order statistics.
Args:
db: Database session
Returns:
Dictionary with order statistics
Note:
TODO: Implement when Order model is fully available
"""
return {"total_orders": 0, "pending_orders": 0, "completed_orders": 0}
def get_product_statistics(self, db: Session) -> dict[str, Any]:
"""
Get product statistics.
Args:
db: Database session
Returns:
Dictionary with product statistics
Note:
TODO: Implement when Product model is fully available
"""
return {"total_products": 0, "active_products": 0, "out_of_stock": 0}
# ========================================================================
# PRIVATE HELPER METHODS
# ========================================================================
def _parse_period(self, period: str) -> int:
"""
Parse period string to days.
Args:
period: Period string (7d, 30d, 90d, 1y)
Returns:
Number of days
"""
period_map = {
"7d": 7,
"30d": 30,
"90d": 90,
"1y": 365,
}
return period_map.get(period, 30)
def _get_unique_brands_count(self, db: Session) -> int:
"""
Get count of unique brands.
Args:
db: Database session
Returns:
Count of unique brands
"""
return (
db.query(MarketplaceProduct.brand)
.filter(
MarketplaceProduct.brand.isnot(None), MarketplaceProduct.brand != ""
)
.distinct()
.count()
)
def _get_unique_categories_count(self, db: Session) -> int:
"""
Get count of unique categories.
Args:
db: Database session
Returns:
Count of unique categories
"""
return (
db.query(MarketplaceProduct.google_product_category)
.filter(
MarketplaceProduct.google_product_category.isnot(None),
MarketplaceProduct.google_product_category != "",
)
.distinct()
.count()
)
def _get_inventory_statistics(self, db: Session) -> dict[str, int]:
"""
Get inventory-related statistics.
Args:
db: Database session
Returns:
Dictionary with inventory statistics
"""
total_entries = db.query(Inventory).count()
total_quantity = db.query(func.sum(Inventory.quantity)).scalar() or 0
total_reserved = db.query(func.sum(Inventory.reserved_quantity)).scalar() or 0
return {
"total_entries": total_entries,
"total_quantity": int(total_quantity),
"total_reserved": int(total_reserved),
"total_available": int(total_quantity - total_reserved),
}
# Create service instance
stats_service = StatsService()
__all__ = ["stats_service", "StatsService"]

View File

@@ -0,0 +1,447 @@
# app/modules/analytics/services/usage_service.py
"""
Usage and limits service.
This is the canonical location for the usage service.
Provides methods for:
- Getting current usage vs limits
- Calculating upgrade recommendations
- Checking limits before actions
"""
import logging
from dataclasses import dataclass
from sqlalchemy import func
from sqlalchemy.orm import Session
from models.database.product import Product
from models.database.subscription import SubscriptionTier, VendorSubscription
from models.database.vendor import VendorUser
logger = logging.getLogger(__name__)
@dataclass
class UsageMetricData:
"""Usage metric data."""
name: str
current: int
limit: int | None
percentage: float
is_unlimited: bool
is_at_limit: bool
is_approaching_limit: bool
@dataclass
class TierInfoData:
"""Tier information."""
code: str
name: str
price_monthly_cents: int
is_highest_tier: bool
@dataclass
class UpgradeTierData:
"""Upgrade tier information."""
code: str
name: str
price_monthly_cents: int
price_increase_cents: int
benefits: list[str]
@dataclass
class UsageData:
"""Full usage data."""
tier: TierInfoData
usage: list[UsageMetricData]
has_limits_approaching: bool
has_limits_reached: bool
upgrade_available: bool
upgrade_tier: UpgradeTierData | None
upgrade_reasons: list[str]
@dataclass
class LimitCheckData:
"""Limit check result."""
limit_type: str
can_proceed: bool
current: int
limit: int | None
percentage: float
message: str | None
upgrade_tier_code: str | None
upgrade_tier_name: str | None
class UsageService:
"""Service for usage and limits management."""
def get_vendor_usage(self, db: Session, vendor_id: int) -> UsageData:
"""
Get comprehensive usage data for a vendor.
Returns current usage, limits, and upgrade recommendations.
"""
from app.services.subscription_service import subscription_service
# Get subscription
subscription = subscription_service.get_or_create_subscription(db, vendor_id)
# Get current tier
tier = self._get_tier(db, subscription)
# Calculate usage metrics
usage_metrics = self._calculate_usage_metrics(db, vendor_id, subscription)
# Check for approaching/reached limits
has_limits_approaching = any(m.is_approaching_limit for m in usage_metrics)
has_limits_reached = any(m.is_at_limit for m in usage_metrics)
# Get upgrade info
next_tier = self._get_next_tier(db, tier)
is_highest_tier = next_tier is None
# Build upgrade info
upgrade_tier_info = None
upgrade_reasons = []
if next_tier:
upgrade_tier_info = self._build_upgrade_tier_info(tier, next_tier)
upgrade_reasons = self._build_upgrade_reasons(
usage_metrics, has_limits_reached, has_limits_approaching
)
return UsageData(
tier=TierInfoData(
code=tier.code if tier else subscription.tier,
name=tier.name if tier else subscription.tier.title(),
price_monthly_cents=tier.price_monthly_cents if tier else 0,
is_highest_tier=is_highest_tier,
),
usage=usage_metrics,
has_limits_approaching=has_limits_approaching,
has_limits_reached=has_limits_reached,
upgrade_available=not is_highest_tier,
upgrade_tier=upgrade_tier_info,
upgrade_reasons=upgrade_reasons,
)
def check_limit(
self, db: Session, vendor_id: int, limit_type: str
) -> LimitCheckData:
"""
Check a specific limit before performing an action.
Args:
db: Database session
vendor_id: Vendor ID
limit_type: One of "orders", "products", "team_members"
Returns:
LimitCheckData with proceed status and upgrade info
"""
from app.services.subscription_service import subscription_service
if limit_type == "orders":
can_proceed, message = subscription_service.can_create_order(db, vendor_id)
subscription = subscription_service.get_subscription(db, vendor_id)
current = subscription.orders_this_period if subscription else 0
limit = subscription.orders_limit if subscription else 0
elif limit_type == "products":
can_proceed, message = subscription_service.can_add_product(db, vendor_id)
subscription = subscription_service.get_subscription(db, vendor_id)
current = self._get_product_count(db, vendor_id)
limit = subscription.products_limit if subscription else 0
elif limit_type == "team_members":
can_proceed, message = subscription_service.can_add_team_member(db, vendor_id)
subscription = subscription_service.get_subscription(db, vendor_id)
current = self._get_team_member_count(db, vendor_id)
limit = subscription.team_members_limit if subscription else 0
else:
return LimitCheckData(
limit_type=limit_type,
can_proceed=True,
current=0,
limit=None,
percentage=0,
message=f"Unknown limit type: {limit_type}",
upgrade_tier_code=None,
upgrade_tier_name=None,
)
# Calculate percentage
is_unlimited = limit is None or limit < 0
percentage = 0 if is_unlimited else (current / limit * 100 if limit > 0 else 100)
# Get upgrade info if at limit
upgrade_tier_code = None
upgrade_tier_name = None
if not can_proceed:
subscription = subscription_service.get_subscription(db, vendor_id)
current_tier = subscription.tier_obj if subscription else None
if current_tier:
next_tier = self._get_next_tier(db, current_tier)
if next_tier:
upgrade_tier_code = next_tier.code
upgrade_tier_name = next_tier.name
return LimitCheckData(
limit_type=limit_type,
can_proceed=can_proceed,
current=current,
limit=None if is_unlimited else limit,
percentage=percentage,
message=message,
upgrade_tier_code=upgrade_tier_code,
upgrade_tier_name=upgrade_tier_name,
)
# =========================================================================
# Private Helper Methods
# =========================================================================
def _get_tier(
self, db: Session, subscription: VendorSubscription
) -> SubscriptionTier | None:
"""Get tier from subscription or query by code."""
tier = subscription.tier_obj
if not tier:
tier = (
db.query(SubscriptionTier)
.filter(SubscriptionTier.code == subscription.tier)
.first()
)
return tier
def _get_product_count(self, db: Session, vendor_id: int) -> int:
"""Get product count for vendor."""
return (
db.query(func.count(Product.id))
.filter(Product.vendor_id == vendor_id)
.scalar()
or 0
)
def _get_team_member_count(self, db: Session, vendor_id: int) -> int:
"""Get active team member count for vendor."""
return (
db.query(func.count(VendorUser.id))
.filter(VendorUser.vendor_id == vendor_id, VendorUser.is_active == True) # noqa: E712
.scalar()
or 0
)
def _calculate_usage_metrics(
self, db: Session, vendor_id: int, subscription: VendorSubscription
) -> list[UsageMetricData]:
"""Calculate all usage metrics for a vendor."""
metrics = []
# Orders this period
orders_current = subscription.orders_this_period or 0
orders_limit = subscription.orders_limit
orders_unlimited = orders_limit is None or orders_limit < 0
orders_percentage = (
0
if orders_unlimited
else (orders_current / orders_limit * 100 if orders_limit > 0 else 100)
)
metrics.append(
UsageMetricData(
name="orders",
current=orders_current,
limit=None if orders_unlimited else orders_limit,
percentage=orders_percentage,
is_unlimited=orders_unlimited,
is_at_limit=not orders_unlimited and orders_current >= orders_limit,
is_approaching_limit=not orders_unlimited and orders_percentage >= 80,
)
)
# Products
products_count = self._get_product_count(db, vendor_id)
products_limit = subscription.products_limit
products_unlimited = products_limit is None or products_limit < 0
products_percentage = (
0
if products_unlimited
else (products_count / products_limit * 100 if products_limit > 0 else 100)
)
metrics.append(
UsageMetricData(
name="products",
current=products_count,
limit=None if products_unlimited else products_limit,
percentage=products_percentage,
is_unlimited=products_unlimited,
is_at_limit=not products_unlimited and products_count >= products_limit,
is_approaching_limit=not products_unlimited and products_percentage >= 80,
)
)
# Team members
team_count = self._get_team_member_count(db, vendor_id)
team_limit = subscription.team_members_limit
team_unlimited = team_limit is None or team_limit < 0
team_percentage = (
0
if team_unlimited
else (team_count / team_limit * 100 if team_limit > 0 else 100)
)
metrics.append(
UsageMetricData(
name="team_members",
current=team_count,
limit=None if team_unlimited else team_limit,
percentage=team_percentage,
is_unlimited=team_unlimited,
is_at_limit=not team_unlimited and team_count >= team_limit,
is_approaching_limit=not team_unlimited and team_percentage >= 80,
)
)
return metrics
def _get_next_tier(
self, db: Session, current_tier: SubscriptionTier | None
) -> SubscriptionTier | None:
"""Get next tier for upgrade."""
current_tier_order = current_tier.display_order if current_tier else 0
return (
db.query(SubscriptionTier)
.filter(
SubscriptionTier.is_active == True, # noqa: E712
SubscriptionTier.display_order > current_tier_order,
)
.order_by(SubscriptionTier.display_order)
.first()
)
def _build_upgrade_tier_info(
self, current_tier: SubscriptionTier | None, next_tier: SubscriptionTier
) -> UpgradeTierData:
"""Build upgrade tier information with benefits."""
benefits = []
# Numeric limit benefits
if next_tier.orders_per_month and (
not current_tier
or (
current_tier.orders_per_month
and next_tier.orders_per_month > current_tier.orders_per_month
)
):
if next_tier.orders_per_month < 0:
benefits.append("Unlimited orders per month")
else:
benefits.append(f"{next_tier.orders_per_month:,} orders/month")
if next_tier.products_limit and (
not current_tier
or (
current_tier.products_limit
and next_tier.products_limit > current_tier.products_limit
)
):
if next_tier.products_limit < 0:
benefits.append("Unlimited products")
else:
benefits.append(f"{next_tier.products_limit:,} products")
if next_tier.team_members and (
not current_tier
or (
current_tier.team_members
and next_tier.team_members > current_tier.team_members
)
):
if next_tier.team_members < 0:
benefits.append("Unlimited team members")
else:
benefits.append(f"{next_tier.team_members} team members")
# Feature benefits
current_features = (
set(current_tier.features) if current_tier and current_tier.features else set()
)
next_features = set(next_tier.features) if next_tier.features else set()
new_features = next_features - current_features
feature_names = {
"analytics_dashboard": "Advanced Analytics",
"api_access": "API Access",
"automation_rules": "Automation Rules",
"team_roles": "Team Roles & Permissions",
"custom_domain": "Custom Domain",
"webhooks": "Webhooks",
"accounting_export": "Accounting Export",
}
for feature in list(new_features)[:3]:
if feature in feature_names:
benefits.append(feature_names[feature])
current_price = current_tier.price_monthly_cents if current_tier else 0
return UpgradeTierData(
code=next_tier.code,
name=next_tier.name,
price_monthly_cents=next_tier.price_monthly_cents,
price_increase_cents=next_tier.price_monthly_cents - current_price,
benefits=benefits,
)
def _build_upgrade_reasons(
self,
usage_metrics: list[UsageMetricData],
has_limits_reached: bool,
has_limits_approaching: bool,
) -> list[str]:
"""Build upgrade reasons based on usage."""
reasons = []
if has_limits_reached:
for m in usage_metrics:
if m.is_at_limit:
reasons.append(f"You've reached your {m.name.replace('_', ' ')} limit")
elif has_limits_approaching:
for m in usage_metrics:
if m.is_approaching_limit:
reasons.append(
f"You're approaching your {m.name.replace('_', ' ')} limit ({int(m.percentage)}%)"
)
return reasons
# Singleton instance
usage_service = UsageService()
__all__ = [
"usage_service",
"UsageService",
"UsageData",
"UsageMetricData",
"TierInfoData",
"UpgradeTierData",
"LimitCheckData",
]

View File

@@ -1,4 +1,4 @@
// static/vendor/js/analytics.js
// app/modules/analytics/static/vendor/js/analytics.js
/**
* Vendor analytics and reports page logic
* View business metrics and performance data

View File

@@ -1,4 +1,4 @@
{# app/templates/vendor/analytics.html #}
{# app/modules/analytics/templates/analytics/vendor/analytics.html #}
{% extends "vendor/base.html" %}
{% from 'shared/macros/headers.html' import page_header_flex, refresh_button %}
{% from 'shared/macros/alerts.html' import loading_state, error_state %}
@@ -227,5 +227,5 @@
{% endblock %}
{% block extra_scripts %}
<script src="{{ url_for('static', path='vendor/js/analytics.js') }}"></script>
<script src="{{ url_for('analytics_static', path='vendor/js/analytics.js') }}"></script>
{% endblock %}

View File

@@ -1 +0,0 @@
// Admin analytics