refactor: convert legacy services/tasks to re-exports

Legacy service and task files now re-export from module locations:

app/services/:
- stats_service.py -> app.modules.analytics.services
- usage_service.py -> app.modules.analytics.services

app/tasks/celery_tasks/:
- code_quality.py -> app.modules.dev_tools.tasks
- test_runner.py -> app.modules.dev_tools.tasks

Maintains backwards compatibility while actual code lives
in self-contained modules.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-28 22:22:28 +01:00
parent 3ffa337fca
commit bf871dc9f9
4 changed files with 61 additions and 1351 deletions

View File

@@ -1,621 +1,18 @@
# app/services/stats_service.py
"""
Statistics service for generating system analytics and metrics.
Statistics service - LEGACY LOCATION
This module provides:
- System-wide statistics (admin)
- Vendor-specific statistics
- Marketplace analytics
- Performance metrics
This file exists for backward compatibility.
The canonical location is now: app/modules/analytics/services/stats_service.py
All imports should use the new location:
from app.modules.analytics.services import stats_service, StatsService
"""
import logging
from datetime import datetime, timedelta
from typing import Any
# Re-export from canonical location for backward compatibility
from app.modules.analytics.services.stats_service import (
stats_service,
StatsService,
)
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.exceptions import AdminOperationException, VendorNotFoundException
from models.database.customer import Customer
from models.database.inventory import Inventory
from models.database.marketplace_import_job import MarketplaceImportJob
from models.database.marketplace_product import MarketplaceProduct
from models.database.order import Order
from models.database.product import Product
from models.database.user import User
from models.database.vendor import Vendor
logger = logging.getLogger(__name__)
class StatsService:
"""Service for statistics operations."""
# ========================================================================
# VENDOR-SPECIFIC STATISTICS
# ========================================================================
def get_vendor_stats(self, db: Session, vendor_id: int) -> dict[str, Any]:
"""
Get statistics for a specific vendor.
Args:
db: Database session
vendor_id: Vendor ID
Returns:
Dictionary with vendor statistics
Raises:
VendorNotFoundException: If vendor doesn't exist
AdminOperationException: If database query fails
"""
# Verify vendor exists
vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first()
if not vendor:
raise VendorNotFoundException(str(vendor_id), identifier_type="id")
try:
# Catalog statistics
total_catalog_products = (
db.query(Product)
.filter(Product.vendor_id == vendor_id, Product.is_active == True)
.count()
)
featured_products = (
db.query(Product)
.filter(
Product.vendor_id == vendor_id,
Product.is_featured == True,
Product.is_active == True,
)
.count()
)
# Staging statistics
# TODO: This is fragile - MarketplaceProduct uses vendor_name (string) not vendor_id
# Should add vendor_id foreign key to MarketplaceProduct for robust querying
# For now, matching by vendor name which could fail if names don't match exactly
staging_products = (
db.query(MarketplaceProduct)
.filter(MarketplaceProduct.vendor_name == vendor.name)
.count()
)
# Inventory statistics
total_inventory = (
db.query(func.sum(Inventory.quantity))
.filter(Inventory.vendor_id == vendor_id)
.scalar()
or 0
)
reserved_inventory = (
db.query(func.sum(Inventory.reserved_quantity))
.filter(Inventory.vendor_id == vendor_id)
.scalar()
or 0
)
inventory_locations = (
db.query(func.count(func.distinct(Inventory.location)))
.filter(Inventory.vendor_id == vendor_id)
.scalar()
or 0
)
# Import statistics
total_imports = (
db.query(MarketplaceImportJob)
.filter(MarketplaceImportJob.vendor_id == vendor_id)
.count()
)
successful_imports = (
db.query(MarketplaceImportJob)
.filter(
MarketplaceImportJob.vendor_id == vendor_id,
MarketplaceImportJob.status == "completed",
)
.count()
)
# Orders
total_orders = db.query(Order).filter(Order.vendor_id == vendor_id).count()
# Customers
total_customers = (
db.query(Customer).filter(Customer.vendor_id == vendor_id).count()
)
# Return flat structure compatible with VendorDashboardStatsResponse schema
# The endpoint will restructure this into nested format
return {
# Product stats
"total_products": total_catalog_products,
"active_products": total_catalog_products,
"featured_products": featured_products,
# Order stats (TODO: implement when Order model has status field)
"total_orders": total_orders,
"pending_orders": 0, # TODO: filter by status
"completed_orders": 0, # TODO: filter by status
# Customer stats
"total_customers": total_customers,
"active_customers": 0, # TODO: implement active customer logic
# Revenue stats (TODO: implement when Order model has amount field)
"total_revenue": 0,
"revenue_this_month": 0,
# Import stats
"total_imports": total_imports,
"successful_imports": successful_imports,
"import_success_rate": (
(successful_imports / total_imports * 100)
if total_imports > 0
else 0
),
# Staging stats
"imported_products": staging_products,
# Inventory stats
"total_inventory_quantity": int(total_inventory),
"reserved_inventory_quantity": int(reserved_inventory),
"available_inventory_quantity": int(
total_inventory - reserved_inventory
),
"inventory_locations_count": inventory_locations,
}
except VendorNotFoundException:
raise
except Exception as e:
logger.error(
f"Failed to retrieve vendor statistics for vendor {vendor_id}: {str(e)}"
)
raise AdminOperationException(
operation="get_vendor_stats",
reason=f"Database query failed: {str(e)}",
target_type="vendor",
target_id=str(vendor_id),
)
def get_vendor_analytics(
self, db: Session, vendor_id: int, period: str = "30d"
) -> dict[str, Any]:
"""
Get a specific vendor analytics for a time period.
Args:
db: Database session
vendor_id: Vendor ID
period: Time period (7d, 30d, 90d, 1y)
Returns:
Analytics data
Raises:
VendorNotFoundException: If vendor doesn't exist
AdminOperationException: If database query fails
"""
# Verify vendor exists
vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first()
if not vendor:
raise VendorNotFoundException(str(vendor_id), identifier_type="id")
try:
# Parse period
days = self._parse_period(period)
start_date = datetime.utcnow() - timedelta(days=days)
# Import activity
recent_imports = (
db.query(MarketplaceImportJob)
.filter(
MarketplaceImportJob.vendor_id == vendor_id,
MarketplaceImportJob.created_at >= start_date,
)
.count()
)
# Products added to catalog
products_added = (
db.query(Product)
.filter(
Product.vendor_id == vendor_id, Product.created_at >= start_date
)
.count()
)
# Inventory changes
inventory_entries = (
db.query(Inventory).filter(Inventory.vendor_id == vendor_id).count()
)
return {
"period": period,
"start_date": start_date.isoformat(),
"imports": {
"count": recent_imports,
},
"catalog": {
"products_added": products_added,
},
"inventory": {
"total_locations": inventory_entries,
},
}
except VendorNotFoundException:
raise
except Exception as e:
logger.error(
f"Failed to retrieve vendor analytics for vendor {vendor_id}: {str(e)}"
)
raise AdminOperationException(
operation="get_vendor_analytics",
reason=f"Database query failed: {str(e)}",
target_type="vendor",
target_id=str(vendor_id),
)
def get_vendor_statistics(self, db: Session) -> dict:
"""Get vendor statistics for admin dashboard.
Returns dict compatible with VendorStatsResponse schema.
Keys: total, verified, pending, inactive (mapped from internal names)
"""
try:
total_vendors = db.query(Vendor).count()
active_vendors = db.query(Vendor).filter(Vendor.is_active == True).count()
verified_vendors = (
db.query(Vendor).filter(Vendor.is_verified == True).count()
)
inactive_vendors = total_vendors - active_vendors
# Pending = active but not yet verified
pending_vendors = (
db.query(Vendor)
.filter(Vendor.is_active == True, Vendor.is_verified == False)
.count()
)
return {
# Schema-compatible fields (VendorStatsResponse)
"total": total_vendors,
"verified": verified_vendors,
"pending": pending_vendors,
"inactive": inactive_vendors,
# Legacy fields for backward compatibility
"total_vendors": total_vendors,
"active_vendors": active_vendors,
"inactive_vendors": inactive_vendors,
"verified_vendors": verified_vendors,
"pending_vendors": pending_vendors,
"verification_rate": (
(verified_vendors / total_vendors * 100) if total_vendors > 0 else 0
),
}
except Exception as e:
logger.error(f"Failed to get vendor statistics: {str(e)}")
raise AdminOperationException(
operation="get_vendor_statistics", reason="Database query failed"
)
# ========================================================================
# SYSTEM-WIDE STATISTICS (ADMIN)
# ========================================================================
def get_comprehensive_stats(self, db: Session) -> dict[str, Any]:
"""
Get comprehensive system statistics for admin dashboard.
Args:
db: Database session
Returns:
Dictionary with comprehensive statistics
Raises:
AdminOperationException: If database query fails
"""
try:
# Vendors
total_vendors = db.query(Vendor).filter(Vendor.is_active == True).count()
# Products
total_catalog_products = db.query(Product).count()
unique_brands = self._get_unique_brands_count(db)
unique_categories = self._get_unique_categories_count(db)
# Marketplaces
unique_marketplaces = (
db.query(MarketplaceProduct.marketplace)
.filter(MarketplaceProduct.marketplace.isnot(None))
.distinct()
.count()
)
# Inventory
inventory_stats = self._get_inventory_statistics(db)
return {
"total_products": total_catalog_products,
"unique_brands": unique_brands,
"unique_categories": unique_categories,
"unique_marketplaces": unique_marketplaces,
"unique_vendors": total_vendors,
"total_inventory_entries": inventory_stats.get("total_entries", 0),
"total_inventory_quantity": inventory_stats.get("total_quantity", 0),
}
except Exception as e:
logger.error(f"Failed to retrieve comprehensive statistics: {str(e)}")
raise AdminOperationException(
operation="get_comprehensive_stats",
reason=f"Database query failed: {str(e)}",
)
def get_marketplace_breakdown_stats(self, db: Session) -> list[dict[str, Any]]:
"""
Get statistics broken down by marketplace.
Args:
db: Database session
Returns:
List of marketplace statistics
Raises:
AdminOperationException: If database query fails
"""
try:
marketplace_stats = (
db.query(
MarketplaceProduct.marketplace,
func.count(MarketplaceProduct.id).label("total_products"),
func.count(func.distinct(MarketplaceProduct.vendor_name)).label(
"unique_vendors"
),
func.count(func.distinct(MarketplaceProduct.brand)).label(
"unique_brands"
),
)
.filter(MarketplaceProduct.marketplace.isnot(None))
.group_by(MarketplaceProduct.marketplace)
.all()
)
return [
{
"marketplace": stat.marketplace,
"total_products": stat.total_products,
"unique_vendors": stat.unique_vendors,
"unique_brands": stat.unique_brands,
}
for stat in marketplace_stats
]
except Exception as e:
logger.error(
f"Failed to retrieve marketplace breakdown statistics: {str(e)}"
)
raise AdminOperationException(
operation="get_marketplace_breakdown_stats",
reason=f"Database query failed: {str(e)}",
)
def get_user_statistics(self, db: Session) -> dict[str, Any]:
"""
Get user statistics for admin dashboard.
Args:
db: Database session
Returns:
Dictionary with user statistics
Raises:
AdminOperationException: If database query fails
"""
try:
total_users = db.query(User).count()
active_users = db.query(User).filter(User.is_active == True).count()
inactive_users = total_users - active_users
admin_users = db.query(User).filter(User.role == "admin").count()
return {
"total_users": total_users,
"active_users": active_users,
"inactive_users": inactive_users,
"admin_users": admin_users,
"activation_rate": (
(active_users / total_users * 100) if total_users > 0 else 0
),
}
except Exception as e:
logger.error(f"Failed to get user statistics: {str(e)}")
raise AdminOperationException(
operation="get_user_statistics", reason="Database query failed"
)
def get_import_statistics(self, db: Session) -> dict[str, Any]:
"""
Get import job statistics.
Args:
db: Database session
Returns:
Dictionary with import statistics
Raises:
AdminOperationException: If database query fails
"""
try:
total = db.query(MarketplaceImportJob).count()
pending = (
db.query(MarketplaceImportJob)
.filter(MarketplaceImportJob.status == "pending")
.count()
)
processing = (
db.query(MarketplaceImportJob)
.filter(MarketplaceImportJob.status == "processing")
.count()
)
completed = (
db.query(MarketplaceImportJob)
.filter(
MarketplaceImportJob.status.in_(
["completed", "completed_with_errors"]
)
)
.count()
)
failed = (
db.query(MarketplaceImportJob)
.filter(MarketplaceImportJob.status == "failed")
.count()
)
return {
# Frontend-expected fields
"total": total,
"pending": pending,
"processing": processing,
"completed": completed,
"failed": failed,
# Legacy fields for backward compatibility
"total_imports": total,
"completed_imports": completed,
"failed_imports": failed,
"success_rate": (completed / total * 100) if total > 0 else 0,
}
except Exception as e:
logger.error(f"Failed to get import statistics: {str(e)}")
return {
"total": 0,
"pending": 0,
"processing": 0,
"completed": 0,
"failed": 0,
"total_imports": 0,
"completed_imports": 0,
"failed_imports": 0,
"success_rate": 0,
}
def get_order_statistics(self, db: Session) -> dict[str, Any]:
"""
Get order statistics.
Args:
db: Database session
Returns:
Dictionary with order statistics
Note:
TODO: Implement when Order model is fully available
"""
return {"total_orders": 0, "pending_orders": 0, "completed_orders": 0}
def get_product_statistics(self, db: Session) -> dict[str, Any]:
"""
Get product statistics.
Args:
db: Database session
Returns:
Dictionary with product statistics
Note:
TODO: Implement when Product model is fully available
"""
return {"total_products": 0, "active_products": 0, "out_of_stock": 0}
# ========================================================================
# PRIVATE HELPER METHODS
# ========================================================================
def _parse_period(self, period: str) -> int:
"""
Parse period string to days.
Args:
period: Period string (7d, 30d, 90d, 1y)
Returns:
Number of days
"""
period_map = {
"7d": 7,
"30d": 30,
"90d": 90,
"1y": 365,
}
return period_map.get(period, 30)
def _get_unique_brands_count(self, db: Session) -> int:
"""
Get count of unique brands.
Args:
db: Database session
Returns:
Count of unique brands
"""
return (
db.query(MarketplaceProduct.brand)
.filter(
MarketplaceProduct.brand.isnot(None), MarketplaceProduct.brand != ""
)
.distinct()
.count()
)
def _get_unique_categories_count(self, db: Session) -> int:
"""
Get count of unique categories.
Args:
db: Database session
Returns:
Count of unique categories
"""
return (
db.query(MarketplaceProduct.google_product_category)
.filter(
MarketplaceProduct.google_product_category.isnot(None),
MarketplaceProduct.google_product_category != "",
)
.distinct()
.count()
)
def _get_inventory_statistics(self, db: Session) -> dict[str, int]:
"""
Get inventory-related statistics.
Args:
db: Database session
Returns:
Dictionary with inventory statistics
"""
total_entries = db.query(Inventory).count()
total_quantity = db.query(func.sum(Inventory.quantity)).scalar() or 0
total_reserved = db.query(func.sum(Inventory.reserved_quantity)).scalar() or 0
return {
"total_entries": total_entries,
"total_quantity": int(total_quantity),
"total_reserved": int(total_reserved),
"total_available": int(total_quantity - total_reserved),
}
# Create service instance
stats_service = StatsService()
__all__ = ["stats_service", "StatsService"]