diff --git a/app/services/admin_notification_service.py b/app/services/admin_notification_service.py index 704669cf..5daf9ee1 100644 --- a/app/services/admin_notification_service.py +++ b/app/services/admin_notification_service.py @@ -12,7 +12,7 @@ import logging from datetime import datetime, timedelta from typing import Any -from sqlalchemy import and_, func, or_ +from sqlalchemy import and_, case, func, or_ from sqlalchemy.orm import Session from models.database.admin import AdminNotification, PlatformAlert @@ -176,7 +176,7 @@ class AdminNotificationService: ) # Get paginated results ordered by priority and date - priority_order = func.case( + priority_order = case( (AdminNotification.priority == "critical", 1), (AdminNotification.priority == "high", 2), (AdminNotification.priority == "normal", 3), @@ -211,7 +211,7 @@ class AdminNotificationService: limit: int = 5, ) -> list[AdminNotification]: """Get recent unread notifications for header dropdown.""" - priority_order = func.case( + priority_order = case( (AdminNotification.priority == "critical", 1), (AdminNotification.priority == "high", 2), (AdminNotification.priority == "normal", 3), @@ -554,7 +554,7 @@ class PlatformAlertService: ) # Get paginated results - severity_order = func.case( + severity_order = case( (PlatformAlert.severity == "critical", 1), (PlatformAlert.severity == "error", 2), (PlatformAlert.severity == "warning", 3), diff --git a/tests/unit/services/test_admin_notification_service.py b/tests/unit/services/test_admin_notification_service.py new file mode 100644 index 00000000..bb4f160d --- /dev/null +++ b/tests/unit/services/test_admin_notification_service.py @@ -0,0 +1,772 @@ +# tests/unit/services/test_admin_notification_service.py +""" +Unit tests for AdminNotificationService and PlatformAlertService. +""" + +from datetime import datetime, timedelta + +import pytest + +from app.services.admin_notification_service import ( + AdminNotificationService, + AlertType, + NotificationType, + PlatformAlertService, + Priority, + Severity, +) +from models.database.admin import AdminNotification, PlatformAlert + + +@pytest.fixture +def notification_service(): + """Create AdminNotificationService instance.""" + return AdminNotificationService() + + +@pytest.fixture +def alert_service(): + """Create PlatformAlertService instance.""" + return PlatformAlertService() + + +# ============================================================================ +# ADMIN NOTIFICATION SERVICE TESTS +# ============================================================================ + + +@pytest.mark.unit +class TestAdminNotificationServiceCreate: + """Tests for notification creation.""" + + def test_create_notification_basic(self, db, notification_service): + """Test creating a basic notification.""" + notification = notification_service.create_notification( + db=db, + notification_type=NotificationType.SYSTEM_ALERT, + title="Test Alert", + message="This is a test notification.", + ) + db.commit() + + assert notification.id is not None + assert notification.type == NotificationType.SYSTEM_ALERT + assert notification.title == "Test Alert" + assert notification.message == "This is a test notification." + assert notification.priority == Priority.NORMAL + assert notification.action_required is False + assert notification.is_read is False + + def test_create_notification_with_priority(self, db, notification_service): + """Test creating notification with custom priority.""" + notification = notification_service.create_notification( + db=db, + notification_type=NotificationType.CRITICAL_ERROR, + title="Critical Error", + message="Something went wrong", + priority=Priority.CRITICAL, + ) + db.commit() + + assert notification.priority == Priority.CRITICAL + + def test_create_notification_with_action(self, db, notification_service): + """Test creating notification with action URL.""" + notification = notification_service.create_notification( + db=db, + notification_type=NotificationType.IMPORT_FAILURE, + title="Import Failed", + message="Failed to import products", + action_required=True, + action_url="/admin/marketplace", + ) + db.commit() + + assert notification.action_required is True + assert notification.action_url == "/admin/marketplace" + + def test_create_notification_with_metadata(self, db, notification_service): + """Test creating notification with metadata.""" + metadata = {"vendor_id": 1, "job_id": 42} + notification = notification_service.create_notification( + db=db, + notification_type=NotificationType.IMPORT_FAILURE, + title="Import Failed", + message="Error message", + metadata=metadata, + ) + db.commit() + + assert notification.notification_metadata == metadata + + +@pytest.mark.unit +class TestAdminNotificationServiceConvenience: + """Tests for convenience notification methods.""" + + def test_notify_import_failure(self, db, notification_service): + """Test import failure notification creation.""" + notification = notification_service.notify_import_failure( + db=db, + vendor_name="Test Vendor", + job_id=123, + error_message="Connection failed", + vendor_id=1, + ) + db.commit() + + assert notification.type == NotificationType.IMPORT_FAILURE + assert notification.title == "Import Failed: Test Vendor" + assert notification.message == "Connection failed" + assert notification.priority == Priority.HIGH + assert notification.action_required is True + assert "vendor_id=1" in notification.action_url + + def test_notify_order_sync_failure(self, db, notification_service): + """Test order sync failure notification.""" + notification = notification_service.notify_order_sync_failure( + db=db, + vendor_name="Test Vendor", + error_message="API timeout", + vendor_id=5, + ) + db.commit() + + assert notification.type == NotificationType.ORDER_SYNC_FAILURE + assert notification.title == "Order Sync Failed: Test Vendor" + assert notification.priority == Priority.HIGH + + def test_notify_order_exception(self, db, notification_service): + """Test order exception notification.""" + notification = notification_service.notify_order_exception( + db=db, + vendor_name="Test Vendor", + order_number="ORD-12345", + exception_count=3, + vendor_id=2, + ) + db.commit() + + assert notification.type == NotificationType.ORDER_EXCEPTION + assert "3 item(s)" in notification.message + assert notification.priority == Priority.NORMAL + + def test_notify_critical_error(self, db, notification_service): + """Test critical error notification.""" + details = {"stack_trace": "line 1\nline 2"} + notification = notification_service.notify_critical_error( + db=db, + error_type="Database Error", + error_message="Connection pool exhausted", + details=details, + ) + db.commit() + + assert notification.type == NotificationType.CRITICAL_ERROR + assert notification.priority == Priority.CRITICAL + assert notification.notification_metadata == details + + def test_notify_vendor_issue(self, db, notification_service): + """Test vendor issue notification.""" + notification = notification_service.notify_vendor_issue( + db=db, + vendor_name="Bad Vendor", + issue_type="payment", + message="Payment method expired", + vendor_id=10, + ) + db.commit() + + assert notification.type == NotificationType.VENDOR_ISSUE + assert "Bad Vendor" in notification.title + assert notification.priority == Priority.HIGH + + def test_notify_security_alert(self, db, notification_service): + """Test security alert notification.""" + notification = notification_service.notify_security_alert( + db=db, + title="Suspicious Login Attempt", + message="Multiple failed logins detected", + details={"ip_address": "1.2.3.4"}, + ) + db.commit() + + assert notification.type == NotificationType.SECURITY_ALERT + assert notification.priority == Priority.CRITICAL + assert notification.action_url == "/admin/audit" + + +@pytest.mark.unit +class TestAdminNotificationServiceQuery: + """Tests for notification query methods.""" + + def test_get_notifications_empty(self, db, notification_service): + """Test getting notifications when none exist.""" + notifications, total, unread = notification_service.get_notifications(db) + + assert notifications == [] + assert total == 0 + assert unread == 0 + + def test_get_notifications_with_data(self, db, notification_service): + """Test getting notifications with data.""" + # Create some notifications + notification_service.create_notification( + db=db, + notification_type=NotificationType.SYSTEM_ALERT, + title="Alert 1", + message="Message 1", + ) + notification_service.create_notification( + db=db, + notification_type=NotificationType.IMPORT_FAILURE, + title="Alert 2", + message="Message 2", + priority=Priority.HIGH, + ) + db.commit() + + notifications, total, unread = notification_service.get_notifications(db) + + assert total == 2 + assert unread == 2 + assert len(notifications) == 2 + + def test_get_notifications_filter_priority(self, db, notification_service): + """Test filtering by priority.""" + notification_service.create_notification( + db=db, + notification_type=NotificationType.SYSTEM_ALERT, + title="Normal", + message="Normal priority", + priority=Priority.NORMAL, + ) + notification_service.create_notification( + db=db, + notification_type=NotificationType.CRITICAL_ERROR, + title="Critical", + message="Critical priority", + priority=Priority.CRITICAL, + ) + db.commit() + + notifications, total, _ = notification_service.get_notifications( + db, priority=Priority.CRITICAL + ) + + assert total == 1 + assert notifications[0].title == "Critical" + + def test_get_notifications_filter_read(self, db, notification_service, test_admin): + """Test filtering by read status.""" + n1 = notification_service.create_notification( + db=db, + notification_type=NotificationType.SYSTEM_ALERT, + title="Unread", + message="Not read yet", + ) + n2 = notification_service.create_notification( + db=db, + notification_type=NotificationType.SYSTEM_ALERT, + title="Read", + message="Already read", + ) + db.commit() + + # Mark one as read + notification_service.mark_as_read(db, n2.id, test_admin.id) + db.commit() + + # Get unread only + notifications, total, _ = notification_service.get_notifications( + db, is_read=False + ) + assert total == 1 + assert notifications[0].title == "Unread" + + # Get read only + notifications, total, _ = notification_service.get_notifications( + db, is_read=True + ) + assert total == 1 + assert notifications[0].title == "Read" + + def test_get_notifications_pagination(self, db, notification_service): + """Test pagination of notifications.""" + for i in range(10): + notification_service.create_notification( + db=db, + notification_type=NotificationType.SYSTEM_ALERT, + title=f"Alert {i}", + message=f"Message {i}", + ) + db.commit() + + notifications, total, _ = notification_service.get_notifications( + db, skip=0, limit=3 + ) + assert len(notifications) == 3 + assert total == 10 + + notifications, total, _ = notification_service.get_notifications( + db, skip=8, limit=3 + ) + assert len(notifications) == 2 # Only 2 remaining + + def test_get_unread_count(self, db, notification_service): + """Test unread count.""" + notification_service.create_notification( + db=db, + notification_type=NotificationType.SYSTEM_ALERT, + title="Alert 1", + message="Message 1", + ) + notification_service.create_notification( + db=db, + notification_type=NotificationType.SYSTEM_ALERT, + title="Alert 2", + message="Message 2", + ) + db.commit() + + assert notification_service.get_unread_count(db) == 2 + + def test_get_recent_notifications(self, db, notification_service): + """Test getting recent unread notifications.""" + for i in range(10): + notification_service.create_notification( + db=db, + notification_type=NotificationType.SYSTEM_ALERT, + title=f"Alert {i}", + message=f"Message {i}", + ) + db.commit() + + recent = notification_service.get_recent_notifications(db, limit=5) + assert len(recent) == 5 + + +@pytest.mark.unit +class TestAdminNotificationServiceActions: + """Tests for notification action methods.""" + + def test_mark_as_read(self, db, notification_service, test_admin): + """Test marking notification as read.""" + notification = notification_service.create_notification( + db=db, + notification_type=NotificationType.SYSTEM_ALERT, + title="Test", + message="Test message", + ) + db.commit() + + assert notification.is_read is False + + result = notification_service.mark_as_read(db, notification.id, test_admin.id) + db.commit() + + assert result.is_read is True + assert result.read_at is not None + assert result.read_by_user_id == test_admin.id + + def test_mark_as_read_already_read(self, db, notification_service, test_admin): + """Test marking already-read notification.""" + notification = notification_service.create_notification( + db=db, + notification_type=NotificationType.SYSTEM_ALERT, + title="Test", + message="Test message", + ) + db.commit() + + notification_service.mark_as_read(db, notification.id, test_admin.id) + db.commit() + + original_read_at = notification.read_at + + # Try to mark as read again + result = notification_service.mark_as_read(db, notification.id, test_admin.id) + db.commit() + + # Should not update timestamp + assert result.read_at == original_read_at + + def test_mark_as_read_not_found(self, db, notification_service, test_admin): + """Test marking nonexistent notification.""" + result = notification_service.mark_as_read(db, 99999, test_admin.id) + assert result is None + + def test_mark_all_as_read(self, db, notification_service, test_admin): + """Test marking all notifications as read.""" + for i in range(5): + notification_service.create_notification( + db=db, + notification_type=NotificationType.SYSTEM_ALERT, + title=f"Alert {i}", + message=f"Message {i}", + ) + db.commit() + + assert notification_service.get_unread_count(db) == 5 + + count = notification_service.mark_all_as_read(db, test_admin.id) + db.commit() + + assert count == 5 + assert notification_service.get_unread_count(db) == 0 + + def test_delete_notification(self, db, notification_service): + """Test deleting notification.""" + notification = notification_service.create_notification( + db=db, + notification_type=NotificationType.SYSTEM_ALERT, + title="Test", + message="Test message", + ) + db.commit() + + notification_id = notification.id + result = notification_service.delete_notification(db, notification_id) + db.commit() + + assert result is True + + # Verify deleted + notifications, total, _ = notification_service.get_notifications(db) + assert total == 0 + + def test_delete_notification_not_found(self, db, notification_service): + """Test deleting nonexistent notification.""" + result = notification_service.delete_notification(db, 99999) + assert result is False + + def test_delete_old_notifications(self, db, notification_service, test_admin): + """Test deleting old read notifications.""" + # Create old read notification + old_notification = notification_service.create_notification( + db=db, + notification_type=NotificationType.SYSTEM_ALERT, + title="Old", + message="Old message", + ) + db.commit() + + notification_service.mark_as_read(db, old_notification.id, test_admin.id) + db.commit() + + # Manually backdate it + old_notification.created_at = datetime.utcnow() - timedelta(days=45) + db.commit() + + # Create recent notification + notification_service.create_notification( + db=db, + notification_type=NotificationType.SYSTEM_ALERT, + title="Recent", + message="Recent message", + ) + db.commit() + + # Delete old notifications + count = notification_service.delete_old_notifications(db, days=30) + db.commit() + + assert count == 1 + notifications, total, _ = notification_service.get_notifications(db) + assert total == 1 + assert notifications[0].title == "Recent" + + +# ============================================================================ +# PLATFORM ALERT SERVICE TESTS +# ============================================================================ + + +@pytest.mark.unit +class TestPlatformAlertServiceCreate: + """Tests for platform alert creation.""" + + def test_create_alert_basic(self, db, alert_service): + """Test creating a basic platform alert.""" + alert = alert_service.create_alert( + db=db, + alert_type=AlertType.SYSTEM, + severity=Severity.WARNING, + title="Test Alert", + ) + db.commit() + + assert alert.id is not None + assert alert.alert_type == AlertType.SYSTEM + assert alert.severity == Severity.WARNING + assert alert.title == "Test Alert" + assert alert.is_resolved is False + assert alert.occurrence_count == 1 + + def test_create_alert_with_details(self, db, alert_service): + """Test creating alert with full details.""" + alert = alert_service.create_alert( + db=db, + alert_type=AlertType.DATABASE, + severity=Severity.CRITICAL, + title="Database Connection Issue", + description="Connection pool exhausted", + affected_vendors=[1, 2, 3], + affected_systems=["api", "worker"], + auto_generated=True, + ) + db.commit() + + assert alert.description == "Connection pool exhausted" + assert alert.affected_vendors == [1, 2, 3] + assert alert.affected_systems == ["api", "worker"] + assert alert.auto_generated is True + + +@pytest.mark.unit +class TestPlatformAlertServiceQuery: + """Tests for platform alert query methods.""" + + def test_get_alerts_empty(self, db, alert_service): + """Test getting alerts when none exist.""" + alerts, total, active, critical = alert_service.get_alerts(db) + + assert alerts == [] + assert total == 0 + assert active == 0 + assert critical == 0 + + def test_get_alerts_with_data(self, db, alert_service): + """Test getting alerts with data.""" + alert_service.create_alert( + db=db, + alert_type=AlertType.SYSTEM, + severity=Severity.WARNING, + title="Warning Alert", + ) + alert_service.create_alert( + db=db, + alert_type=AlertType.SECURITY, + severity=Severity.CRITICAL, + title="Critical Alert", + ) + db.commit() + + alerts, total, active, critical = alert_service.get_alerts(db) + + assert total == 2 + assert active == 2 + assert critical == 1 + + def test_get_alerts_filter_severity(self, db, alert_service): + """Test filtering by severity.""" + alert_service.create_alert( + db=db, + alert_type=AlertType.SYSTEM, + severity=Severity.WARNING, + title="Warning", + ) + alert_service.create_alert( + db=db, + alert_type=AlertType.SYSTEM, + severity=Severity.CRITICAL, + title="Critical", + ) + db.commit() + + alerts, total, _, _ = alert_service.get_alerts(db, severity=Severity.CRITICAL) + assert total == 1 + assert alerts[0].title == "Critical" + + def test_get_alerts_filter_type(self, db, alert_service): + """Test filtering by alert type.""" + alert_service.create_alert( + db=db, + alert_type=AlertType.SECURITY, + severity=Severity.WARNING, + title="Security", + ) + alert_service.create_alert( + db=db, + alert_type=AlertType.DATABASE, + severity=Severity.WARNING, + title="Database", + ) + db.commit() + + alerts, total, _, _ = alert_service.get_alerts(db, alert_type=AlertType.SECURITY) + assert total == 1 + assert alerts[0].title == "Security" + + def test_get_statistics(self, db, alert_service, test_admin): + """Test getting alert statistics.""" + # Create some alerts + alert1 = alert_service.create_alert( + db=db, + alert_type=AlertType.SYSTEM, + severity=Severity.WARNING, + title="Active Warning", + ) + alert2 = alert_service.create_alert( + db=db, + alert_type=AlertType.SECURITY, + severity=Severity.CRITICAL, + title="Active Critical", + ) + alert3 = alert_service.create_alert( + db=db, + alert_type=AlertType.DATABASE, + severity=Severity.INFO, + title="Resolved", + ) + db.commit() + + # Resolve one today + alert_service.resolve_alert(db, alert3.id, test_admin.id) + db.commit() + + stats = alert_service.get_statistics(db) + + assert stats["total_alerts"] == 3 + assert stats["active_alerts"] == 2 + assert stats["critical_alerts"] == 1 + assert stats["resolved_today"] == 1 + + +@pytest.mark.unit +class TestPlatformAlertServiceActions: + """Tests for platform alert action methods.""" + + def test_resolve_alert(self, db, alert_service, test_admin): + """Test resolving an alert.""" + alert = alert_service.create_alert( + db=db, + alert_type=AlertType.SYSTEM, + severity=Severity.WARNING, + title="Test Alert", + ) + db.commit() + + result = alert_service.resolve_alert( + db, + alert.id, + test_admin.id, + resolution_notes="Fixed by restarting service", + ) + db.commit() + + assert result.is_resolved is True + assert result.resolved_at is not None + assert result.resolved_by_user_id == test_admin.id + assert result.resolution_notes == "Fixed by restarting service" + + def test_resolve_alert_already_resolved(self, db, alert_service, test_admin): + """Test resolving already-resolved alert.""" + alert = alert_service.create_alert( + db=db, + alert_type=AlertType.SYSTEM, + severity=Severity.WARNING, + title="Test Alert", + ) + db.commit() + + alert_service.resolve_alert(db, alert.id, test_admin.id) + db.commit() + + original_resolved_at = alert.resolved_at + + # Try resolving again + result = alert_service.resolve_alert(db, alert.id, test_admin.id) + + # Should not update + assert result.resolved_at == original_resolved_at + + def test_resolve_alert_not_found(self, db, alert_service, test_admin): + """Test resolving nonexistent alert.""" + result = alert_service.resolve_alert(db, 99999, test_admin.id) + assert result is None + + def test_increment_occurrence(self, db, alert_service): + """Test incrementing alert occurrence count.""" + alert = alert_service.create_alert( + db=db, + alert_type=AlertType.SYSTEM, + severity=Severity.WARNING, + title="Recurring Alert", + ) + db.commit() + + original_occurred_at = alert.last_occurred_at + assert alert.occurrence_count == 1 + + result = alert_service.increment_occurrence(db, alert.id) + db.commit() + + assert result.occurrence_count == 2 + assert result.last_occurred_at > original_occurred_at + + def test_find_similar_active_alert(self, db, alert_service, test_admin): + """Test finding similar active alert.""" + alert = alert_service.create_alert( + db=db, + alert_type=AlertType.SYSTEM, + severity=Severity.WARNING, + title="Duplicate Alert", + ) + db.commit() + + # Should find the active alert + found = alert_service.find_similar_active_alert( + db, AlertType.SYSTEM, "Duplicate Alert" + ) + assert found is not None + assert found.id == alert.id + + # Resolve the alert + alert_service.resolve_alert(db, alert.id, test_admin.id) + db.commit() + + # Should not find resolved alert + found = alert_service.find_similar_active_alert( + db, AlertType.SYSTEM, "Duplicate Alert" + ) + assert found is None + + def test_create_or_increment_alert_new(self, db, alert_service): + """Test creating new alert when none exists.""" + alert = alert_service.create_or_increment_alert( + db=db, + alert_type=AlertType.PERFORMANCE, + severity=Severity.WARNING, + title="Performance Issue", + description="High CPU usage", + ) + db.commit() + + assert alert.occurrence_count == 1 + assert alert.title == "Performance Issue" + + def test_create_or_increment_alert_existing(self, db, alert_service): + """Test incrementing existing alert.""" + # Create initial alert + alert = alert_service.create_or_increment_alert( + db=db, + alert_type=AlertType.PERFORMANCE, + severity=Severity.WARNING, + title="Performance Issue", + ) + db.commit() + alert_id = alert.id + + # Call again with same type/title + result = alert_service.create_or_increment_alert( + db=db, + alert_type=AlertType.PERFORMANCE, + severity=Severity.WARNING, + title="Performance Issue", + ) + db.commit() + + # Should be same alert with incremented count + assert result.id == alert_id + assert result.occurrence_count == 2