Files
orion/app/services/stats_service.py
Samir Boulahtit 238c1ec9b8 refactor: modernize code quality tooling with Ruff
- Replace black, isort, and flake8 with Ruff (all-in-one linter and formatter)
- Add comprehensive pyproject.toml configuration
- Simplify Makefile code quality targets
- Configure exclusions for venv/.venv in pyproject.toml
- Auto-fix 1,359 linting issues across codebase

Benefits:
- Much faster builds (Ruff is written in Rust)
- Single tool replaces multiple tools
- More comprehensive rule set (UP, B, C4, SIM, PIE, RET, Q)
- All configuration centralized in pyproject.toml
- Better import sorting and formatting consistency

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-28 19:37:38 +01:00

575 lines
18 KiB
Python

# app/services/stats_service.py
"""
Statistics service for generating system analytics and metrics.
This module provides:
- System-wide statistics (admin)
- Vendor-specific statistics
- Marketplace analytics
- Performance metrics
"""
import logging
from datetime import datetime, timedelta
from typing import Any
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.exceptions import AdminOperationException, VendorNotFoundException
from models.database.customer import Customer
from models.database.inventory import Inventory
from models.database.marketplace_import_job import MarketplaceImportJob
from models.database.marketplace_product import MarketplaceProduct
from models.database.order import Order
from models.database.product import Product
from models.database.user import User
from models.database.vendor import Vendor
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class StatsService:
    """Service for statistics operations.

    Each public method receives a SQLAlchemy ``Session`` and returns plain
    dictionaries (or a list of dictionaries) built from aggregate queries.
    The class defines no ``__init__`` and stores nothing on ``self``.
    """

    # Supported analytics periods mapped to their length in days.
    _PERIOD_DAYS = {
        "7d": 7,
        "30d": 30,
        "90d": 90,
        "1y": 365,
    }
    # Window used when the caller passes a period string that is not listed above.
    _DEFAULT_PERIOD_DAYS = 30

    # ========================================================================
    # VENDOR-SPECIFIC STATISTICS
    # ========================================================================

    def get_vendor_stats(self, db: Session, vendor_id: int) -> dict[str, Any]:
        """
        Get statistics for a specific vendor.

        Args:
            db: Database session
            vendor_id: Vendor ID

        Returns:
            Dictionary with ``catalog``, ``staging``, ``inventory``,
            ``imports``, ``orders`` and ``customers`` sections.

        Raises:
            VendorNotFoundException: If vendor doesn't exist
            AdminOperationException: If one of the statistics queries fails.
                The initial vendor lookup runs before the guarded section, so
                an error raised by that lookup propagates unwrapped.
        """
        vendor = self._get_vendor_or_raise(db, vendor_id)

        try:
            # Catalog statistics.
            # ``== True`` is deliberate: it builds a SQL comparison on the column.
            total_catalog_products = (
                db.query(Product)
                .filter(Product.vendor_id == vendor_id, Product.is_active == True)
                .count()
            )
            featured_products = (
                db.query(Product)
                .filter(
                    Product.vendor_id == vendor_id,
                    Product.is_featured == True,
                    Product.is_active == True,
                )
                .count()
            )

            # Staging statistics
            # TODO: This is fragile - MarketplaceProduct uses vendor_name (string)
            # not vendor_id. Should add a vendor_id foreign key to
            # MarketplaceProduct for robust querying. For now we match by vendor
            # name, which could fail if names don't match exactly.
            staging_products = (
                db.query(MarketplaceProduct)
                .filter(MarketplaceProduct.vendor_name == vendor.name)
                .count()
            )

            # Inventory statistics (``or 0`` covers the NULL a SUM yields on no rows)
            total_inventory = (
                db.query(func.sum(Inventory.quantity))
                .filter(Inventory.vendor_id == vendor_id)
                .scalar()
                or 0
            )
            reserved_inventory = (
                db.query(func.sum(Inventory.reserved_quantity))
                .filter(Inventory.vendor_id == vendor_id)
                .scalar()
                or 0
            )
            inventory_locations = (
                db.query(func.count(func.distinct(Inventory.location)))
                .filter(Inventory.vendor_id == vendor_id)
                .scalar()
                or 0
            )

            # Import statistics
            total_imports = (
                db.query(MarketplaceImportJob)
                .filter(MarketplaceImportJob.vendor_id == vendor_id)
                .count()
            )
            successful_imports = (
                db.query(MarketplaceImportJob)
                .filter(
                    MarketplaceImportJob.vendor_id == vendor_id,
                    MarketplaceImportJob.status == "completed",
                )
                .count()
            )

            # Orders
            total_orders = db.query(Order).filter(Order.vendor_id == vendor_id).count()

            # Customers
            total_customers = (
                db.query(Customer).filter(Customer.vendor_id == vendor_id).count()
            )

            return {
                "catalog": {
                    # NOTE(review): both keys report the *active* product count,
                    # because the query above filters on is_active. Confirm that
                    # "total_products" is not meant to include inactive products.
                    "total_products": total_catalog_products,
                    "featured_products": featured_products,
                    "active_products": total_catalog_products,
                },
                "staging": {
                    "imported_products": staging_products,
                },
                "inventory": {
                    "total_quantity": int(total_inventory),
                    "reserved_quantity": int(reserved_inventory),
                    "available_quantity": int(total_inventory - reserved_inventory),
                    "locations_count": inventory_locations,
                },
                "imports": {
                    "total_imports": total_imports,
                    "successful_imports": successful_imports,
                    "success_rate": self._percentage(successful_imports, total_imports),
                },
                "orders": {
                    "total_orders": total_orders,
                },
                "customers": {
                    "total_customers": total_customers,
                },
            }
        except VendorNotFoundException:
            # Re-raise unchanged so it is never wrapped by the generic handler below.
            raise
        except Exception as e:
            logger.exception(
                "Failed to retrieve vendor statistics for vendor %s: %s", vendor_id, e
            )
            raise AdminOperationException(
                operation="get_vendor_stats",
                reason=f"Database query failed: {e}",
                target_type="vendor",
                target_id=str(vendor_id),
            ) from e

    def get_vendor_analytics(
        self, db: Session, vendor_id: int, period: str = "30d"
    ) -> dict[str, Any]:
        """
        Get analytics for a specific vendor over a time period.

        Args:
            db: Database session
            vendor_id: Vendor ID
            period: Time period (7d, 30d, 90d, 1y). Any other value is treated
                as a 30-day window, but is echoed back unchanged in the result.

        Returns:
            Dictionary with the requested ``period``, the ISO-formatted
            ``start_date`` and ``imports`` / ``catalog`` / ``inventory`` counts.

        Raises:
            VendorNotFoundException: If vendor doesn't exist
            AdminOperationException: If one of the analytics queries fails.
                The initial vendor lookup runs before the guarded section, so
                an error raised by that lookup propagates unwrapped.
        """
        self._get_vendor_or_raise(db, vendor_id)

        try:
            # Parse period
            days = self._parse_period(period)
            # Naive UTC timestamp. NOTE(review): datetime.utcnow() is deprecated
            # since Python 3.12; switching to an aware datetime would change the
            # isoformat() output and the comparison with created_at, so it is
            # left as-is here.
            start_date = datetime.utcnow() - timedelta(days=days)

            # Import activity
            recent_imports = (
                db.query(MarketplaceImportJob)
                .filter(
                    MarketplaceImportJob.vendor_id == vendor_id,
                    MarketplaceImportJob.created_at >= start_date,
                )
                .count()
            )

            # Products added to catalog
            products_added = (
                db.query(Product)
                .filter(
                    Product.vendor_id == vendor_id, Product.created_at >= start_date
                )
                .count()
            )

            # Inventory rows for the vendor (not restricted to the period).
            inventory_entries = (
                db.query(Inventory).filter(Inventory.vendor_id == vendor_id).count()
            )

            return {
                "period": period,
                "start_date": start_date.isoformat(),
                "imports": {
                    "count": recent_imports,
                },
                "catalog": {
                    "products_added": products_added,
                },
                "inventory": {
                    # NOTE(review): this is the number of Inventory rows, whereas
                    # get_vendor_stats counts DISTINCT Inventory.location for its
                    # "locations_count". Confirm which meaning is intended.
                    "total_locations": inventory_entries,
                },
            }
        except VendorNotFoundException:
            # Re-raise unchanged so it is never wrapped by the generic handler below.
            raise
        except Exception as e:
            logger.exception(
                "Failed to retrieve vendor analytics for vendor %s: %s", vendor_id, e
            )
            raise AdminOperationException(
                operation="get_vendor_analytics",
                reason=f"Database query failed: {e}",
                target_type="vendor",
                target_id=str(vendor_id),
            ) from e

    def get_vendor_statistics(self, db: Session) -> dict[str, Any]:
        """
        Get vendor statistics for admin dashboard.

        Args:
            db: Database session

        Returns:
            Dictionary with total / active / inactive / verified vendor counts
            and the verification rate as a percentage (0 when there are no
            vendors).

        Raises:
            AdminOperationException: If database query fails
        """
        try:
            total_vendors = db.query(Vendor).count()
            active_vendors = db.query(Vendor).filter(Vendor.is_active == True).count()
            verified_vendors = (
                db.query(Vendor).filter(Vendor.is_verified == True).count()
            )
            inactive_vendors = total_vendors - active_vendors

            return {
                "total_vendors": total_vendors,
                "active_vendors": active_vendors,
                "inactive_vendors": inactive_vendors,
                "verified_vendors": verified_vendors,
                "verification_rate": self._percentage(verified_vendors, total_vendors),
            }
        except Exception as e:
            logger.exception("Failed to get vendor statistics: %s", e)
            # The reason string carries no error detail; the original exception
            # stays reachable through the chained __cause__.
            raise AdminOperationException(
                operation="get_vendor_statistics", reason="Database query failed"
            ) from e

    # ========================================================================
    # SYSTEM-WIDE STATISTICS (ADMIN)
    # ========================================================================

    def get_comprehensive_stats(self, db: Session) -> dict[str, Any]:
        """
        Get comprehensive system statistics for admin dashboard.

        Args:
            db: Database session

        Returns:
            Dictionary with comprehensive statistics. ``unique_vendors`` counts
            active vendors only; ``total_products`` counts every Product row.

        Raises:
            AdminOperationException: If database query fails
        """
        try:
            # Vendors (active only)
            total_vendors = db.query(Vendor).filter(Vendor.is_active == True).count()

            # Products
            total_catalog_products = db.query(Product).count()
            unique_brands = self._get_unique_brands_count(db)
            unique_categories = self._get_unique_categories_count(db)

            # Marketplaces
            unique_marketplaces = (
                db.query(MarketplaceProduct.marketplace)
                .filter(MarketplaceProduct.marketplace.isnot(None))
                .distinct()
                .count()
            )

            # Inventory
            inventory_stats = self._get_inventory_statistics(db)

            return {
                "total_products": total_catalog_products,
                "unique_brands": unique_brands,
                "unique_categories": unique_categories,
                "unique_marketplaces": unique_marketplaces,
                "unique_vendors": total_vendors,
                "total_inventory_entries": inventory_stats.get("total_entries", 0),
                "total_inventory_quantity": inventory_stats.get("total_quantity", 0),
            }
        except Exception as e:
            logger.exception("Failed to retrieve comprehensive statistics: %s", e)
            raise AdminOperationException(
                operation="get_comprehensive_stats",
                reason=f"Database query failed: {e}",
            ) from e

    def get_marketplace_breakdown_stats(self, db: Session) -> list[dict[str, Any]]:
        """
        Get statistics broken down by marketplace.

        Rows whose marketplace is NULL are excluded.

        Args:
            db: Database session

        Returns:
            List of dictionaries, one per marketplace, each with the product
            count and the distinct vendor-name and brand counts.

        Raises:
            AdminOperationException: If database query fails
        """
        try:
            marketplace_stats = (
                db.query(
                    MarketplaceProduct.marketplace,
                    func.count(MarketplaceProduct.id).label("total_products"),
                    func.count(func.distinct(MarketplaceProduct.vendor_name)).label(
                        "unique_vendors"
                    ),
                    func.count(func.distinct(MarketplaceProduct.brand)).label(
                        "unique_brands"
                    ),
                )
                .filter(MarketplaceProduct.marketplace.isnot(None))
                .group_by(MarketplaceProduct.marketplace)
                .all()
            )

            return [
                {
                    "marketplace": stat.marketplace,
                    "total_products": stat.total_products,
                    "unique_vendors": stat.unique_vendors,
                    "unique_brands": stat.unique_brands,
                }
                for stat in marketplace_stats
            ]
        except Exception as e:
            logger.exception(
                "Failed to retrieve marketplace breakdown statistics: %s", e
            )
            raise AdminOperationException(
                operation="get_marketplace_breakdown_stats",
                reason=f"Database query failed: {e}",
            ) from e

    def get_user_statistics(self, db: Session) -> dict[str, Any]:
        """
        Get user statistics for admin dashboard.

        Args:
            db: Database session

        Returns:
            Dictionary with total / active / inactive / admin user counts and
            the activation rate as a percentage (0 when there are no users).

        Raises:
            AdminOperationException: If database query fails
        """
        try:
            total_users = db.query(User).count()
            active_users = db.query(User).filter(User.is_active == True).count()
            inactive_users = total_users - active_users
            admin_users = db.query(User).filter(User.role == "admin").count()

            return {
                "total_users": total_users,
                "active_users": active_users,
                "inactive_users": inactive_users,
                "admin_users": admin_users,
                "activation_rate": self._percentage(active_users, total_users),
            }
        except Exception as e:
            logger.exception("Failed to get user statistics: %s", e)
            # The reason string carries no error detail; the original exception
            # stays reachable through the chained __cause__.
            raise AdminOperationException(
                operation="get_user_statistics", reason="Database query failed"
            ) from e

    def get_import_statistics(self, db: Session) -> dict[str, Any]:
        """
        Get import job statistics.

        This method is best-effort: it does not raise when a query fails.
        Any exception is logged and an all-zero result is returned instead.

        Args:
            db: Database session

        Returns:
            Dictionary with total / completed / failed import counts and the
            success rate as a percentage (0 when there are no imports, or
            when the queries failed).
        """
        try:
            total = db.query(MarketplaceImportJob).count()
            completed = (
                db.query(MarketplaceImportJob)
                .filter(MarketplaceImportJob.status == "completed")
                .count()
            )
            failed = (
                db.query(MarketplaceImportJob)
                .filter(MarketplaceImportJob.status == "failed")
                .count()
            )

            return {
                "total_imports": total,
                "completed_imports": completed,
                "failed_imports": failed,
                "success_rate": self._percentage(completed, total),
            }
        except Exception as e:
            # Deliberately swallowed: fall back to zeros rather than raising.
            logger.exception("Failed to get import statistics: %s", e)
            return {
                "total_imports": 0,
                "completed_imports": 0,
                "failed_imports": 0,
                "success_rate": 0,
            }

    def get_order_statistics(self, db: Session) -> dict[str, Any]:
        """
        Get order statistics.

        Placeholder: ``db`` is not used and every value is a hard-coded zero.

        Args:
            db: Database session

        Returns:
            Dictionary with order statistics

        Note:
            TODO: Implement when Order model is fully available
        """
        return {"total_orders": 0, "pending_orders": 0, "completed_orders": 0}

    def get_product_statistics(self, db: Session) -> dict[str, Any]:
        """
        Get product statistics.

        Placeholder: ``db`` is not used and every value is a hard-coded zero.

        Args:
            db: Database session

        Returns:
            Dictionary with product statistics

        Note:
            TODO: Implement when Product model is fully available
        """
        return {"total_products": 0, "active_products": 0, "out_of_stock": 0}

    # ========================================================================
    # PRIVATE HELPER METHODS
    # ========================================================================

    def _get_vendor_or_raise(self, db: Session, vendor_id: int) -> Vendor:
        """
        Load a vendor by primary key.

        Args:
            db: Database session
            vendor_id: Vendor ID

        Returns:
            The matching Vendor row

        Raises:
            VendorNotFoundException: If no vendor has this ID
        """
        vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first()
        if not vendor:
            raise VendorNotFoundException(str(vendor_id), identifier_type="id")
        return vendor

    @staticmethod
    def _percentage(part: int, whole: int) -> float:
        """
        Express ``part`` as a percentage of ``whole``.

        Args:
            part: Numerator
            whole: Denominator

        Returns:
            ``part / whole * 100``, or 0 when ``whole`` is not positive
            (which avoids a division by zero on empty tables).
        """
        return (part / whole * 100) if whole > 0 else 0

    def _parse_period(self, period: str) -> int:
        """
        Parse period string to days.

        Args:
            period: Period string (7d, 30d, 90d, 1y)

        Returns:
            Number of days; unrecognised strings fall back to 30.
        """
        return self._PERIOD_DAYS.get(period, self._DEFAULT_PERIOD_DAYS)

    def _get_unique_brands_count(self, db: Session) -> int:
        """
        Get count of unique brands.

        NULL and empty-string brands are ignored.

        Args:
            db: Database session

        Returns:
            Count of unique brands
        """
        return (
            db.query(MarketplaceProduct.brand)
            .filter(
                MarketplaceProduct.brand.isnot(None), MarketplaceProduct.brand != ""
            )
            .distinct()
            .count()
        )

    def _get_unique_categories_count(self, db: Session) -> int:
        """
        Get count of unique categories.

        Categories are read from ``google_product_category``; NULL and
        empty-string values are ignored.

        Args:
            db: Database session

        Returns:
            Count of unique categories
        """
        return (
            db.query(MarketplaceProduct.google_product_category)
            .filter(
                MarketplaceProduct.google_product_category.isnot(None),
                MarketplaceProduct.google_product_category != "",
            )
            .distinct()
            .count()
        )

    def _get_inventory_statistics(self, db: Session) -> dict[str, int]:
        """
        Get inventory-related statistics across all vendors.

        Args:
            db: Database session

        Returns:
            Dictionary with the inventory row count and the total, reserved
            and available (total minus reserved) quantities.
        """
        total_entries = db.query(Inventory).count()
        # ``or 0`` covers the NULL a SUM yields when the table is empty.
        total_quantity = db.query(func.sum(Inventory.quantity)).scalar() or 0
        total_reserved = db.query(func.sum(Inventory.reserved_quantity)).scalar() or 0

        return {
            "total_entries": total_entries,
            "total_quantity": int(total_quantity),
            "total_reserved": int(total_reserved),
            "total_available": int(total_quantity - total_reserved),
        }
# Shared module-level service instance, created once at import time.
# StatsService defines no __init__ and assigns nothing to self in this file.
stats_service = StatsService()