diff --git a/app/services/stats_service.py b/app/services/stats_service.py index a5a66716..27f47574 100644 --- a/app/services/stats_service.py +++ b/app/services/stats_service.py @@ -1,621 +1,18 @@ # app/services/stats_service.py """ -Statistics service for generating system analytics and metrics. +Statistics service - LEGACY LOCATION -This module provides: -- System-wide statistics (admin) -- Vendor-specific statistics -- Marketplace analytics -- Performance metrics +This file exists for backward compatibility. +The canonical location is now: app/modules/analytics/services/stats_service.py + +All imports should use the new location: + from app.modules.analytics.services import stats_service, StatsService """ -import logging -from datetime import datetime, timedelta -from typing import Any +# Re-export from canonical location for backward compatibility +from app.modules.analytics.services.stats_service import ( + stats_service, + StatsService, +) -from sqlalchemy import func -from sqlalchemy.orm import Session - -from app.exceptions import AdminOperationException, VendorNotFoundException -from models.database.customer import Customer -from models.database.inventory import Inventory -from models.database.marketplace_import_job import MarketplaceImportJob -from models.database.marketplace_product import MarketplaceProduct -from models.database.order import Order -from models.database.product import Product -from models.database.user import User -from models.database.vendor import Vendor - -logger = logging.getLogger(__name__) - - -class StatsService: - """Service for statistics operations.""" - - # ======================================================================== - # VENDOR-SPECIFIC STATISTICS - # ======================================================================== - - def get_vendor_stats(self, db: Session, vendor_id: int) -> dict[str, Any]: - """ - Get statistics for a specific vendor. - - Args: - db: Database session - vendor_id: Vendor ID - - Returns: - Dictionary with vendor statistics - - Raises: - VendorNotFoundException: If vendor doesn't exist - AdminOperationException: If database query fails - """ - # Verify vendor exists - vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first() - if not vendor: - raise VendorNotFoundException(str(vendor_id), identifier_type="id") - - try: - # Catalog statistics - total_catalog_products = ( - db.query(Product) - .filter(Product.vendor_id == vendor_id, Product.is_active == True) - .count() - ) - - featured_products = ( - db.query(Product) - .filter( - Product.vendor_id == vendor_id, - Product.is_featured == True, - Product.is_active == True, - ) - .count() - ) - - # Staging statistics - # TODO: This is fragile - MarketplaceProduct uses vendor_name (string) not vendor_id - # Should add vendor_id foreign key to MarketplaceProduct for robust querying - # For now, matching by vendor name which could fail if names don't match exactly - staging_products = ( - db.query(MarketplaceProduct) - .filter(MarketplaceProduct.vendor_name == vendor.name) - .count() - ) - - # Inventory statistics - total_inventory = ( - db.query(func.sum(Inventory.quantity)) - .filter(Inventory.vendor_id == vendor_id) - .scalar() - or 0 - ) - - reserved_inventory = ( - db.query(func.sum(Inventory.reserved_quantity)) - .filter(Inventory.vendor_id == vendor_id) - .scalar() - or 0 - ) - - inventory_locations = ( - db.query(func.count(func.distinct(Inventory.location))) - .filter(Inventory.vendor_id == vendor_id) - .scalar() - or 0 - ) - - # Import statistics - total_imports = ( - db.query(MarketplaceImportJob) - .filter(MarketplaceImportJob.vendor_id == vendor_id) - .count() - ) - - successful_imports = ( - db.query(MarketplaceImportJob) - .filter( - MarketplaceImportJob.vendor_id == vendor_id, - MarketplaceImportJob.status == "completed", - ) - .count() - ) - - # Orders - total_orders = db.query(Order).filter(Order.vendor_id == vendor_id).count() - - # Customers - total_customers = ( - db.query(Customer).filter(Customer.vendor_id == vendor_id).count() - ) - - # Return flat structure compatible with VendorDashboardStatsResponse schema - # The endpoint will restructure this into nested format - return { - # Product stats - "total_products": total_catalog_products, - "active_products": total_catalog_products, - "featured_products": featured_products, - # Order stats (TODO: implement when Order model has status field) - "total_orders": total_orders, - "pending_orders": 0, # TODO: filter by status - "completed_orders": 0, # TODO: filter by status - # Customer stats - "total_customers": total_customers, - "active_customers": 0, # TODO: implement active customer logic - # Revenue stats (TODO: implement when Order model has amount field) - "total_revenue": 0, - "revenue_this_month": 0, - # Import stats - "total_imports": total_imports, - "successful_imports": successful_imports, - "import_success_rate": ( - (successful_imports / total_imports * 100) - if total_imports > 0 - else 0 - ), - # Staging stats - "imported_products": staging_products, - # Inventory stats - "total_inventory_quantity": int(total_inventory), - "reserved_inventory_quantity": int(reserved_inventory), - "available_inventory_quantity": int( - total_inventory - reserved_inventory - ), - "inventory_locations_count": inventory_locations, - } - - except VendorNotFoundException: - raise - except Exception as e: - logger.error( - f"Failed to retrieve vendor statistics for vendor {vendor_id}: {str(e)}" - ) - raise AdminOperationException( - operation="get_vendor_stats", - reason=f"Database query failed: {str(e)}", - target_type="vendor", - target_id=str(vendor_id), - ) - - def get_vendor_analytics( - self, db: Session, vendor_id: int, period: str = "30d" - ) -> dict[str, Any]: - """ - Get a specific vendor analytics for a time period. - - Args: - db: Database session - vendor_id: Vendor ID - period: Time period (7d, 30d, 90d, 1y) - - Returns: - Analytics data - - Raises: - VendorNotFoundException: If vendor doesn't exist - AdminOperationException: If database query fails - """ - # Verify vendor exists - vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first() - if not vendor: - raise VendorNotFoundException(str(vendor_id), identifier_type="id") - - try: - # Parse period - days = self._parse_period(period) - start_date = datetime.utcnow() - timedelta(days=days) - - # Import activity - recent_imports = ( - db.query(MarketplaceImportJob) - .filter( - MarketplaceImportJob.vendor_id == vendor_id, - MarketplaceImportJob.created_at >= start_date, - ) - .count() - ) - - # Products added to catalog - products_added = ( - db.query(Product) - .filter( - Product.vendor_id == vendor_id, Product.created_at >= start_date - ) - .count() - ) - - # Inventory changes - inventory_entries = ( - db.query(Inventory).filter(Inventory.vendor_id == vendor_id).count() - ) - - return { - "period": period, - "start_date": start_date.isoformat(), - "imports": { - "count": recent_imports, - }, - "catalog": { - "products_added": products_added, - }, - "inventory": { - "total_locations": inventory_entries, - }, - } - - except VendorNotFoundException: - raise - except Exception as e: - logger.error( - f"Failed to retrieve vendor analytics for vendor {vendor_id}: {str(e)}" - ) - raise AdminOperationException( - operation="get_vendor_analytics", - reason=f"Database query failed: {str(e)}", - target_type="vendor", - target_id=str(vendor_id), - ) - - def get_vendor_statistics(self, db: Session) -> dict: - """Get vendor statistics for admin dashboard. - - Returns dict compatible with VendorStatsResponse schema. - Keys: total, verified, pending, inactive (mapped from internal names) - """ - try: - total_vendors = db.query(Vendor).count() - active_vendors = db.query(Vendor).filter(Vendor.is_active == True).count() - verified_vendors = ( - db.query(Vendor).filter(Vendor.is_verified == True).count() - ) - inactive_vendors = total_vendors - active_vendors - # Pending = active but not yet verified - pending_vendors = ( - db.query(Vendor) - .filter(Vendor.is_active == True, Vendor.is_verified == False) - .count() - ) - - return { - # Schema-compatible fields (VendorStatsResponse) - "total": total_vendors, - "verified": verified_vendors, - "pending": pending_vendors, - "inactive": inactive_vendors, - # Legacy fields for backward compatibility - "total_vendors": total_vendors, - "active_vendors": active_vendors, - "inactive_vendors": inactive_vendors, - "verified_vendors": verified_vendors, - "pending_vendors": pending_vendors, - "verification_rate": ( - (verified_vendors / total_vendors * 100) if total_vendors > 0 else 0 - ), - } - except Exception as e: - logger.error(f"Failed to get vendor statistics: {str(e)}") - raise AdminOperationException( - operation="get_vendor_statistics", reason="Database query failed" - ) - - # ======================================================================== - # SYSTEM-WIDE STATISTICS (ADMIN) - # ======================================================================== - - def get_comprehensive_stats(self, db: Session) -> dict[str, Any]: - """ - Get comprehensive system statistics for admin dashboard. - - Args: - db: Database session - - Returns: - Dictionary with comprehensive statistics - - Raises: - AdminOperationException: If database query fails - """ - try: - # Vendors - total_vendors = db.query(Vendor).filter(Vendor.is_active == True).count() - - # Products - total_catalog_products = db.query(Product).count() - unique_brands = self._get_unique_brands_count(db) - unique_categories = self._get_unique_categories_count(db) - - # Marketplaces - unique_marketplaces = ( - db.query(MarketplaceProduct.marketplace) - .filter(MarketplaceProduct.marketplace.isnot(None)) - .distinct() - .count() - ) - - # Inventory - inventory_stats = self._get_inventory_statistics(db) - - return { - "total_products": total_catalog_products, - "unique_brands": unique_brands, - "unique_categories": unique_categories, - "unique_marketplaces": unique_marketplaces, - "unique_vendors": total_vendors, - "total_inventory_entries": inventory_stats.get("total_entries", 0), - "total_inventory_quantity": inventory_stats.get("total_quantity", 0), - } - - except Exception as e: - logger.error(f"Failed to retrieve comprehensive statistics: {str(e)}") - raise AdminOperationException( - operation="get_comprehensive_stats", - reason=f"Database query failed: {str(e)}", - ) - - def get_marketplace_breakdown_stats(self, db: Session) -> list[dict[str, Any]]: - """ - Get statistics broken down by marketplace. - - Args: - db: Database session - - Returns: - List of marketplace statistics - - Raises: - AdminOperationException: If database query fails - """ - try: - marketplace_stats = ( - db.query( - MarketplaceProduct.marketplace, - func.count(MarketplaceProduct.id).label("total_products"), - func.count(func.distinct(MarketplaceProduct.vendor_name)).label( - "unique_vendors" - ), - func.count(func.distinct(MarketplaceProduct.brand)).label( - "unique_brands" - ), - ) - .filter(MarketplaceProduct.marketplace.isnot(None)) - .group_by(MarketplaceProduct.marketplace) - .all() - ) - - return [ - { - "marketplace": stat.marketplace, - "total_products": stat.total_products, - "unique_vendors": stat.unique_vendors, - "unique_brands": stat.unique_brands, - } - for stat in marketplace_stats - ] - - except Exception as e: - logger.error( - f"Failed to retrieve marketplace breakdown statistics: {str(e)}" - ) - raise AdminOperationException( - operation="get_marketplace_breakdown_stats", - reason=f"Database query failed: {str(e)}", - ) - - def get_user_statistics(self, db: Session) -> dict[str, Any]: - """ - Get user statistics for admin dashboard. - - Args: - db: Database session - - Returns: - Dictionary with user statistics - - Raises: - AdminOperationException: If database query fails - """ - try: - total_users = db.query(User).count() - active_users = db.query(User).filter(User.is_active == True).count() - inactive_users = total_users - active_users - admin_users = db.query(User).filter(User.role == "admin").count() - - return { - "total_users": total_users, - "active_users": active_users, - "inactive_users": inactive_users, - "admin_users": admin_users, - "activation_rate": ( - (active_users / total_users * 100) if total_users > 0 else 0 - ), - } - except Exception as e: - logger.error(f"Failed to get user statistics: {str(e)}") - raise AdminOperationException( - operation="get_user_statistics", reason="Database query failed" - ) - - def get_import_statistics(self, db: Session) -> dict[str, Any]: - """ - Get import job statistics. - - Args: - db: Database session - - Returns: - Dictionary with import statistics - - Raises: - AdminOperationException: If database query fails - """ - try: - total = db.query(MarketplaceImportJob).count() - pending = ( - db.query(MarketplaceImportJob) - .filter(MarketplaceImportJob.status == "pending") - .count() - ) - processing = ( - db.query(MarketplaceImportJob) - .filter(MarketplaceImportJob.status == "processing") - .count() - ) - completed = ( - db.query(MarketplaceImportJob) - .filter( - MarketplaceImportJob.status.in_( - ["completed", "completed_with_errors"] - ) - ) - .count() - ) - failed = ( - db.query(MarketplaceImportJob) - .filter(MarketplaceImportJob.status == "failed") - .count() - ) - - return { - # Frontend-expected fields - "total": total, - "pending": pending, - "processing": processing, - "completed": completed, - "failed": failed, - # Legacy fields for backward compatibility - "total_imports": total, - "completed_imports": completed, - "failed_imports": failed, - "success_rate": (completed / total * 100) if total > 0 else 0, - } - except Exception as e: - logger.error(f"Failed to get import statistics: {str(e)}") - return { - "total": 0, - "pending": 0, - "processing": 0, - "completed": 0, - "failed": 0, - "total_imports": 0, - "completed_imports": 0, - "failed_imports": 0, - "success_rate": 0, - } - - def get_order_statistics(self, db: Session) -> dict[str, Any]: - """ - Get order statistics. - - Args: - db: Database session - - Returns: - Dictionary with order statistics - - Note: - TODO: Implement when Order model is fully available - """ - return {"total_orders": 0, "pending_orders": 0, "completed_orders": 0} - - def get_product_statistics(self, db: Session) -> dict[str, Any]: - """ - Get product statistics. - - Args: - db: Database session - - Returns: - Dictionary with product statistics - - Note: - TODO: Implement when Product model is fully available - """ - return {"total_products": 0, "active_products": 0, "out_of_stock": 0} - - # ======================================================================== - # PRIVATE HELPER METHODS - # ======================================================================== - - def _parse_period(self, period: str) -> int: - """ - Parse period string to days. - - Args: - period: Period string (7d, 30d, 90d, 1y) - - Returns: - Number of days - """ - period_map = { - "7d": 7, - "30d": 30, - "90d": 90, - "1y": 365, - } - return period_map.get(period, 30) - - def _get_unique_brands_count(self, db: Session) -> int: - """ - Get count of unique brands. - - Args: - db: Database session - - Returns: - Count of unique brands - """ - return ( - db.query(MarketplaceProduct.brand) - .filter( - MarketplaceProduct.brand.isnot(None), MarketplaceProduct.brand != "" - ) - .distinct() - .count() - ) - - def _get_unique_categories_count(self, db: Session) -> int: - """ - Get count of unique categories. - - Args: - db: Database session - - Returns: - Count of unique categories - """ - return ( - db.query(MarketplaceProduct.google_product_category) - .filter( - MarketplaceProduct.google_product_category.isnot(None), - MarketplaceProduct.google_product_category != "", - ) - .distinct() - .count() - ) - - def _get_inventory_statistics(self, db: Session) -> dict[str, int]: - """ - Get inventory-related statistics. - - Args: - db: Database session - - Returns: - Dictionary with inventory statistics - """ - total_entries = db.query(Inventory).count() - total_quantity = db.query(func.sum(Inventory.quantity)).scalar() or 0 - total_reserved = db.query(func.sum(Inventory.reserved_quantity)).scalar() or 0 - - return { - "total_entries": total_entries, - "total_quantity": int(total_quantity), - "total_reserved": int(total_reserved), - "total_available": int(total_quantity - total_reserved), - } - - -# Create service instance -stats_service = StatsService() +__all__ = ["stats_service", "StatsService"] diff --git a/app/services/usage_service.py b/app/services/usage_service.py index d650109b..9cc42567 100644 --- a/app/services/usage_service.py +++ b/app/services/usage_service.py @@ -1,438 +1,24 @@ # app/services/usage_service.py """ -Usage and limits service. +Usage and limits service - LEGACY LOCATION -Provides methods for: -- Getting current usage vs limits -- Calculating upgrade recommendations -- Checking limits before actions +This file exists for backward compatibility. +The canonical location is now: app/modules/analytics/services/usage_service.py + +All imports should use the new location: + from app.modules.analytics.services import usage_service, UsageService """ -import logging -from dataclasses import dataclass - -from sqlalchemy import func -from sqlalchemy.orm import Session - -from models.database.product import Product -from models.database.subscription import SubscriptionTier, VendorSubscription -from models.database.vendor import VendorUser - -logger = logging.getLogger(__name__) - - -@dataclass -class UsageMetricData: - """Usage metric data.""" - - name: str - current: int - limit: int | None - percentage: float - is_unlimited: bool - is_at_limit: bool - is_approaching_limit: bool - - -@dataclass -class TierInfoData: - """Tier information.""" - - code: str - name: str - price_monthly_cents: int - is_highest_tier: bool - - -@dataclass -class UpgradeTierData: - """Upgrade tier information.""" - - code: str - name: str - price_monthly_cents: int - price_increase_cents: int - benefits: list[str] - - -@dataclass -class UsageData: - """Full usage data.""" - - tier: TierInfoData - usage: list[UsageMetricData] - has_limits_approaching: bool - has_limits_reached: bool - upgrade_available: bool - upgrade_tier: UpgradeTierData | None - upgrade_reasons: list[str] - - -@dataclass -class LimitCheckData: - """Limit check result.""" - - limit_type: str - can_proceed: bool - current: int - limit: int | None - percentage: float - message: str | None - upgrade_tier_code: str | None - upgrade_tier_name: str | None - - -class UsageService: - """Service for usage and limits management.""" - - def get_vendor_usage(self, db: Session, vendor_id: int) -> UsageData: - """ - Get comprehensive usage data for a vendor. - - Returns current usage, limits, and upgrade recommendations. - """ - from app.services.subscription_service import subscription_service - - # Get subscription - subscription = subscription_service.get_or_create_subscription(db, vendor_id) - - # Get current tier - tier = self._get_tier(db, subscription) - - # Calculate usage metrics - usage_metrics = self._calculate_usage_metrics(db, vendor_id, subscription) - - # Check for approaching/reached limits - has_limits_approaching = any(m.is_approaching_limit for m in usage_metrics) - has_limits_reached = any(m.is_at_limit for m in usage_metrics) - - # Get upgrade info - next_tier = self._get_next_tier(db, tier) - is_highest_tier = next_tier is None - - # Build upgrade info - upgrade_tier_info = None - upgrade_reasons = [] - - if next_tier: - upgrade_tier_info = self._build_upgrade_tier_info(tier, next_tier) - upgrade_reasons = self._build_upgrade_reasons( - usage_metrics, has_limits_reached, has_limits_approaching - ) - - return UsageData( - tier=TierInfoData( - code=tier.code if tier else subscription.tier, - name=tier.name if tier else subscription.tier.title(), - price_monthly_cents=tier.price_monthly_cents if tier else 0, - is_highest_tier=is_highest_tier, - ), - usage=usage_metrics, - has_limits_approaching=has_limits_approaching, - has_limits_reached=has_limits_reached, - upgrade_available=not is_highest_tier, - upgrade_tier=upgrade_tier_info, - upgrade_reasons=upgrade_reasons, - ) - - def check_limit( - self, db: Session, vendor_id: int, limit_type: str - ) -> LimitCheckData: - """ - Check a specific limit before performing an action. - - Args: - db: Database session - vendor_id: Vendor ID - limit_type: One of "orders", "products", "team_members" - - Returns: - LimitCheckData with proceed status and upgrade info - """ - from app.services.subscription_service import subscription_service - - if limit_type == "orders": - can_proceed, message = subscription_service.can_create_order(db, vendor_id) - subscription = subscription_service.get_subscription(db, vendor_id) - current = subscription.orders_this_period if subscription else 0 - limit = subscription.orders_limit if subscription else 0 - - elif limit_type == "products": - can_proceed, message = subscription_service.can_add_product(db, vendor_id) - subscription = subscription_service.get_subscription(db, vendor_id) - current = self._get_product_count(db, vendor_id) - limit = subscription.products_limit if subscription else 0 - - elif limit_type == "team_members": - can_proceed, message = subscription_service.can_add_team_member(db, vendor_id) - subscription = subscription_service.get_subscription(db, vendor_id) - current = self._get_team_member_count(db, vendor_id) - limit = subscription.team_members_limit if subscription else 0 - - else: - return LimitCheckData( - limit_type=limit_type, - can_proceed=True, - current=0, - limit=None, - percentage=0, - message=f"Unknown limit type: {limit_type}", - upgrade_tier_code=None, - upgrade_tier_name=None, - ) - - # Calculate percentage - is_unlimited = limit is None or limit < 0 - percentage = 0 if is_unlimited else (current / limit * 100 if limit > 0 else 100) - - # Get upgrade info if at limit - upgrade_tier_code = None - upgrade_tier_name = None - - if not can_proceed: - subscription = subscription_service.get_subscription(db, vendor_id) - current_tier = subscription.tier_obj if subscription else None - - if current_tier: - next_tier = self._get_next_tier(db, current_tier) - if next_tier: - upgrade_tier_code = next_tier.code - upgrade_tier_name = next_tier.name - - return LimitCheckData( - limit_type=limit_type, - can_proceed=can_proceed, - current=current, - limit=None if is_unlimited else limit, - percentage=percentage, - message=message, - upgrade_tier_code=upgrade_tier_code, - upgrade_tier_name=upgrade_tier_name, - ) - - # ========================================================================= - # Private Helper Methods - # ========================================================================= - - def _get_tier( - self, db: Session, subscription: VendorSubscription - ) -> SubscriptionTier | None: - """Get tier from subscription or query by code.""" - tier = subscription.tier_obj - if not tier: - tier = ( - db.query(SubscriptionTier) - .filter(SubscriptionTier.code == subscription.tier) - .first() - ) - return tier - - def _get_product_count(self, db: Session, vendor_id: int) -> int: - """Get product count for vendor.""" - return ( - db.query(func.count(Product.id)) - .filter(Product.vendor_id == vendor_id) - .scalar() - or 0 - ) - - def _get_team_member_count(self, db: Session, vendor_id: int) -> int: - """Get active team member count for vendor.""" - return ( - db.query(func.count(VendorUser.id)) - .filter(VendorUser.vendor_id == vendor_id, VendorUser.is_active == True) # noqa: E712 - .scalar() - or 0 - ) - - def _calculate_usage_metrics( - self, db: Session, vendor_id: int, subscription: VendorSubscription - ) -> list[UsageMetricData]: - """Calculate all usage metrics for a vendor.""" - metrics = [] - - # Orders this period - orders_current = subscription.orders_this_period or 0 - orders_limit = subscription.orders_limit - orders_unlimited = orders_limit is None or orders_limit < 0 - orders_percentage = ( - 0 - if orders_unlimited - else (orders_current / orders_limit * 100 if orders_limit > 0 else 100) - ) - - metrics.append( - UsageMetricData( - name="orders", - current=orders_current, - limit=None if orders_unlimited else orders_limit, - percentage=orders_percentage, - is_unlimited=orders_unlimited, - is_at_limit=not orders_unlimited and orders_current >= orders_limit, - is_approaching_limit=not orders_unlimited and orders_percentage >= 80, - ) - ) - - # Products - products_count = self._get_product_count(db, vendor_id) - products_limit = subscription.products_limit - products_unlimited = products_limit is None or products_limit < 0 - products_percentage = ( - 0 - if products_unlimited - else (products_count / products_limit * 100 if products_limit > 0 else 100) - ) - - metrics.append( - UsageMetricData( - name="products", - current=products_count, - limit=None if products_unlimited else products_limit, - percentage=products_percentage, - is_unlimited=products_unlimited, - is_at_limit=not products_unlimited and products_count >= products_limit, - is_approaching_limit=not products_unlimited and products_percentage >= 80, - ) - ) - - # Team members - team_count = self._get_team_member_count(db, vendor_id) - team_limit = subscription.team_members_limit - team_unlimited = team_limit is None or team_limit < 0 - team_percentage = ( - 0 - if team_unlimited - else (team_count / team_limit * 100 if team_limit > 0 else 100) - ) - - metrics.append( - UsageMetricData( - name="team_members", - current=team_count, - limit=None if team_unlimited else team_limit, - percentage=team_percentage, - is_unlimited=team_unlimited, - is_at_limit=not team_unlimited and team_count >= team_limit, - is_approaching_limit=not team_unlimited and team_percentage >= 80, - ) - ) - - return metrics - - def _get_next_tier( - self, db: Session, current_tier: SubscriptionTier | None - ) -> SubscriptionTier | None: - """Get next tier for upgrade.""" - current_tier_order = current_tier.display_order if current_tier else 0 - - return ( - db.query(SubscriptionTier) - .filter( - SubscriptionTier.is_active == True, # noqa: E712 - SubscriptionTier.display_order > current_tier_order, - ) - .order_by(SubscriptionTier.display_order) - .first() - ) - - def _build_upgrade_tier_info( - self, current_tier: SubscriptionTier | None, next_tier: SubscriptionTier - ) -> UpgradeTierData: - """Build upgrade tier information with benefits.""" - benefits = [] - - # Numeric limit benefits - if next_tier.orders_per_month and ( - not current_tier - or ( - current_tier.orders_per_month - and next_tier.orders_per_month > current_tier.orders_per_month - ) - ): - if next_tier.orders_per_month < 0: - benefits.append("Unlimited orders per month") - else: - benefits.append(f"{next_tier.orders_per_month:,} orders/month") - - if next_tier.products_limit and ( - not current_tier - or ( - current_tier.products_limit - and next_tier.products_limit > current_tier.products_limit - ) - ): - if next_tier.products_limit < 0: - benefits.append("Unlimited products") - else: - benefits.append(f"{next_tier.products_limit:,} products") - - if next_tier.team_members and ( - not current_tier - or ( - current_tier.team_members - and next_tier.team_members > current_tier.team_members - ) - ): - if next_tier.team_members < 0: - benefits.append("Unlimited team members") - else: - benefits.append(f"{next_tier.team_members} team members") - - # Feature benefits - current_features = ( - set(current_tier.features) if current_tier and current_tier.features else set() - ) - next_features = set(next_tier.features) if next_tier.features else set() - new_features = next_features - current_features - - feature_names = { - "analytics_dashboard": "Advanced Analytics", - "api_access": "API Access", - "automation_rules": "Automation Rules", - "team_roles": "Team Roles & Permissions", - "custom_domain": "Custom Domain", - "webhooks": "Webhooks", - "accounting_export": "Accounting Export", - } - for feature in list(new_features)[:3]: - if feature in feature_names: - benefits.append(feature_names[feature]) - - current_price = current_tier.price_monthly_cents if current_tier else 0 - - return UpgradeTierData( - code=next_tier.code, - name=next_tier.name, - price_monthly_cents=next_tier.price_monthly_cents, - price_increase_cents=next_tier.price_monthly_cents - current_price, - benefits=benefits, - ) - - def _build_upgrade_reasons( - self, - usage_metrics: list[UsageMetricData], - has_limits_reached: bool, - has_limits_approaching: bool, - ) -> list[str]: - """Build upgrade reasons based on usage.""" - reasons = [] - - if has_limits_reached: - for m in usage_metrics: - if m.is_at_limit: - reasons.append(f"You've reached your {m.name.replace('_', ' ')} limit") - elif has_limits_approaching: - for m in usage_metrics: - if m.is_approaching_limit: - reasons.append( - f"You're approaching your {m.name.replace('_', ' ')} limit ({int(m.percentage)}%)" - ) - - return reasons - - -# Singleton instance -usage_service = UsageService() +# Re-export from canonical location for backward compatibility +from app.modules.analytics.services.usage_service import ( + usage_service, + UsageService, + UsageData, + UsageMetricData, + TierInfoData, + UpgradeTierData, + LimitCheckData, +) __all__ = [ "usage_service", diff --git a/app/tasks/celery_tasks/code_quality.py b/app/tasks/celery_tasks/code_quality.py index efb99be3..8ade35e0 100644 --- a/app/tasks/celery_tasks/code_quality.py +++ b/app/tasks/celery_tasks/code_quality.py @@ -1,236 +1,31 @@ # app/tasks/celery_tasks/code_quality.py """ -Celery tasks for code quality scans. +Celery tasks for code quality scans - LEGACY LOCATION -Wraps the existing execute_code_quality_scan function for Celery execution. +This file exists for backward compatibility. +The canonical location is now: app/modules/dev_tools/tasks/code_quality.py + +All imports should use the new location: + from app.modules.dev_tools.tasks import execute_code_quality_scan """ -import json -import logging -import subprocess -from datetime import UTC, datetime - -from app.core.celery_config import celery_app -from app.services.admin_notification_service import admin_notification_service -from app.tasks.celery_tasks.base import DatabaseTask -from models.database.architecture_scan import ArchitectureScan, ArchitectureViolation - -logger = logging.getLogger(__name__) - -# Validator type constants -VALIDATOR_ARCHITECTURE = "architecture" -VALIDATOR_SECURITY = "security" -VALIDATOR_PERFORMANCE = "performance" - -VALID_VALIDATOR_TYPES = [VALIDATOR_ARCHITECTURE, VALIDATOR_SECURITY, VALIDATOR_PERFORMANCE] - -# Map validator types to their scripts -VALIDATOR_SCRIPTS = { - VALIDATOR_ARCHITECTURE: "scripts/validate_architecture.py", - VALIDATOR_SECURITY: "scripts/validate_security.py", - VALIDATOR_PERFORMANCE: "scripts/validate_performance.py", -} - -# Human-readable names -VALIDATOR_NAMES = { - VALIDATOR_ARCHITECTURE: "Architecture", - VALIDATOR_SECURITY: "Security", - VALIDATOR_PERFORMANCE: "Performance", -} - - -def _get_git_commit_hash() -> str | None: - """Get current git commit hash.""" - try: - result = subprocess.run( - ["git", "rev-parse", "HEAD"], - capture_output=True, - text=True, - timeout=5, - ) - if result.returncode == 0: - return result.stdout.strip()[:40] - except Exception: - pass - return None - - -@celery_app.task( - bind=True, - base=DatabaseTask, - name="app.tasks.celery_tasks.code_quality.execute_code_quality_scan", - max_retries=1, - time_limit=700, # 11+ minutes hard limit - soft_time_limit=600, # 10 minutes soft limit +# Re-export from canonical location for backward compatibility +from app.modules.dev_tools.tasks.code_quality import ( + execute_code_quality_scan, + VALIDATOR_ARCHITECTURE, + VALIDATOR_SECURITY, + VALIDATOR_PERFORMANCE, + VALID_VALIDATOR_TYPES, + VALIDATOR_SCRIPTS, + VALIDATOR_NAMES, ) -def execute_code_quality_scan(self, scan_id: int): - """ - Celery task to execute a code quality scan. - This task: - 1. Gets the scan record from DB - 2. Updates status to 'running' - 3. Runs the validator script - 4. Parses JSON output and creates violation records - 5. Updates scan with results and status 'completed' or 'failed' - - Args: - scan_id: ID of the ArchitectureScan record - - Returns: - dict: Scan results summary - """ - with self.get_db() as db: - # Get the scan record - scan = db.query(ArchitectureScan).filter(ArchitectureScan.id == scan_id).first() - if not scan: - logger.error(f"Code quality scan {scan_id} not found") - return {"error": f"Scan {scan_id} not found"} - - # Store Celery task ID - scan.celery_task_id = self.request.id - - validator_type = scan.validator_type - if validator_type not in VALID_VALIDATOR_TYPES: - scan.status = "failed" - scan.error_message = f"Invalid validator type: {validator_type}" - db.commit() - return {"error": f"Invalid validator type: {validator_type}"} - - script_path = VALIDATOR_SCRIPTS[validator_type] - validator_name = VALIDATOR_NAMES[validator_type] - - try: - # Update status to running - scan.status = "running" - scan.started_at = datetime.now(UTC) - scan.progress_message = f"Running {validator_name} validator..." - scan.git_commit_hash = _get_git_commit_hash() - db.commit() - - logger.info(f"Starting {validator_name} scan (scan_id={scan_id})") - - # Run validator with JSON output - start_time = datetime.now(UTC) - try: - result = subprocess.run( - ["python", script_path, "--json"], - capture_output=True, - text=True, - timeout=600, # 10 minute timeout - ) - except subprocess.TimeoutExpired: - logger.error(f"{validator_name} scan {scan_id} timed out after 10 minutes") - scan.status = "failed" - scan.error_message = "Scan timed out after 10 minutes" - scan.completed_at = datetime.now(UTC) - db.commit() - return {"error": "Scan timed out"} - - duration = (datetime.now(UTC) - start_time).total_seconds() - - # Update progress - scan.progress_message = "Parsing results..." - db.commit() - - # Parse JSON output - try: - lines = result.stdout.strip().split("\n") - json_start = -1 - for i, line in enumerate(lines): - if line.strip().startswith("{"): - json_start = i - break - - if json_start == -1: - raise ValueError("No JSON output found in validator output") - - json_output = "\n".join(lines[json_start:]) - data = json.loads(json_output) - except (json.JSONDecodeError, ValueError) as e: - logger.error(f"Failed to parse {validator_name} validator output: {e}") - scan.status = "failed" - scan.error_message = f"Failed to parse validator output: {e}" - scan.completed_at = datetime.now(UTC) - scan.duration_seconds = duration - db.commit() - return {"error": str(e)} - - # Update progress - scan.progress_message = "Storing violations..." - db.commit() - - # Create violation records - violations_data = data.get("violations", []) - logger.info(f"Creating {len(violations_data)} {validator_name} violation records") - - for v in violations_data: - violation = ArchitectureViolation( - scan_id=scan.id, - validator_type=validator_type, - rule_id=v.get("rule_id", "UNKNOWN"), - rule_name=v.get("rule_name", "Unknown Rule"), - severity=v.get("severity", "warning"), - file_path=v.get("file_path", ""), - line_number=v.get("line_number", 0), - message=v.get("message", ""), - context=v.get("context", ""), - suggestion=v.get("suggestion", ""), - status="open", - ) - db.add(violation) - - # Update scan with results - scan.total_files = data.get("files_checked", 0) - scan.total_violations = data.get("total_violations", len(violations_data)) - scan.errors = data.get("errors", 0) - scan.warnings = data.get("warnings", 0) - scan.duration_seconds = duration - scan.completed_at = datetime.now(UTC) - scan.progress_message = None - - # Set final status based on results - if scan.errors > 0: - scan.status = "completed_with_warnings" - else: - scan.status = "completed" - - db.commit() - - logger.info( - f"{validator_name} scan {scan_id} completed: " - f"files={scan.total_files}, violations={scan.total_violations}, " - f"errors={scan.errors}, warnings={scan.warnings}, " - f"duration={duration:.1f}s" - ) - - return { - "scan_id": scan_id, - "validator_type": validator_type, - "status": scan.status, - "total_files": scan.total_files, - "total_violations": scan.total_violations, - "errors": scan.errors, - "warnings": scan.warnings, - "duration_seconds": duration, - } - - except Exception as e: - logger.error(f"Code quality scan {scan_id} failed: {e}", exc_info=True) - scan.status = "failed" - scan.error_message = str(e)[:500] - scan.completed_at = datetime.now(UTC) - scan.progress_message = None - - # Create admin notification for scan failure - admin_notification_service.create_notification( - db=db, - title="Code Quality Scan Failed", - message=f"{VALIDATOR_NAMES.get(scan.validator_type, 'Unknown')} scan failed: {str(e)[:200]}", - notification_type="error", - category="code_quality", - action_url="/admin/code-quality", - ) - - db.commit() - raise # Re-raise for Celery +__all__ = [ + "execute_code_quality_scan", + "VALIDATOR_ARCHITECTURE", + "VALIDATOR_SECURITY", + "VALIDATOR_PERFORMANCE", + "VALID_VALIDATOR_TYPES", + "VALIDATOR_SCRIPTS", + "VALIDATOR_NAMES", +] diff --git a/app/tasks/celery_tasks/test_runner.py b/app/tasks/celery_tasks/test_runner.py index 4bf29384..e6fa44c9 100644 --- a/app/tasks/celery_tasks/test_runner.py +++ b/app/tasks/celery_tasks/test_runner.py @@ -1,83 +1,15 @@ # app/tasks/celery_tasks/test_runner.py """ -Celery tasks for test execution. +Celery tasks for test execution - LEGACY LOCATION -Wraps the existing execute_test_run function for Celery execution. +This file exists for backward compatibility. +The canonical location is now: app/modules/dev_tools/tasks/test_runner.py + +All imports should use the new location: + from app.modules.dev_tools.tasks import execute_test_run """ -import logging +# Re-export from canonical location for backward compatibility +from app.modules.dev_tools.tasks.test_runner import execute_test_run -from app.core.celery_config import celery_app -from app.services.test_runner_service import test_runner_service -from app.tasks.celery_tasks.base import DatabaseTask -from models.database.test_run import TestRun - -logger = logging.getLogger(__name__) - - -@celery_app.task( - bind=True, - base=DatabaseTask, - name="app.tasks.celery_tasks.test_runner.execute_test_run", - max_retries=1, - time_limit=3600, # 1 hour hard limit - soft_time_limit=3300, # 55 minutes soft limit -) -def execute_test_run( - self, - run_id: int, - test_path: str = "tests", - extra_args: list[str] | None = None, -): - """ - Celery task to execute pytest tests. - - Args: - run_id: ID of the TestRun record - test_path: Path to tests (relative to project root) - extra_args: Additional pytest arguments - - Returns: - dict: Test run results summary - """ - with self.get_db() as db: - # Get the test run record - test_run = db.query(TestRun).filter(TestRun.id == run_id).first() - if not test_run: - logger.error(f"Test run {run_id} not found") - return {"error": f"Test run {run_id} not found"} - - # Store Celery task ID - test_run.celery_task_id = self.request.id - db.commit() - - try: - logger.info(f"Starting test execution: Run {run_id}, Path: {test_path}") - - # Execute the tests - test_runner_service._execute_tests(db, test_run, test_path, extra_args) - db.commit() - - logger.info( - f"Test run {run_id} completed: " - f"status={test_run.status}, passed={test_run.passed}, " - f"failed={test_run.failed}, duration={test_run.duration_seconds:.1f}s" - ) - - return { - "run_id": run_id, - "status": test_run.status, - "total_tests": test_run.total_tests, - "passed": test_run.passed, - "failed": test_run.failed, - "errors": test_run.errors, - "skipped": test_run.skipped, - "coverage_percent": test_run.coverage_percent, - "duration_seconds": test_run.duration_seconds, - } - - except Exception as e: - logger.error(f"Test run {run_id} failed: {e}", exc_info=True) - test_run.status = "error" - db.commit() - raise # Re-raise for Celery +__all__ = ["execute_test_run"]