diff --git a/app/modules/analytics/__init__.py b/app/modules/analytics/__init__.py
index 93d52214..fc20eb1d 100644
--- a/app/modules/analytics/__init__.py
+++ b/app/modules/analytics/__init__.py
@@ -2,21 +2,34 @@
"""
Analytics Module - Reporting and analytics.
-This module provides:
+This is a self-contained module providing:
- Dashboard analytics
- Custom reports
- Data exports
- Performance metrics
-Routes:
-- Vendor: /api/v1/vendor/analytics/*
-- (Admin uses dashboard for analytics)
-
-Menu Items:
-- Admin: (uses dashboard)
-- Vendor: analytics
+Module Structure:
+- models/ - Database models (none - uses data from other modules)
+- services/ - Business logic (StatsService, UsageService)
+- schemas/ - Pydantic DTOs
+- routes/ - API routes
+- exceptions.py - Module-specific exceptions
"""
-from app.modules.analytics.definition import analytics_module
+# Use lazy imports to avoid circular import issues
-__all__ = ["analytics_module"]
+
+def __getattr__(name: str):
+ """Lazy import module components to avoid circular imports."""
+ if name == "analytics_module":
+ from app.modules.analytics.definition import analytics_module
+
+ return analytics_module
+ elif name == "get_analytics_module_with_routers":
+ from app.modules.analytics.definition import get_analytics_module_with_routers
+
+ return get_analytics_module_with_routers
+ raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
+
+
+__all__ = ["analytics_module", "get_analytics_module_with_routers"]
diff --git a/app/modules/analytics/definition.py b/app/modules/analytics/definition.py
index 42e0c1ff..2dc71d68 100644
--- a/app/modules/analytics/definition.py
+++ b/app/modules/analytics/definition.py
@@ -3,18 +3,25 @@
Analytics module definition.
Defines the analytics module including its features, menu items,
-and route configurations.
+route configurations, and self-contained module settings.
"""
from app.modules.base import ModuleDefinition
from models.database.admin_menu_config import FrontendType
-def _get_vendor_router():
- """Lazy import of vendor router to avoid circular imports."""
- from app.modules.analytics.routes.vendor import vendor_router
+def _get_vendor_api_router():
+ """Lazy import of vendor API router to avoid circular imports."""
+ from app.modules.analytics.routes.api.vendor import router
- return vendor_router
+ return router
+
+
+def _get_vendor_page_router():
+ """Lazy import of vendor page router to avoid circular imports."""
+ from app.modules.analytics.routes.pages.vendor import router
+
+ return router
# Analytics module definition
@@ -22,11 +29,13 @@ analytics_module = ModuleDefinition(
code="analytics",
name="Analytics & Reporting",
description="Dashboard analytics, custom reports, and data exports.",
+ version="1.0.0",
features=[
"basic_reports", # Basic reporting
"analytics_dashboard", # Analytics dashboard
"custom_reports", # Custom report builder
"export_reports", # Export to CSV/Excel
+ "usage_metrics", # Usage and performance metrics
],
menu_items={
FrontendType.ADMIN: [
@@ -37,6 +46,18 @@ analytics_module = ModuleDefinition(
],
},
is_core=False,
+ # =========================================================================
+ # Self-Contained Module Configuration
+ # =========================================================================
+ is_self_contained=True,
+ services_path="app.modules.analytics.services",
+ models_path="app.modules.analytics.models",
+ schemas_path="app.modules.analytics.schemas",
+ exceptions_path="app.modules.analytics.exceptions",
+ # Module templates (namespaced as analytics/admin/*.html and analytics/vendor/*.html)
+ templates_path="templates",
+ # Module-specific translations (accessible via analytics.* keys)
+ locales_path="locales",
)
@@ -46,8 +67,13 @@ def get_analytics_module_with_routers() -> ModuleDefinition:
This function attaches the routers lazily to avoid circular imports
during module initialization.
+
+ Routers:
+ - vendor_api_router: API endpoints for vendor analytics
+ - vendor_page_router: Page routes for vendor analytics dashboard
"""
- analytics_module.vendor_router = _get_vendor_router()
+ analytics_module.vendor_api_router = _get_vendor_api_router()
+ analytics_module.vendor_page_router = _get_vendor_page_router()
return analytics_module
diff --git a/app/modules/analytics/exceptions.py b/app/modules/analytics/exceptions.py
new file mode 100644
index 00000000..47ca44ee
--- /dev/null
+++ b/app/modules/analytics/exceptions.py
@@ -0,0 +1,39 @@
+# app/modules/analytics/exceptions.py
+"""
+Analytics module exceptions.
+
+Module-specific exceptions for analytics functionality.
+"""
+
+from app.exceptions.base import (
+ BusinessLogicException,
+ ValidationException,
+)
+
+
+class ReportGenerationException(BusinessLogicException):
+ """Raised when report generation fails."""
+
+ def __init__(self, report_type: str, reason: str):
+ super().__init__(
+ message=f"Failed to generate {report_type} report: {reason}",
+ error_code="REPORT_GENERATION_FAILED",
+ )
+
+
+class InvalidDateRangeException(ValidationException):
+ """Raised when an invalid date range is provided."""
+
+ def __init__(self, start_date: str, end_date: str):
+ super().__init__(
+ message=f"Invalid date range: {start_date} to {end_date}",
+ field="date_range",
+ value=f"{start_date} - {end_date}",
+ )
+ self.error_code = "INVALID_DATE_RANGE"
+
+
+__all__ = [
+ "ReportGenerationException",
+ "InvalidDateRangeException",
+]
diff --git a/app/modules/analytics/locales/de.json b/app/modules/analytics/locales/de.json
new file mode 100644
index 00000000..2f38b9ec
--- /dev/null
+++ b/app/modules/analytics/locales/de.json
@@ -0,0 +1,17 @@
+{
+ "analytics": {
+ "page_title": "Analysen",
+ "dashboard_title": "Analyse-Dashboard",
+ "dashboard_subtitle": "Sehen Sie Ihre Shop-Leistungskennzahlen und Einblicke",
+ "period_7d": "Letzte 7 Tage",
+ "period_30d": "Letzte 30 Tage",
+ "period_90d": "Letzte 90 Tage",
+ "period_1y": "Letztes Jahr",
+ "imports_count": "Importe",
+ "products_added": "Hinzugefügte Produkte",
+ "inventory_locations": "Lagerstandorte",
+ "data_since": "Daten seit",
+ "loading": "Analysen werden geladen...",
+ "error_loading": "Analysedaten konnten nicht geladen werden"
+ }
+}
diff --git a/app/modules/analytics/locales/en.json b/app/modules/analytics/locales/en.json
new file mode 100644
index 00000000..5f5667c5
--- /dev/null
+++ b/app/modules/analytics/locales/en.json
@@ -0,0 +1,17 @@
+{
+ "analytics": {
+ "page_title": "Analytics",
+ "dashboard_title": "Analytics Dashboard",
+ "dashboard_subtitle": "View your store performance metrics and insights",
+ "period_7d": "Last 7 days",
+ "period_30d": "Last 30 days",
+ "period_90d": "Last 90 days",
+ "period_1y": "Last year",
+ "imports_count": "Imports",
+ "products_added": "Products Added",
+ "inventory_locations": "Inventory Locations",
+ "data_since": "Data since",
+ "loading": "Loading analytics...",
+ "error_loading": "Failed to load analytics data"
+ }
+}
diff --git a/app/modules/analytics/locales/fr.json b/app/modules/analytics/locales/fr.json
new file mode 100644
index 00000000..4530d8a9
--- /dev/null
+++ b/app/modules/analytics/locales/fr.json
@@ -0,0 +1,17 @@
+{
+ "analytics": {
+ "page_title": "Analytique",
+ "dashboard_title": "Tableau de bord analytique",
+ "dashboard_subtitle": "Consultez les indicateurs de performance de votre boutique",
+ "period_7d": "7 derniers jours",
+ "period_30d": "30 derniers jours",
+ "period_90d": "90 derniers jours",
+ "period_1y": "Dernière année",
+ "imports_count": "Importations",
+ "products_added": "Produits ajoutés",
+ "inventory_locations": "Emplacements d'inventaire",
+ "data_since": "Données depuis",
+ "loading": "Chargement des analyses...",
+ "error_loading": "Impossible de charger les données analytiques"
+ }
+}
diff --git a/app/modules/analytics/locales/lu.json b/app/modules/analytics/locales/lu.json
new file mode 100644
index 00000000..0041a40b
--- /dev/null
+++ b/app/modules/analytics/locales/lu.json
@@ -0,0 +1,17 @@
+{
+ "analytics": {
+ "page_title": "Analysen",
+ "dashboard_title": "Analyse-Dashboard",
+ "dashboard_subtitle": "Kuckt Är Buttek Leeschtungsmetriken an Abléck",
+ "period_7d": "Lescht 7 Deeg",
+ "period_30d": "Lescht 30 Deeg",
+ "period_90d": "Lescht 90 Deeg",
+ "period_1y": "Lescht Joer",
+ "imports_count": "Importer",
+ "products_added": "Produkter bäigesat",
+ "inventory_locations": "Lagerplazen",
+ "data_since": "Donnéeë vun",
+ "loading": "Analysen ginn gelueden...",
+ "error_loading": "Analysedonnéeën konnten net geluede ginn"
+ }
+}
diff --git a/app/modules/analytics/models/__init__.py b/app/modules/analytics/models/__init__.py
new file mode 100644
index 00000000..5d72401c
--- /dev/null
+++ b/app/modules/analytics/models/__init__.py
@@ -0,0 +1,9 @@
+# app/modules/analytics/models/__init__.py
+"""
+Analytics module database models.
+
+Analytics primarily uses data from other modules for reporting.
+No dedicated analytics models are defined.
+"""
+
+__all__ = []
diff --git a/app/modules/analytics/routes/__init__.py b/app/modules/analytics/routes/__init__.py
index e48ed072..b7605a8f 100644
--- a/app/modules/analytics/routes/__init__.py
+++ b/app/modules/analytics/routes/__init__.py
@@ -6,8 +6,9 @@ This module provides functions to register analytics routes
with module-based access control.
NOTE: Routers are NOT auto-imported to avoid circular dependencies.
-Import directly from vendor.py as needed:
- from app.modules.analytics.routes.vendor import vendor_router
+Import directly from api/ or pages/ as needed:
+ from app.modules.analytics.routes.api import vendor_router as vendor_api_router
+ from app.modules.analytics.routes.pages import vendor_router as vendor_page_router
Note: Analytics module has no admin routes - admin uses dashboard.
"""
@@ -15,12 +16,15 @@ Note: Analytics module has no admin routes - admin uses dashboard.
# Routers are imported on-demand to avoid circular dependencies
# Do NOT add auto-imports here
-__all__ = ["vendor_router"]
+__all__ = ["vendor_api_router", "vendor_page_router"]
def __getattr__(name: str):
"""Lazy import routers to avoid circular dependencies."""
- if name == "vendor_router":
- from app.modules.analytics.routes.vendor import vendor_router
+ if name == "vendor_api_router":
+ from app.modules.analytics.routes.api import vendor_router
+ return vendor_router
+ elif name == "vendor_page_router":
+ from app.modules.analytics.routes.pages import vendor_router
return vendor_router
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
diff --git a/app/modules/analytics/routes/api/__init__.py b/app/modules/analytics/routes/api/__init__.py
new file mode 100644
index 00000000..c427ad60
--- /dev/null
+++ b/app/modules/analytics/routes/api/__init__.py
@@ -0,0 +1,11 @@
+# app/modules/analytics/routes/api/__init__.py
+"""
+Analytics module API routes.
+
+Provides REST API endpoints for analytics and reporting:
+- Vendor API: Vendor-scoped analytics data
+"""
+
+from app.modules.analytics.routes.api.vendor import router as vendor_router
+
+__all__ = ["vendor_router"]
diff --git a/app/modules/analytics/routes/api/vendor.py b/app/modules/analytics/routes/api/vendor.py
new file mode 100644
index 00000000..9e936f5b
--- /dev/null
+++ b/app/modules/analytics/routes/api/vendor.py
@@ -0,0 +1,56 @@
+# app/modules/analytics/routes/api/vendor.py
+"""
+Vendor Analytics API
+
+Vendor Context: Uses token_vendor_id from JWT token (authenticated vendor API pattern).
+The get_current_vendor_api dependency guarantees token_vendor_id is present.
+
+Feature Requirements:
+- basic_reports: Basic analytics (Essential tier)
+- analytics_dashboard: Advanced analytics (Business tier)
+"""
+
+import logging
+
+from fastapi import APIRouter, Depends, Query
+from sqlalchemy.orm import Session
+
+from app.api.deps import get_current_vendor_api, get_db, require_module_access
+from app.core.feature_gate import RequireFeature
+from app.modules.analytics.services import stats_service
+from app.modules.analytics.schemas import (
+ VendorAnalyticsCatalog,
+ VendorAnalyticsImports,
+ VendorAnalyticsInventory,
+ VendorAnalyticsResponse,
+)
+from models.database.feature import FeatureCode
+from models.database.user import User
+
+router = APIRouter(
+ dependencies=[Depends(require_module_access("analytics"))],
+)
+logger = logging.getLogger(__name__)
+
+
+@router.get("", response_model=VendorAnalyticsResponse)
+def get_vendor_analytics(
+ period: str = Query("30d", description="Time period: 7d, 30d, 90d, 1y"),
+ current_user: User = Depends(get_current_vendor_api),
+ db: Session = Depends(get_db),
+ _: None = Depends(RequireFeature(FeatureCode.BASIC_REPORTS, FeatureCode.ANALYTICS_DASHBOARD)),
+):
+ """Get vendor analytics data for specified time period."""
+ data = stats_service.get_vendor_analytics(db, current_user.token_vendor_id, period)
+
+ return VendorAnalyticsResponse(
+ period=data["period"],
+ start_date=data["start_date"],
+ imports=VendorAnalyticsImports(count=data["imports"]["count"]),
+ catalog=VendorAnalyticsCatalog(
+ products_added=data["catalog"]["products_added"]
+ ),
+ inventory=VendorAnalyticsInventory(
+ total_locations=data["inventory"]["total_locations"]
+ ),
+ )
diff --git a/app/modules/analytics/routes/pages/__init__.py b/app/modules/analytics/routes/pages/__init__.py
new file mode 100644
index 00000000..d577a8c1
--- /dev/null
+++ b/app/modules/analytics/routes/pages/__init__.py
@@ -0,0 +1,13 @@
+# app/modules/analytics/routes/pages/__init__.py
+"""
+Analytics module page routes.
+
+Provides HTML page endpoints for analytics views:
+- Vendor pages: Analytics dashboard for vendors
+"""
+
+from app.modules.analytics.routes.pages.vendor import router as vendor_router
+
+# Note: Analytics has no admin pages - admin uses the main dashboard
+
+__all__ = ["vendor_router"]
diff --git a/app/modules/analytics/routes/pages/vendor.py b/app/modules/analytics/routes/pages/vendor.py
new file mode 100644
index 00000000..f7038628
--- /dev/null
+++ b/app/modules/analytics/routes/pages/vendor.py
@@ -0,0 +1,90 @@
+# app/modules/analytics/routes/pages/vendor.py
+"""
+Analytics Vendor Page Routes (HTML rendering).
+
+Vendor pages for analytics dashboard.
+"""
+
+import logging
+
+from fastapi import APIRouter, Depends, Path, Request
+from fastapi.responses import HTMLResponse
+from sqlalchemy.orm import Session
+
+from app.api.deps import get_current_vendor_from_cookie_or_header, get_db
+from app.services.platform_settings_service import platform_settings_service
+from app.templates_config import templates
+from models.database.user import User
+from models.database.vendor import Vendor
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter()
+
+
+# ============================================================================
+# HELPER: Build Vendor Dashboard Context
+# ============================================================================
+
+
+def get_vendor_context(
+ request: Request,
+ db: Session,
+ current_user: User,
+ vendor_code: str,
+ **extra_context,
+) -> dict:
+ """
+ Build template context for vendor dashboard pages.
+
+ Resolves locale/currency using the platform settings service with
+ vendor override support.
+ """
+ # Load vendor from database
+ vendor = db.query(Vendor).filter(Vendor.subdomain == vendor_code).first()
+
+ # Get platform defaults
+ platform_config = platform_settings_service.get_storefront_config(db)
+
+ # Resolve with vendor override
+ storefront_locale = platform_config["locale"]
+ storefront_currency = platform_config["currency"]
+
+ if vendor and vendor.storefront_locale:
+ storefront_locale = vendor.storefront_locale
+
+ context = {
+ "request": request,
+ "user": current_user,
+ "vendor": vendor,
+ "vendor_code": vendor_code,
+ "storefront_locale": storefront_locale,
+ "storefront_currency": storefront_currency,
+ **extra_context,
+ }
+
+ return context
+
+
+# ============================================================================
+# ANALYTICS PAGE
+# ============================================================================
+
+
+@router.get(
+ "/{vendor_code}/analytics", response_class=HTMLResponse, include_in_schema=False
+)
+async def vendor_analytics_page(
+ request: Request,
+ vendor_code: str = Path(..., description="Vendor code"),
+ current_user: User = Depends(get_current_vendor_from_cookie_or_header),
+ db: Session = Depends(get_db),
+):
+ """
+ Render analytics and reports page.
+ JavaScript loads analytics data via API.
+ """
+ return templates.TemplateResponse(
+ "analytics/vendor/analytics.html",
+ get_vendor_context(request, db, current_user, vendor_code),
+ )
diff --git a/app/modules/analytics/routes/vendor.py b/app/modules/analytics/routes/vendor.py
deleted file mode 100644
index 68363a00..00000000
--- a/app/modules/analytics/routes/vendor.py
+++ /dev/null
@@ -1,25 +0,0 @@
-# app/modules/analytics/routes/vendor.py
-"""
-Analytics module vendor routes.
-
-This module wraps the existing vendor analytics routes and adds
-module-based access control. Routes are re-exported from the
-original location with the module access dependency.
-"""
-
-from fastapi import APIRouter, Depends
-
-from app.api.deps import require_module_access
-
-# Import original router (direct import to avoid circular dependency)
-from app.api.v1.vendor.analytics import router as original_router
-
-# Create module-aware router
-vendor_router = APIRouter(
- prefix="/analytics",
- dependencies=[Depends(require_module_access("analytics"))],
-)
-
-# Re-export all routes from the original module with module access control
-for route in original_router.routes:
- vendor_router.routes.append(route)
diff --git a/app/modules/analytics/schemas/__init__.py b/app/modules/analytics/schemas/__init__.py
new file mode 100644
index 00000000..d177fc9e
--- /dev/null
+++ b/app/modules/analytics/schemas/__init__.py
@@ -0,0 +1,58 @@
+# app/modules/analytics/schemas/__init__.py
+"""
+Analytics module Pydantic schemas.
+
+This is the canonical location for analytics schemas.
+"""
+
+from app.modules.analytics.schemas.stats import (
+ StatsResponse,
+ MarketplaceStatsResponse,
+ ImportStatsResponse,
+ UserStatsResponse,
+ VendorStatsResponse,
+ ProductStatsResponse,
+ PlatformStatsResponse,
+ OrderStatsBasicResponse,
+ AdminDashboardResponse,
+ VendorProductStats,
+ VendorOrderStats,
+ VendorCustomerStats,
+ VendorRevenueStats,
+ VendorInfo,
+ VendorDashboardStatsResponse,
+ VendorAnalyticsImports,
+ VendorAnalyticsCatalog,
+ VendorAnalyticsInventory,
+ VendorAnalyticsResponse,
+ ValidatorStats,
+ CodeQualityDashboardStatsResponse,
+ CustomerStatsResponse,
+ OrderStatsResponse,
+)
+
+__all__ = [
+ "StatsResponse",
+ "MarketplaceStatsResponse",
+ "ImportStatsResponse",
+ "UserStatsResponse",
+ "VendorStatsResponse",
+ "ProductStatsResponse",
+ "PlatformStatsResponse",
+ "OrderStatsBasicResponse",
+ "AdminDashboardResponse",
+ "VendorProductStats",
+ "VendorOrderStats",
+ "VendorCustomerStats",
+ "VendorRevenueStats",
+ "VendorInfo",
+ "VendorDashboardStatsResponse",
+ "VendorAnalyticsImports",
+ "VendorAnalyticsCatalog",
+ "VendorAnalyticsInventory",
+ "VendorAnalyticsResponse",
+ "ValidatorStats",
+ "CodeQualityDashboardStatsResponse",
+ "CustomerStatsResponse",
+ "OrderStatsResponse",
+]
diff --git a/app/modules/analytics/schemas/stats.py b/app/modules/analytics/schemas/stats.py
new file mode 100644
index 00000000..b0b8f5f6
--- /dev/null
+++ b/app/modules/analytics/schemas/stats.py
@@ -0,0 +1,353 @@
+# app/modules/analytics/schemas/stats.py
+"""
+Analytics module schemas for statistics and reporting.
+
+This is the canonical location for stats schemas.
+"""
+
+from datetime import datetime
+from decimal import Decimal
+from typing import Any
+
+from pydantic import BaseModel, Field
+
+
+class StatsResponse(BaseModel):
+ """Comprehensive platform statistics response schema."""
+
+ total_products: int
+ unique_brands: int
+ unique_categories: int
+ unique_marketplaces: int = 0
+ unique_vendors: int = 0
+ total_inventory_entries: int = 0
+ total_inventory_quantity: int = 0
+
+
+class MarketplaceStatsResponse(BaseModel):
+ """Statistics per marketplace response schema."""
+
+ marketplace: str
+ total_products: int
+ unique_vendors: int
+ unique_brands: int
+
+
+# ============================================================================
+# Import Statistics
+# ============================================================================
+
+
+class ImportStatsResponse(BaseModel):
+ """Import job statistics response schema.
+
+ Used by: GET /api/v1/admin/marketplace-import-jobs/stats
+ """
+
+ total: int = Field(..., description="Total number of import jobs")
+ pending: int = Field(..., description="Jobs waiting to start")
+ processing: int = Field(..., description="Jobs currently running")
+ completed: int = Field(..., description="Successfully completed jobs")
+ failed: int = Field(..., description="Failed jobs")
+ success_rate: float = Field(..., description="Percentage of successful imports")
+
+
+# ============================================================================
+# User Statistics
+# ============================================================================
+
+
+class UserStatsResponse(BaseModel):
+ """User statistics response schema.
+
+ Used by: Platform statistics endpoints
+ """
+
+ total_users: int = Field(..., description="Total number of users")
+ active_users: int = Field(..., description="Number of active users")
+ inactive_users: int = Field(..., description="Number of inactive users")
+ admin_users: int = Field(..., description="Number of admin users")
+ activation_rate: float = Field(..., description="Percentage of active users")
+
+
+# ============================================================================
+# Vendor Statistics (Admin)
+# ============================================================================
+
+
+class VendorStatsResponse(BaseModel):
+ """Vendor statistics response schema for admin dashboard.
+
+ Used by: GET /api/v1/admin/vendors/stats
+ """
+
+ total: int = Field(..., description="Total number of vendors")
+ verified: int = Field(..., description="Number of verified vendors")
+ pending: int = Field(..., description="Number of pending verification vendors")
+ inactive: int = Field(..., description="Number of inactive vendors")
+
+
+# ============================================================================
+# Product Statistics
+# ============================================================================
+
+
+class ProductStatsResponse(BaseModel):
+ """Product statistics response schema.
+
+ Used by: Platform statistics endpoints
+ """
+
+ total_products: int = Field(0, description="Total number of products")
+ active_products: int = Field(0, description="Number of active products")
+ out_of_stock: int = Field(0, description="Number of out-of-stock products")
+
+
+# ============================================================================
+# Platform Statistics (Combined)
+# ============================================================================
+
+
+class OrderStatsBasicResponse(BaseModel):
+ """Basic order statistics (stub until Order model is fully implemented).
+
+ Used by: Platform statistics endpoints
+ """
+
+ total_orders: int = Field(0, description="Total number of orders")
+ pending_orders: int = Field(0, description="Number of pending orders")
+ completed_orders: int = Field(0, description="Number of completed orders")
+
+
+class PlatformStatsResponse(BaseModel):
+ """Combined platform statistics response schema.
+
+ Used by: GET /api/v1/admin/dashboard/stats/platform
+ """
+
+ users: UserStatsResponse
+ vendors: VendorStatsResponse
+ products: ProductStatsResponse
+ orders: OrderStatsBasicResponse
+ imports: ImportStatsResponse
+
+
+# ============================================================================
+# Admin Dashboard Response
+# ============================================================================
+
+
+class AdminDashboardResponse(BaseModel):
+ """Admin dashboard response schema.
+
+ Used by: GET /api/v1/admin/dashboard
+ """
+
+ platform: dict[str, Any] = Field(..., description="Platform information")
+ users: UserStatsResponse
+ vendors: VendorStatsResponse
+ recent_vendors: list[dict[str, Any]] = Field(
+ default_factory=list, description="Recent vendors"
+ )
+ recent_imports: list[dict[str, Any]] = Field(
+ default_factory=list, description="Recent import jobs"
+ )
+
+
+# ============================================================================
+# Vendor Dashboard Statistics
+# ============================================================================
+
+
+class VendorProductStats(BaseModel):
+ """Vendor product statistics."""
+
+ total: int = Field(0, description="Total products in catalog")
+ active: int = Field(0, description="Active products")
+
+
+class VendorOrderStats(BaseModel):
+ """Vendor order statistics."""
+
+ total: int = Field(0, description="Total orders")
+ pending: int = Field(0, description="Pending orders")
+ completed: int = Field(0, description="Completed orders")
+
+
+class VendorCustomerStats(BaseModel):
+ """Vendor customer statistics."""
+
+ total: int = Field(0, description="Total customers")
+ active: int = Field(0, description="Active customers")
+
+
+class VendorRevenueStats(BaseModel):
+ """Vendor revenue statistics."""
+
+ total: float = Field(0, description="Total revenue")
+ this_month: float = Field(0, description="Revenue this month")
+
+
+class VendorInfo(BaseModel):
+ """Vendor basic info for dashboard."""
+
+ id: int
+ name: str
+ vendor_code: str
+
+
+class VendorDashboardStatsResponse(BaseModel):
+ """Vendor dashboard statistics response schema.
+
+ Used by: GET /api/v1/vendor/dashboard/stats
+ """
+
+ vendor: VendorInfo
+ products: VendorProductStats
+ orders: VendorOrderStats
+ customers: VendorCustomerStats
+ revenue: VendorRevenueStats
+
+
+# ============================================================================
+# Vendor Analytics
+# ============================================================================
+
+
+class VendorAnalyticsImports(BaseModel):
+ """Vendor import analytics."""
+
+ count: int = Field(0, description="Number of imports in period")
+
+
+class VendorAnalyticsCatalog(BaseModel):
+ """Vendor catalog analytics."""
+
+ products_added: int = Field(0, description="Products added in period")
+
+
+class VendorAnalyticsInventory(BaseModel):
+ """Vendor inventory analytics."""
+
+ total_locations: int = Field(0, description="Total inventory locations")
+
+
+class VendorAnalyticsResponse(BaseModel):
+ """Vendor analytics response schema.
+
+ Used by: GET /api/v1/vendor/analytics
+ """
+
+ period: str = Field(..., description="Analytics period (e.g., '30d')")
+ start_date: str = Field(..., description="Period start date")
+ imports: VendorAnalyticsImports
+ catalog: VendorAnalyticsCatalog
+ inventory: VendorAnalyticsInventory
+
+
+# ============================================================================
+# Code Quality Dashboard Statistics
+# ============================================================================
+
+
+class ValidatorStats(BaseModel):
+ """Statistics for a single validator type."""
+
+ total_violations: int = 0
+ errors: int = 0
+ warnings: int = 0
+ last_scan: str | None = None
+
+
+class CodeQualityDashboardStatsResponse(BaseModel):
+ """Code quality dashboard statistics response schema.
+
+ Used by: GET /api/v1/admin/code-quality/stats
+
+ Supports multiple validator types: architecture, security, performance.
+ When validator_type is specified, returns stats for that type only.
+ When not specified, returns combined stats with per-validator breakdown.
+ """
+
+ total_violations: int
+ errors: int
+ warnings: int
+ info: int = 0
+ open: int
+ assigned: int
+ resolved: int
+ ignored: int
+ technical_debt_score: int
+ trend: list[dict[str, Any]] = Field(default_factory=list)
+ by_severity: dict[str, Any] = Field(default_factory=dict)
+ by_rule: dict[str, Any] = Field(default_factory=dict)
+ by_module: dict[str, Any] = Field(default_factory=dict)
+ top_files: list[dict[str, Any]] = Field(default_factory=list)
+ last_scan: str | None = None
+ validator_type: str | None = None # Set when filtering by type
+ by_validator: dict[str, ValidatorStats] = Field(
+ default_factory=dict,
+ description="Per-validator breakdown (architecture, security, performance)",
+ )
+
+
+# ============================================================================
+# Customer Statistics (Coming Soon)
+# ============================================================================
+
+
+class CustomerStatsResponse(BaseModel):
+ """Schema for customer statistics."""
+
+ customer_id: int
+ total_orders: int
+ total_spent: Decimal
+ average_order_value: Decimal
+ last_order_date: datetime | None
+ first_order_date: datetime | None
+ lifetime_value: Decimal
+
+
+# ============================================================================
+# Order Statistics (Coming Soon)
+# ============================================================================
+
+
+class OrderStatsResponse(BaseModel):
+ """Schema for order statistics."""
+
+ total_orders: int
+ pending_orders: int
+ processing_orders: int
+ shipped_orders: int
+ delivered_orders: int
+ cancelled_orders: int
+ total_revenue: Decimal
+ average_order_value: Decimal
+
+
+__all__ = [
+ "StatsResponse",
+ "MarketplaceStatsResponse",
+ "ImportStatsResponse",
+ "UserStatsResponse",
+ "VendorStatsResponse",
+ "ProductStatsResponse",
+ "PlatformStatsResponse",
+ "OrderStatsBasicResponse",
+ "AdminDashboardResponse",
+ "VendorProductStats",
+ "VendorOrderStats",
+ "VendorCustomerStats",
+ "VendorRevenueStats",
+ "VendorInfo",
+ "VendorDashboardStatsResponse",
+ "VendorAnalyticsImports",
+ "VendorAnalyticsCatalog",
+ "VendorAnalyticsInventory",
+ "VendorAnalyticsResponse",
+ "ValidatorStats",
+ "CodeQualityDashboardStatsResponse",
+ "CustomerStatsResponse",
+ "OrderStatsResponse",
+]
diff --git a/app/modules/analytics/services/__init__.py b/app/modules/analytics/services/__init__.py
new file mode 100644
index 00000000..22676b83
--- /dev/null
+++ b/app/modules/analytics/services/__init__.py
@@ -0,0 +1,34 @@
+# app/modules/analytics/services/__init__.py
+"""
+Analytics module services.
+
+This is the canonical location for analytics services.
+"""
+
+from app.modules.analytics.services.stats_service import (
+ stats_service,
+ StatsService,
+)
+from app.modules.analytics.services.usage_service import (
+ usage_service,
+ UsageService,
+ UsageData,
+ UsageMetricData,
+ TierInfoData,
+ UpgradeTierData,
+ LimitCheckData,
+)
+
+__all__ = [
+ # Stats service
+ "stats_service",
+ "StatsService",
+ # Usage service
+ "usage_service",
+ "UsageService",
+ "UsageData",
+ "UsageMetricData",
+ "TierInfoData",
+ "UpgradeTierData",
+ "LimitCheckData",
+]
diff --git a/app/modules/analytics/services/stats_service.py b/app/modules/analytics/services/stats_service.py
new file mode 100644
index 00000000..20287e43
--- /dev/null
+++ b/app/modules/analytics/services/stats_service.py
@@ -0,0 +1,625 @@
+# app/modules/analytics/services/stats_service.py
+"""
+Statistics service for generating system analytics and metrics.
+
+This is the canonical location for the stats service.
+
+This module provides:
+- System-wide statistics (admin)
+- Vendor-specific statistics
+- Marketplace analytics
+- Performance metrics
+"""
+
+import logging
+from datetime import datetime, timedelta
+from typing import Any
+
+from sqlalchemy import func
+from sqlalchemy.orm import Session
+
+from app.exceptions import AdminOperationException, VendorNotFoundException
+from models.database.customer import Customer
+from models.database.inventory import Inventory
+from models.database.marketplace_import_job import MarketplaceImportJob
+from models.database.marketplace_product import MarketplaceProduct
+from models.database.order import Order
+from models.database.product import Product
+from models.database.user import User
+from models.database.vendor import Vendor
+
+logger = logging.getLogger(__name__)
+
+
+class StatsService:
+ """Service for statistics operations."""
+
+ # ========================================================================
+ # VENDOR-SPECIFIC STATISTICS
+ # ========================================================================
+
+ def get_vendor_stats(self, db: Session, vendor_id: int) -> dict[str, Any]:
+ """
+ Get statistics for a specific vendor.
+
+ Args:
+ db: Database session
+ vendor_id: Vendor ID
+
+ Returns:
+ Dictionary with vendor statistics
+
+ Raises:
+ VendorNotFoundException: If vendor doesn't exist
+ AdminOperationException: If database query fails
+ """
+ # Verify vendor exists
+ vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first()
+ if not vendor:
+ raise VendorNotFoundException(str(vendor_id), identifier_type="id")
+
+ try:
+ # Catalog statistics
+ total_catalog_products = (
+ db.query(Product)
+ .filter(Product.vendor_id == vendor_id, Product.is_active == True)
+ .count()
+ )
+
+ featured_products = (
+ db.query(Product)
+ .filter(
+ Product.vendor_id == vendor_id,
+ Product.is_featured == True,
+ Product.is_active == True,
+ )
+ .count()
+ )
+
+ # Staging statistics
+ # TODO: This is fragile - MarketplaceProduct uses vendor_name (string) not vendor_id
+ # Should add vendor_id foreign key to MarketplaceProduct for robust querying
+ # For now, matching by vendor name which could fail if names don't match exactly
+ staging_products = (
+ db.query(MarketplaceProduct)
+ .filter(MarketplaceProduct.vendor_name == vendor.name)
+ .count()
+ )
+
+ # Inventory statistics
+ total_inventory = (
+ db.query(func.sum(Inventory.quantity))
+ .filter(Inventory.vendor_id == vendor_id)
+ .scalar()
+ or 0
+ )
+
+ reserved_inventory = (
+ db.query(func.sum(Inventory.reserved_quantity))
+ .filter(Inventory.vendor_id == vendor_id)
+ .scalar()
+ or 0
+ )
+
+ inventory_locations = (
+ db.query(func.count(func.distinct(Inventory.location)))
+ .filter(Inventory.vendor_id == vendor_id)
+ .scalar()
+ or 0
+ )
+
+ # Import statistics
+ total_imports = (
+ db.query(MarketplaceImportJob)
+ .filter(MarketplaceImportJob.vendor_id == vendor_id)
+ .count()
+ )
+
+ successful_imports = (
+ db.query(MarketplaceImportJob)
+ .filter(
+ MarketplaceImportJob.vendor_id == vendor_id,
+ MarketplaceImportJob.status == "completed",
+ )
+ .count()
+ )
+
+ # Orders
+ total_orders = db.query(Order).filter(Order.vendor_id == vendor_id).count()
+
+ # Customers
+ total_customers = (
+ db.query(Customer).filter(Customer.vendor_id == vendor_id).count()
+ )
+
+ # Return flat structure compatible with VendorDashboardStatsResponse schema
+ # The endpoint will restructure this into nested format
+ return {
+ # Product stats
+ "total_products": total_catalog_products,
+ "active_products": total_catalog_products,
+ "featured_products": featured_products,
+ # Order stats (TODO: implement when Order model has status field)
+ "total_orders": total_orders,
+ "pending_orders": 0, # TODO: filter by status
+ "completed_orders": 0, # TODO: filter by status
+ # Customer stats
+ "total_customers": total_customers,
+ "active_customers": 0, # TODO: implement active customer logic
+ # Revenue stats (TODO: implement when Order model has amount field)
+ "total_revenue": 0,
+ "revenue_this_month": 0,
+ # Import stats
+ "total_imports": total_imports,
+ "successful_imports": successful_imports,
+ "import_success_rate": (
+ (successful_imports / total_imports * 100)
+ if total_imports > 0
+ else 0
+ ),
+ # Staging stats
+ "imported_products": staging_products,
+ # Inventory stats
+ "total_inventory_quantity": int(total_inventory),
+ "reserved_inventory_quantity": int(reserved_inventory),
+ "available_inventory_quantity": int(
+ total_inventory - reserved_inventory
+ ),
+ "inventory_locations_count": inventory_locations,
+ }
+
+ except VendorNotFoundException:
+ raise
+ except Exception as e:
+ logger.error(
+ f"Failed to retrieve vendor statistics for vendor {vendor_id}: {str(e)}"
+ )
+ raise AdminOperationException(
+ operation="get_vendor_stats",
+ reason=f"Database query failed: {str(e)}",
+ target_type="vendor",
+ target_id=str(vendor_id),
+ )
+
+ def get_vendor_analytics(
+ self, db: Session, vendor_id: int, period: str = "30d"
+ ) -> dict[str, Any]:
+ """
+ Get a specific vendor analytics for a time period.
+
+ Args:
+ db: Database session
+ vendor_id: Vendor ID
+ period: Time period (7d, 30d, 90d, 1y)
+
+ Returns:
+ Analytics data
+
+ Raises:
+ VendorNotFoundException: If vendor doesn't exist
+ AdminOperationException: If database query fails
+ """
+ # Verify vendor exists
+ vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first()
+ if not vendor:
+ raise VendorNotFoundException(str(vendor_id), identifier_type="id")
+
+ try:
+ # Parse period
+ days = self._parse_period(period)
+ start_date = datetime.utcnow() - timedelta(days=days)
+
+ # Import activity
+ recent_imports = (
+ db.query(MarketplaceImportJob)
+ .filter(
+ MarketplaceImportJob.vendor_id == vendor_id,
+ MarketplaceImportJob.created_at >= start_date,
+ )
+ .count()
+ )
+
+ # Products added to catalog
+ products_added = (
+ db.query(Product)
+ .filter(
+ Product.vendor_id == vendor_id, Product.created_at >= start_date
+ )
+ .count()
+ )
+
+ # Inventory changes
+ inventory_entries = (
+ db.query(Inventory).filter(Inventory.vendor_id == vendor_id).count()
+ )
+
+ return {
+ "period": period,
+ "start_date": start_date.isoformat(),
+ "imports": {
+ "count": recent_imports,
+ },
+ "catalog": {
+ "products_added": products_added,
+ },
+ "inventory": {
+ "total_locations": inventory_entries,
+ },
+ }
+
+ except VendorNotFoundException:
+ raise
+ except Exception as e:
+ logger.error(
+ f"Failed to retrieve vendor analytics for vendor {vendor_id}: {str(e)}"
+ )
+ raise AdminOperationException(
+ operation="get_vendor_analytics",
+ reason=f"Database query failed: {str(e)}",
+ target_type="vendor",
+ target_id=str(vendor_id),
+ )
+
+ def get_vendor_statistics(self, db: Session) -> dict:
+ """Get vendor statistics for admin dashboard.
+
+ Returns dict compatible with VendorStatsResponse schema.
+ Keys: total, verified, pending, inactive (mapped from internal names)
+ """
+ try:
+ total_vendors = db.query(Vendor).count()
+ active_vendors = db.query(Vendor).filter(Vendor.is_active == True).count()
+ verified_vendors = (
+ db.query(Vendor).filter(Vendor.is_verified == True).count()
+ )
+ inactive_vendors = total_vendors - active_vendors
+ # Pending = active but not yet verified
+ pending_vendors = (
+ db.query(Vendor)
+ .filter(Vendor.is_active == True, Vendor.is_verified == False)
+ .count()
+ )
+
+ return {
+ # Schema-compatible fields (VendorStatsResponse)
+ "total": total_vendors,
+ "verified": verified_vendors,
+ "pending": pending_vendors,
+ "inactive": inactive_vendors,
+ # Legacy fields for backward compatibility
+ "total_vendors": total_vendors,
+ "active_vendors": active_vendors,
+ "inactive_vendors": inactive_vendors,
+ "verified_vendors": verified_vendors,
+ "pending_vendors": pending_vendors,
+ "verification_rate": (
+ (verified_vendors / total_vendors * 100) if total_vendors > 0 else 0
+ ),
+ }
+ except Exception as e:
+ logger.error(f"Failed to get vendor statistics: {str(e)}")
+ raise AdminOperationException(
+ operation="get_vendor_statistics", reason="Database query failed"
+ )
+
+ # ========================================================================
+ # SYSTEM-WIDE STATISTICS (ADMIN)
+ # ========================================================================
+
+ def get_comprehensive_stats(self, db: Session) -> dict[str, Any]:
+ """
+ Get comprehensive system statistics for admin dashboard.
+
+ Args:
+ db: Database session
+
+ Returns:
+ Dictionary with comprehensive statistics
+
+ Raises:
+ AdminOperationException: If database query fails
+ """
+ try:
+ # Vendors
+ total_vendors = db.query(Vendor).filter(Vendor.is_active == True).count()
+
+ # Products
+ total_catalog_products = db.query(Product).count()
+ unique_brands = self._get_unique_brands_count(db)
+ unique_categories = self._get_unique_categories_count(db)
+
+ # Marketplaces
+ unique_marketplaces = (
+ db.query(MarketplaceProduct.marketplace)
+ .filter(MarketplaceProduct.marketplace.isnot(None))
+ .distinct()
+ .count()
+ )
+
+ # Inventory
+ inventory_stats = self._get_inventory_statistics(db)
+
+ return {
+ "total_products": total_catalog_products,
+ "unique_brands": unique_brands,
+ "unique_categories": unique_categories,
+ "unique_marketplaces": unique_marketplaces,
+ "unique_vendors": total_vendors,
+ "total_inventory_entries": inventory_stats.get("total_entries", 0),
+ "total_inventory_quantity": inventory_stats.get("total_quantity", 0),
+ }
+
+ except Exception as e:
+ logger.error(f"Failed to retrieve comprehensive statistics: {str(e)}")
+ raise AdminOperationException(
+ operation="get_comprehensive_stats",
+ reason=f"Database query failed: {str(e)}",
+ )
+
+ def get_marketplace_breakdown_stats(self, db: Session) -> list[dict[str, Any]]:
+ """
+ Get statistics broken down by marketplace.
+
+ Args:
+ db: Database session
+
+ Returns:
+ List of marketplace statistics
+
+ Raises:
+ AdminOperationException: If database query fails
+ """
+ try:
+ marketplace_stats = (
+ db.query(
+ MarketplaceProduct.marketplace,
+ func.count(MarketplaceProduct.id).label("total_products"),
+ func.count(func.distinct(MarketplaceProduct.vendor_name)).label(
+ "unique_vendors"
+ ),
+ func.count(func.distinct(MarketplaceProduct.brand)).label(
+ "unique_brands"
+ ),
+ )
+ .filter(MarketplaceProduct.marketplace.isnot(None))
+ .group_by(MarketplaceProduct.marketplace)
+ .all()
+ )
+
+ return [
+ {
+ "marketplace": stat.marketplace,
+ "total_products": stat.total_products,
+ "unique_vendors": stat.unique_vendors,
+ "unique_brands": stat.unique_brands,
+ }
+ for stat in marketplace_stats
+ ]
+
+ except Exception as e:
+ logger.error(
+ f"Failed to retrieve marketplace breakdown statistics: {str(e)}"
+ )
+ raise AdminOperationException(
+ operation="get_marketplace_breakdown_stats",
+ reason=f"Database query failed: {str(e)}",
+ )
+
+ def get_user_statistics(self, db: Session) -> dict[str, Any]:
+ """
+ Get user statistics for admin dashboard.
+
+ Args:
+ db: Database session
+
+ Returns:
+ Dictionary with user statistics
+
+ Raises:
+ AdminOperationException: If database query fails
+ """
+ try:
+ total_users = db.query(User).count()
+ active_users = db.query(User).filter(User.is_active == True).count()
+ inactive_users = total_users - active_users
+ admin_users = db.query(User).filter(User.role == "admin").count()
+
+ return {
+ "total_users": total_users,
+ "active_users": active_users,
+ "inactive_users": inactive_users,
+ "admin_users": admin_users,
+ "activation_rate": (
+ (active_users / total_users * 100) if total_users > 0 else 0
+ ),
+ }
+ except Exception as e:
+ logger.error(f"Failed to get user statistics: {str(e)}")
+ raise AdminOperationException(
+ operation="get_user_statistics", reason="Database query failed"
+ )
+
+ def get_import_statistics(self, db: Session) -> dict[str, Any]:
+ """
+ Get import job statistics.
+
+ Args:
+ db: Database session
+
+ Returns:
+ Dictionary with import statistics
+
+ Raises:
+ AdminOperationException: If database query fails
+ """
+ try:
+ total = db.query(MarketplaceImportJob).count()
+ pending = (
+ db.query(MarketplaceImportJob)
+ .filter(MarketplaceImportJob.status == "pending")
+ .count()
+ )
+ processing = (
+ db.query(MarketplaceImportJob)
+ .filter(MarketplaceImportJob.status == "processing")
+ .count()
+ )
+ completed = (
+ db.query(MarketplaceImportJob)
+ .filter(
+ MarketplaceImportJob.status.in_(
+ ["completed", "completed_with_errors"]
+ )
+ )
+ .count()
+ )
+ failed = (
+ db.query(MarketplaceImportJob)
+ .filter(MarketplaceImportJob.status == "failed")
+ .count()
+ )
+
+ return {
+ # Frontend-expected fields
+ "total": total,
+ "pending": pending,
+ "processing": processing,
+ "completed": completed,
+ "failed": failed,
+ # Legacy fields for backward compatibility
+ "total_imports": total,
+ "completed_imports": completed,
+ "failed_imports": failed,
+ "success_rate": (completed / total * 100) if total > 0 else 0,
+ }
+ except Exception as e:
+ logger.error(f"Failed to get import statistics: {str(e)}")
+ return {
+ "total": 0,
+ "pending": 0,
+ "processing": 0,
+ "completed": 0,
+ "failed": 0,
+ "total_imports": 0,
+ "completed_imports": 0,
+ "failed_imports": 0,
+ "success_rate": 0,
+ }
+
+ def get_order_statistics(self, db: Session) -> dict[str, Any]:
+ """
+ Get order statistics.
+
+ Args:
+ db: Database session
+
+ Returns:
+ Dictionary with order statistics
+
+ Note:
+ TODO: Implement when Order model is fully available
+ """
+ return {"total_orders": 0, "pending_orders": 0, "completed_orders": 0}
+
+ def get_product_statistics(self, db: Session) -> dict[str, Any]:
+ """
+ Get product statistics.
+
+ Args:
+ db: Database session
+
+ Returns:
+ Dictionary with product statistics
+
+ Note:
+ TODO: Implement when Product model is fully available
+ """
+ return {"total_products": 0, "active_products": 0, "out_of_stock": 0}
+
+ # ========================================================================
+ # PRIVATE HELPER METHODS
+ # ========================================================================
+
+ def _parse_period(self, period: str) -> int:
+ """
+ Parse period string to days.
+
+ Args:
+ period: Period string (7d, 30d, 90d, 1y)
+
+ Returns:
+ Number of days
+ """
+ period_map = {
+ "7d": 7,
+ "30d": 30,
+ "90d": 90,
+ "1y": 365,
+ }
+ return period_map.get(period, 30)
+
+ def _get_unique_brands_count(self, db: Session) -> int:
+ """
+ Get count of unique brands.
+
+ Args:
+ db: Database session
+
+ Returns:
+ Count of unique brands
+ """
+ return (
+ db.query(MarketplaceProduct.brand)
+ .filter(
+ MarketplaceProduct.brand.isnot(None), MarketplaceProduct.brand != ""
+ )
+ .distinct()
+ .count()
+ )
+
+ def _get_unique_categories_count(self, db: Session) -> int:
+ """
+ Get count of unique categories.
+
+ Args:
+ db: Database session
+
+ Returns:
+ Count of unique categories
+ """
+ return (
+ db.query(MarketplaceProduct.google_product_category)
+ .filter(
+ MarketplaceProduct.google_product_category.isnot(None),
+ MarketplaceProduct.google_product_category != "",
+ )
+ .distinct()
+ .count()
+ )
+
+ def _get_inventory_statistics(self, db: Session) -> dict[str, int]:
+ """
+ Get inventory-related statistics.
+
+ Args:
+ db: Database session
+
+ Returns:
+ Dictionary with inventory statistics
+ """
+ total_entries = db.query(Inventory).count()
+ total_quantity = db.query(func.sum(Inventory.quantity)).scalar() or 0
+ total_reserved = db.query(func.sum(Inventory.reserved_quantity)).scalar() or 0
+
+ return {
+ "total_entries": total_entries,
+ "total_quantity": int(total_quantity),
+ "total_reserved": int(total_reserved),
+ "total_available": int(total_quantity - total_reserved),
+ }
+
+
+# Create service instance
+stats_service = StatsService()
+
+__all__ = ["stats_service", "StatsService"]
diff --git a/app/modules/analytics/services/usage_service.py b/app/modules/analytics/services/usage_service.py
new file mode 100644
index 00000000..97bb397c
--- /dev/null
+++ b/app/modules/analytics/services/usage_service.py
@@ -0,0 +1,447 @@
+# app/modules/analytics/services/usage_service.py
+"""
+Usage and limits service.
+
+This is the canonical location for the usage service.
+
+Provides methods for:
+- Getting current usage vs limits
+- Calculating upgrade recommendations
+- Checking limits before actions
+"""
+
+import logging
+from dataclasses import dataclass
+
+from sqlalchemy import func
+from sqlalchemy.orm import Session
+
+from models.database.product import Product
+from models.database.subscription import SubscriptionTier, VendorSubscription
+from models.database.vendor import VendorUser
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class UsageMetricData:
+ """Usage metric data."""
+
+ name: str
+ current: int
+ limit: int | None
+ percentage: float
+ is_unlimited: bool
+ is_at_limit: bool
+ is_approaching_limit: bool
+
+
+@dataclass
+class TierInfoData:
+ """Tier information."""
+
+ code: str
+ name: str
+ price_monthly_cents: int
+ is_highest_tier: bool
+
+
+@dataclass
+class UpgradeTierData:
+ """Upgrade tier information."""
+
+ code: str
+ name: str
+ price_monthly_cents: int
+ price_increase_cents: int
+ benefits: list[str]
+
+
+@dataclass
+class UsageData:
+ """Full usage data."""
+
+ tier: TierInfoData
+ usage: list[UsageMetricData]
+ has_limits_approaching: bool
+ has_limits_reached: bool
+ upgrade_available: bool
+ upgrade_tier: UpgradeTierData | None
+ upgrade_reasons: list[str]
+
+
+@dataclass
+class LimitCheckData:
+ """Limit check result."""
+
+ limit_type: str
+ can_proceed: bool
+ current: int
+ limit: int | None
+ percentage: float
+ message: str | None
+ upgrade_tier_code: str | None
+ upgrade_tier_name: str | None
+
+
+class UsageService:
+ """Service for usage and limits management."""
+
+ def get_vendor_usage(self, db: Session, vendor_id: int) -> UsageData:
+ """
+ Get comprehensive usage data for a vendor.
+
+ Returns current usage, limits, and upgrade recommendations.
+ """
+ from app.services.subscription_service import subscription_service
+
+ # Get subscription
+ subscription = subscription_service.get_or_create_subscription(db, vendor_id)
+
+ # Get current tier
+ tier = self._get_tier(db, subscription)
+
+ # Calculate usage metrics
+ usage_metrics = self._calculate_usage_metrics(db, vendor_id, subscription)
+
+ # Check for approaching/reached limits
+ has_limits_approaching = any(m.is_approaching_limit for m in usage_metrics)
+ has_limits_reached = any(m.is_at_limit for m in usage_metrics)
+
+ # Get upgrade info
+ next_tier = self._get_next_tier(db, tier)
+ is_highest_tier = next_tier is None
+
+ # Build upgrade info
+ upgrade_tier_info = None
+ upgrade_reasons = []
+
+ if next_tier:
+ upgrade_tier_info = self._build_upgrade_tier_info(tier, next_tier)
+ upgrade_reasons = self._build_upgrade_reasons(
+ usage_metrics, has_limits_reached, has_limits_approaching
+ )
+
+ return UsageData(
+ tier=TierInfoData(
+ code=tier.code if tier else subscription.tier,
+ name=tier.name if tier else subscription.tier.title(),
+ price_monthly_cents=tier.price_monthly_cents if tier else 0,
+ is_highest_tier=is_highest_tier,
+ ),
+ usage=usage_metrics,
+ has_limits_approaching=has_limits_approaching,
+ has_limits_reached=has_limits_reached,
+ upgrade_available=not is_highest_tier,
+ upgrade_tier=upgrade_tier_info,
+ upgrade_reasons=upgrade_reasons,
+ )
+
+ def check_limit(
+ self, db: Session, vendor_id: int, limit_type: str
+ ) -> LimitCheckData:
+ """
+ Check a specific limit before performing an action.
+
+ Args:
+ db: Database session
+ vendor_id: Vendor ID
+ limit_type: One of "orders", "products", "team_members"
+
+ Returns:
+ LimitCheckData with proceed status and upgrade info
+ """
+ from app.services.subscription_service import subscription_service
+
+ if limit_type == "orders":
+ can_proceed, message = subscription_service.can_create_order(db, vendor_id)
+ subscription = subscription_service.get_subscription(db, vendor_id)
+ current = subscription.orders_this_period if subscription else 0
+ limit = subscription.orders_limit if subscription else 0
+
+ elif limit_type == "products":
+ can_proceed, message = subscription_service.can_add_product(db, vendor_id)
+ subscription = subscription_service.get_subscription(db, vendor_id)
+ current = self._get_product_count(db, vendor_id)
+ limit = subscription.products_limit if subscription else 0
+
+ elif limit_type == "team_members":
+ can_proceed, message = subscription_service.can_add_team_member(db, vendor_id)
+ subscription = subscription_service.get_subscription(db, vendor_id)
+ current = self._get_team_member_count(db, vendor_id)
+ limit = subscription.team_members_limit if subscription else 0
+
+ else:
+ return LimitCheckData(
+ limit_type=limit_type,
+ can_proceed=True,
+ current=0,
+ limit=None,
+ percentage=0,
+ message=f"Unknown limit type: {limit_type}",
+ upgrade_tier_code=None,
+ upgrade_tier_name=None,
+ )
+
+ # Calculate percentage
+ is_unlimited = limit is None or limit < 0
+ percentage = 0 if is_unlimited else (current / limit * 100 if limit > 0 else 100)
+
+ # Get upgrade info if at limit
+ upgrade_tier_code = None
+ upgrade_tier_name = None
+
+ if not can_proceed:
+ subscription = subscription_service.get_subscription(db, vendor_id)
+ current_tier = subscription.tier_obj if subscription else None
+
+ if current_tier:
+ next_tier = self._get_next_tier(db, current_tier)
+ if next_tier:
+ upgrade_tier_code = next_tier.code
+ upgrade_tier_name = next_tier.name
+
+ return LimitCheckData(
+ limit_type=limit_type,
+ can_proceed=can_proceed,
+ current=current,
+ limit=None if is_unlimited else limit,
+ percentage=percentage,
+ message=message,
+ upgrade_tier_code=upgrade_tier_code,
+ upgrade_tier_name=upgrade_tier_name,
+ )
+
+ # =========================================================================
+ # Private Helper Methods
+ # =========================================================================
+
+ def _get_tier(
+ self, db: Session, subscription: VendorSubscription
+ ) -> SubscriptionTier | None:
+ """Get tier from subscription or query by code."""
+ tier = subscription.tier_obj
+ if not tier:
+ tier = (
+ db.query(SubscriptionTier)
+ .filter(SubscriptionTier.code == subscription.tier)
+ .first()
+ )
+ return tier
+
+ def _get_product_count(self, db: Session, vendor_id: int) -> int:
+ """Get product count for vendor."""
+ return (
+ db.query(func.count(Product.id))
+ .filter(Product.vendor_id == vendor_id)
+ .scalar()
+ or 0
+ )
+
+ def _get_team_member_count(self, db: Session, vendor_id: int) -> int:
+ """Get active team member count for vendor."""
+ return (
+ db.query(func.count(VendorUser.id))
+ .filter(VendorUser.vendor_id == vendor_id, VendorUser.is_active == True) # noqa: E712
+ .scalar()
+ or 0
+ )
+
+ def _calculate_usage_metrics(
+ self, db: Session, vendor_id: int, subscription: VendorSubscription
+ ) -> list[UsageMetricData]:
+ """Calculate all usage metrics for a vendor."""
+ metrics = []
+
+ # Orders this period
+ orders_current = subscription.orders_this_period or 0
+ orders_limit = subscription.orders_limit
+ orders_unlimited = orders_limit is None or orders_limit < 0
+ orders_percentage = (
+ 0
+ if orders_unlimited
+ else (orders_current / orders_limit * 100 if orders_limit > 0 else 100)
+ )
+
+ metrics.append(
+ UsageMetricData(
+ name="orders",
+ current=orders_current,
+ limit=None if orders_unlimited else orders_limit,
+ percentage=orders_percentage,
+ is_unlimited=orders_unlimited,
+ is_at_limit=not orders_unlimited and orders_current >= orders_limit,
+ is_approaching_limit=not orders_unlimited and orders_percentage >= 80,
+ )
+ )
+
+ # Products
+ products_count = self._get_product_count(db, vendor_id)
+ products_limit = subscription.products_limit
+ products_unlimited = products_limit is None or products_limit < 0
+ products_percentage = (
+ 0
+ if products_unlimited
+ else (products_count / products_limit * 100 if products_limit > 0 else 100)
+ )
+
+ metrics.append(
+ UsageMetricData(
+ name="products",
+ current=products_count,
+ limit=None if products_unlimited else products_limit,
+ percentage=products_percentage,
+ is_unlimited=products_unlimited,
+ is_at_limit=not products_unlimited and products_count >= products_limit,
+ is_approaching_limit=not products_unlimited and products_percentage >= 80,
+ )
+ )
+
+ # Team members
+ team_count = self._get_team_member_count(db, vendor_id)
+ team_limit = subscription.team_members_limit
+ team_unlimited = team_limit is None or team_limit < 0
+ team_percentage = (
+ 0
+ if team_unlimited
+ else (team_count / team_limit * 100 if team_limit > 0 else 100)
+ )
+
+ metrics.append(
+ UsageMetricData(
+ name="team_members",
+ current=team_count,
+ limit=None if team_unlimited else team_limit,
+ percentage=team_percentage,
+ is_unlimited=team_unlimited,
+ is_at_limit=not team_unlimited and team_count >= team_limit,
+ is_approaching_limit=not team_unlimited and team_percentage >= 80,
+ )
+ )
+
+ return metrics
+
+ def _get_next_tier(
+ self, db: Session, current_tier: SubscriptionTier | None
+ ) -> SubscriptionTier | None:
+ """Get next tier for upgrade."""
+ current_tier_order = current_tier.display_order if current_tier else 0
+
+ return (
+ db.query(SubscriptionTier)
+ .filter(
+ SubscriptionTier.is_active == True, # noqa: E712
+ SubscriptionTier.display_order > current_tier_order,
+ )
+ .order_by(SubscriptionTier.display_order)
+ .first()
+ )
+
+ def _build_upgrade_tier_info(
+ self, current_tier: SubscriptionTier | None, next_tier: SubscriptionTier
+ ) -> UpgradeTierData:
+ """Build upgrade tier information with benefits."""
+ benefits = []
+
+ # Numeric limit benefits
+ if next_tier.orders_per_month and (
+ not current_tier
+ or (
+ current_tier.orders_per_month
+ and next_tier.orders_per_month > current_tier.orders_per_month
+ )
+ ):
+ if next_tier.orders_per_month < 0:
+ benefits.append("Unlimited orders per month")
+ else:
+ benefits.append(f"{next_tier.orders_per_month:,} orders/month")
+
+ if next_tier.products_limit and (
+ not current_tier
+ or (
+ current_tier.products_limit
+ and next_tier.products_limit > current_tier.products_limit
+ )
+ ):
+ if next_tier.products_limit < 0:
+ benefits.append("Unlimited products")
+ else:
+ benefits.append(f"{next_tier.products_limit:,} products")
+
+ if next_tier.team_members and (
+ not current_tier
+ or (
+ current_tier.team_members
+ and next_tier.team_members > current_tier.team_members
+ )
+ ):
+ if next_tier.team_members < 0:
+ benefits.append("Unlimited team members")
+ else:
+ benefits.append(f"{next_tier.team_members} team members")
+
+ # Feature benefits
+ current_features = (
+ set(current_tier.features) if current_tier and current_tier.features else set()
+ )
+ next_features = set(next_tier.features) if next_tier.features else set()
+ new_features = next_features - current_features
+
+ feature_names = {
+ "analytics_dashboard": "Advanced Analytics",
+ "api_access": "API Access",
+ "automation_rules": "Automation Rules",
+ "team_roles": "Team Roles & Permissions",
+ "custom_domain": "Custom Domain",
+ "webhooks": "Webhooks",
+ "accounting_export": "Accounting Export",
+ }
+ for feature in list(new_features)[:3]:
+ if feature in feature_names:
+ benefits.append(feature_names[feature])
+
+ current_price = current_tier.price_monthly_cents if current_tier else 0
+
+ return UpgradeTierData(
+ code=next_tier.code,
+ name=next_tier.name,
+ price_monthly_cents=next_tier.price_monthly_cents,
+ price_increase_cents=next_tier.price_monthly_cents - current_price,
+ benefits=benefits,
+ )
+
+ def _build_upgrade_reasons(
+ self,
+ usage_metrics: list[UsageMetricData],
+ has_limits_reached: bool,
+ has_limits_approaching: bool,
+ ) -> list[str]:
+ """Build upgrade reasons based on usage."""
+ reasons = []
+
+ if has_limits_reached:
+ for m in usage_metrics:
+ if m.is_at_limit:
+ reasons.append(f"You've reached your {m.name.replace('_', ' ')} limit")
+ elif has_limits_approaching:
+ for m in usage_metrics:
+ if m.is_approaching_limit:
+ reasons.append(
+ f"You're approaching your {m.name.replace('_', ' ')} limit ({int(m.percentage)}%)"
+ )
+
+ return reasons
+
+
+# Singleton instance
+usage_service = UsageService()
+
+__all__ = [
+ "usage_service",
+ "UsageService",
+ "UsageData",
+ "UsageMetricData",
+ "TierInfoData",
+ "UpgradeTierData",
+ "LimitCheckData",
+]
diff --git a/static/vendor/js/analytics.js b/app/modules/analytics/static/vendor/js/analytics.js
similarity index 99%
rename from static/vendor/js/analytics.js
rename to app/modules/analytics/static/vendor/js/analytics.js
index a09d83ec..ad7ac13e 100644
--- a/static/vendor/js/analytics.js
+++ b/app/modules/analytics/static/vendor/js/analytics.js
@@ -1,4 +1,4 @@
-// static/vendor/js/analytics.js
+// app/modules/analytics/static/vendor/js/analytics.js
/**
* Vendor analytics and reports page logic
* View business metrics and performance data
diff --git a/app/templates/vendor/analytics.html b/app/modules/analytics/templates/analytics/vendor/analytics.html
similarity index 98%
rename from app/templates/vendor/analytics.html
rename to app/modules/analytics/templates/analytics/vendor/analytics.html
index 7f261389..9a5df9d2 100644
--- a/app/templates/vendor/analytics.html
+++ b/app/modules/analytics/templates/analytics/vendor/analytics.html
@@ -1,4 +1,4 @@
-{# app/templates/vendor/analytics.html #}
+{# app/modules/analytics/templates/analytics/vendor/analytics.html #}
{% extends "vendor/base.html" %}
{% from 'shared/macros/headers.html' import page_header_flex, refresh_button %}
{% from 'shared/macros/alerts.html' import loading_state, error_state %}
@@ -227,5 +227,5 @@
{% endblock %}
{% block extra_scripts %}
-
+
{% endblock %}
diff --git a/static/admin/js/analytics.js b/static/admin/js/analytics.js
deleted file mode 100644
index 78688cc3..00000000
--- a/static/admin/js/analytics.js
+++ /dev/null
@@ -1 +0,0 @@
-// Admin analytics