Stats management revamping
This commit is contained in:
@@ -621,50 +621,6 @@ class AdminService:
|
||||
# STATISTICS
|
||||
# ============================================================================
|
||||
|
||||
def get_user_statistics(self, db: Session) -> dict:
|
||||
"""Get user statistics for admin dashboard."""
|
||||
try:
|
||||
total_users = db.query(User).count()
|
||||
active_users = db.query(User).filter(User.is_active == True).count()
|
||||
inactive_users = total_users - active_users
|
||||
admin_users = db.query(User).filter(User.role == "admin").count()
|
||||
|
||||
return {
|
||||
"total_users": total_users,
|
||||
"active_users": active_users,
|
||||
"inactive_users": inactive_users,
|
||||
"admin_users": admin_users,
|
||||
"activation_rate": (active_users / total_users * 100) if total_users > 0 else 0
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get user statistics: {str(e)}")
|
||||
raise AdminOperationException(
|
||||
operation="get_user_statistics",
|
||||
reason="Database query failed"
|
||||
)
|
||||
|
||||
def get_vendor_statistics(self, db: Session) -> dict:
|
||||
"""Get vendor statistics for admin dashboard."""
|
||||
try:
|
||||
total_vendors = db.query(Vendor).count()
|
||||
active_vendors = db.query(Vendor).filter(Vendor.is_active == True).count()
|
||||
verified_vendors = db.query(Vendor).filter(Vendor.is_verified == True).count()
|
||||
inactive_vendors = total_vendors - active_vendors
|
||||
|
||||
return {
|
||||
"total_vendors": total_vendors,
|
||||
"active_vendors": active_vendors,
|
||||
"inactive_vendors": inactive_vendors,
|
||||
"verified_vendors": verified_vendors,
|
||||
"verification_rate": (verified_vendors / total_vendors * 100) if total_vendors > 0 else 0
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get vendor statistics: {str(e)}")
|
||||
raise AdminOperationException(
|
||||
operation="get_vendor_statistics",
|
||||
reason="Database query failed"
|
||||
)
|
||||
|
||||
def get_recent_vendors(self, db: Session, limit: int = 5) -> List[dict]:
|
||||
"""Get recently created vendors."""
|
||||
try:
|
||||
@@ -716,45 +672,6 @@ class AdminService:
|
||||
logger.error(f"Failed to get recent import jobs: {str(e)}")
|
||||
return []
|
||||
|
||||
def get_product_statistics(self, db: Session) -> dict:
|
||||
"""Get product statistics."""
|
||||
# TODO: Implement when Product model is available
|
||||
return {
|
||||
"total_products": 0,
|
||||
"active_products": 0,
|
||||
"out_of_stock": 0
|
||||
}
|
||||
|
||||
def get_order_statistics(self, db: Session) -> dict:
|
||||
"""Get order statistics."""
|
||||
# TODO: Implement when Order model is available
|
||||
return {
|
||||
"total_orders": 0,
|
||||
"pending_orders": 0,
|
||||
"completed_orders": 0
|
||||
}
|
||||
|
||||
def get_import_statistics(self, db: Session) -> dict:
|
||||
"""Get import job statistics."""
|
||||
try:
|
||||
total = db.query(MarketplaceImportJob).count()
|
||||
completed = db.query(MarketplaceImportJob).filter(
|
||||
MarketplaceImportJob.status == "completed"
|
||||
).count()
|
||||
failed = db.query(MarketplaceImportJob).filter(
|
||||
MarketplaceImportJob.status == "failed"
|
||||
).count()
|
||||
|
||||
return {
|
||||
"total_imports": total,
|
||||
"completed_imports": completed,
|
||||
"failed_imports": failed,
|
||||
"success_rate": (completed / total * 100) if total > 0 else 0
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get import statistics: {str(e)}")
|
||||
return {"total_imports": 0, "completed_imports": 0, "failed_imports": 0, "success_rate": 0}
|
||||
|
||||
# ============================================================================
|
||||
# PRIVATE HELPER METHODS
|
||||
# ============================================================================
|
||||
|
||||
Reference in New Issue
Block a user