- Auto-fixed 4,496 lint issues (import sorting, modern syntax, etc.) - Added ignore rules for patterns intentional in this codebase: E402 (late imports), E712 (SQLAlchemy filters), B904 (raise from), SIM108/SIM105/SIM117 (readability preferences) - Added per-file ignores for tests and scripts - Excluded broken scripts/rename_terminology.py (has curly quotes) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
772 lines
25 KiB
Python
772 lines
25 KiB
Python
# tests/unit/services/test_admin_notification_service.py
|
|
"""
|
|
Unit tests for AdminNotificationService and PlatformAlertService.
|
|
"""
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
import pytest
|
|
|
|
from app.modules.messaging.services.admin_notification_service import (
|
|
AdminNotificationService,
|
|
AlertType,
|
|
NotificationType,
|
|
PlatformAlertService,
|
|
Priority,
|
|
Severity,
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def notification_service():
|
|
"""Create AdminNotificationService instance."""
|
|
return AdminNotificationService()
|
|
|
|
|
|
@pytest.fixture
|
|
def alert_service():
|
|
"""Create PlatformAlertService instance."""
|
|
return PlatformAlertService()
|
|
|
|
|
|
# ============================================================================
|
|
# ADMIN NOTIFICATION SERVICE TESTS
|
|
# ============================================================================
|
|
|
|
|
|
@pytest.mark.unit
|
|
class TestAdminNotificationServiceCreate:
|
|
"""Tests for notification creation."""
|
|
|
|
def test_create_notification_basic(self, db, notification_service):
|
|
"""Test creating a basic notification."""
|
|
notification = notification_service.create_notification(
|
|
db=db,
|
|
notification_type=NotificationType.SYSTEM_ALERT,
|
|
title="Test Alert",
|
|
message="This is a test notification.",
|
|
)
|
|
db.commit()
|
|
|
|
assert notification.id is not None
|
|
assert notification.type == NotificationType.SYSTEM_ALERT
|
|
assert notification.title == "Test Alert"
|
|
assert notification.message == "This is a test notification."
|
|
assert notification.priority == Priority.NORMAL
|
|
assert notification.action_required is False
|
|
assert notification.is_read is False
|
|
|
|
def test_create_notification_with_priority(self, db, notification_service):
|
|
"""Test creating notification with custom priority."""
|
|
notification = notification_service.create_notification(
|
|
db=db,
|
|
notification_type=NotificationType.CRITICAL_ERROR,
|
|
title="Critical Error",
|
|
message="Something went wrong",
|
|
priority=Priority.CRITICAL,
|
|
)
|
|
db.commit()
|
|
|
|
assert notification.priority == Priority.CRITICAL
|
|
|
|
def test_create_notification_with_action(self, db, notification_service):
|
|
"""Test creating notification with action URL."""
|
|
notification = notification_service.create_notification(
|
|
db=db,
|
|
notification_type=NotificationType.IMPORT_FAILURE,
|
|
title="Import Failed",
|
|
message="Failed to import products",
|
|
action_required=True,
|
|
action_url="/admin/marketplace",
|
|
)
|
|
db.commit()
|
|
|
|
assert notification.action_required is True
|
|
assert notification.action_url == "/admin/marketplace"
|
|
|
|
def test_create_notification_with_metadata(self, db, notification_service):
|
|
"""Test creating notification with metadata."""
|
|
metadata = {"store_id": 1, "job_id": 42}
|
|
notification = notification_service.create_notification(
|
|
db=db,
|
|
notification_type=NotificationType.IMPORT_FAILURE,
|
|
title="Import Failed",
|
|
message="Error message",
|
|
metadata=metadata,
|
|
)
|
|
db.commit()
|
|
|
|
assert notification.notification_metadata == metadata
|
|
|
|
|
|
@pytest.mark.unit
|
|
class TestAdminNotificationServiceConvenience:
|
|
"""Tests for convenience notification methods."""
|
|
|
|
def test_notify_import_failure(self, db, notification_service):
|
|
"""Test import failure notification creation."""
|
|
notification = notification_service.notify_import_failure(
|
|
db=db,
|
|
store_name="Test Store",
|
|
job_id=123,
|
|
error_message="Connection failed",
|
|
store_id=1,
|
|
)
|
|
db.commit()
|
|
|
|
assert notification.type == NotificationType.IMPORT_FAILURE
|
|
assert notification.title == "Import Failed: Test Store"
|
|
assert notification.message == "Connection failed"
|
|
assert notification.priority == Priority.HIGH
|
|
assert notification.action_required is True
|
|
assert "store_id=1" in notification.action_url
|
|
|
|
def test_notify_order_sync_failure(self, db, notification_service):
|
|
"""Test order sync failure notification."""
|
|
notification = notification_service.notify_order_sync_failure(
|
|
db=db,
|
|
store_name="Test Store",
|
|
error_message="API timeout",
|
|
store_id=5,
|
|
)
|
|
db.commit()
|
|
|
|
assert notification.type == NotificationType.ORDER_SYNC_FAILURE
|
|
assert notification.title == "Order Sync Failed: Test Store"
|
|
assert notification.priority == Priority.HIGH
|
|
|
|
def test_notify_order_exception(self, db, notification_service):
|
|
"""Test order exception notification."""
|
|
notification = notification_service.notify_order_exception(
|
|
db=db,
|
|
store_name="Test Store",
|
|
order_number="ORD-12345",
|
|
exception_count=3,
|
|
store_id=2,
|
|
)
|
|
db.commit()
|
|
|
|
assert notification.type == NotificationType.ORDER_EXCEPTION
|
|
assert "3 item(s)" in notification.message
|
|
assert notification.priority == Priority.NORMAL
|
|
|
|
def test_notify_critical_error(self, db, notification_service):
|
|
"""Test critical error notification."""
|
|
details = {"stack_trace": "line 1\nline 2"}
|
|
notification = notification_service.notify_critical_error(
|
|
db=db,
|
|
error_type="Database Error",
|
|
error_message="Connection pool exhausted",
|
|
details=details,
|
|
)
|
|
db.commit()
|
|
|
|
assert notification.type == NotificationType.CRITICAL_ERROR
|
|
assert notification.priority == Priority.CRITICAL
|
|
assert notification.notification_metadata == details
|
|
|
|
def test_notify_store_issue(self, db, notification_service):
|
|
"""Test store issue notification."""
|
|
notification = notification_service.notify_store_issue(
|
|
db=db,
|
|
store_name="Bad Store",
|
|
issue_type="payment",
|
|
message="Payment method expired",
|
|
store_id=10,
|
|
)
|
|
db.commit()
|
|
|
|
assert notification.type == NotificationType.STORE_ISSUE
|
|
assert "Bad Store" in notification.title
|
|
assert notification.priority == Priority.HIGH
|
|
|
|
def test_notify_security_alert(self, db, notification_service):
|
|
"""Test security alert notification."""
|
|
notification = notification_service.notify_security_alert(
|
|
db=db,
|
|
title="Suspicious Login Attempt",
|
|
message="Multiple failed logins detected",
|
|
details={"ip_address": "1.2.3.4"},
|
|
)
|
|
db.commit()
|
|
|
|
assert notification.type == NotificationType.SECURITY_ALERT
|
|
assert notification.priority == Priority.CRITICAL
|
|
assert notification.action_url == "/admin/audit"
|
|
|
|
|
|
@pytest.mark.unit
|
|
class TestAdminNotificationServiceQuery:
|
|
"""Tests for notification query methods."""
|
|
|
|
def test_get_notifications_empty(self, db, notification_service):
|
|
"""Test getting notifications when none exist."""
|
|
notifications, total, unread = notification_service.get_notifications(db)
|
|
|
|
assert notifications == []
|
|
assert total == 0
|
|
assert unread == 0
|
|
|
|
def test_get_notifications_with_data(self, db, notification_service):
|
|
"""Test getting notifications with data."""
|
|
# Create some notifications
|
|
notification_service.create_notification(
|
|
db=db,
|
|
notification_type=NotificationType.SYSTEM_ALERT,
|
|
title="Alert 1",
|
|
message="Message 1",
|
|
)
|
|
notification_service.create_notification(
|
|
db=db,
|
|
notification_type=NotificationType.IMPORT_FAILURE,
|
|
title="Alert 2",
|
|
message="Message 2",
|
|
priority=Priority.HIGH,
|
|
)
|
|
db.commit()
|
|
|
|
notifications, total, unread = notification_service.get_notifications(db)
|
|
|
|
assert total == 2
|
|
assert unread == 2
|
|
assert len(notifications) == 2
|
|
|
|
def test_get_notifications_filter_priority(self, db, notification_service):
|
|
"""Test filtering by priority."""
|
|
notification_service.create_notification(
|
|
db=db,
|
|
notification_type=NotificationType.SYSTEM_ALERT,
|
|
title="Normal",
|
|
message="Normal priority",
|
|
priority=Priority.NORMAL,
|
|
)
|
|
notification_service.create_notification(
|
|
db=db,
|
|
notification_type=NotificationType.CRITICAL_ERROR,
|
|
title="Critical",
|
|
message="Critical priority",
|
|
priority=Priority.CRITICAL,
|
|
)
|
|
db.commit()
|
|
|
|
notifications, total, _ = notification_service.get_notifications(
|
|
db, priority=Priority.CRITICAL
|
|
)
|
|
|
|
assert total == 1
|
|
assert notifications[0].title == "Critical"
|
|
|
|
def test_get_notifications_filter_read(self, db, notification_service, test_admin):
|
|
"""Test filtering by read status."""
|
|
notification_service.create_notification(
|
|
db=db,
|
|
notification_type=NotificationType.SYSTEM_ALERT,
|
|
title="Unread",
|
|
message="Not read yet",
|
|
)
|
|
n2 = notification_service.create_notification(
|
|
db=db,
|
|
notification_type=NotificationType.SYSTEM_ALERT,
|
|
title="Read",
|
|
message="Already read",
|
|
)
|
|
db.commit()
|
|
|
|
# Mark one as read
|
|
notification_service.mark_as_read(db, n2.id, test_admin.id)
|
|
db.commit()
|
|
|
|
# Get unread only
|
|
notifications, total, _ = notification_service.get_notifications(
|
|
db, is_read=False
|
|
)
|
|
assert total == 1
|
|
assert notifications[0].title == "Unread"
|
|
|
|
# Get read only
|
|
notifications, total, _ = notification_service.get_notifications(
|
|
db, is_read=True
|
|
)
|
|
assert total == 1
|
|
assert notifications[0].title == "Read"
|
|
|
|
def test_get_notifications_pagination(self, db, notification_service):
|
|
"""Test pagination of notifications."""
|
|
for i in range(10):
|
|
notification_service.create_notification(
|
|
db=db,
|
|
notification_type=NotificationType.SYSTEM_ALERT,
|
|
title=f"Alert {i}",
|
|
message=f"Message {i}",
|
|
)
|
|
db.commit()
|
|
|
|
notifications, total, _ = notification_service.get_notifications(
|
|
db, skip=0, limit=3
|
|
)
|
|
assert len(notifications) == 3
|
|
assert total == 10
|
|
|
|
notifications, total, _ = notification_service.get_notifications(
|
|
db, skip=8, limit=3
|
|
)
|
|
assert len(notifications) == 2 # Only 2 remaining
|
|
|
|
def test_get_unread_count(self, db, notification_service):
|
|
"""Test unread count."""
|
|
notification_service.create_notification(
|
|
db=db,
|
|
notification_type=NotificationType.SYSTEM_ALERT,
|
|
title="Alert 1",
|
|
message="Message 1",
|
|
)
|
|
notification_service.create_notification(
|
|
db=db,
|
|
notification_type=NotificationType.SYSTEM_ALERT,
|
|
title="Alert 2",
|
|
message="Message 2",
|
|
)
|
|
db.commit()
|
|
|
|
assert notification_service.get_unread_count(db) == 2
|
|
|
|
def test_get_recent_notifications(self, db, notification_service):
|
|
"""Test getting recent unread notifications."""
|
|
for i in range(10):
|
|
notification_service.create_notification(
|
|
db=db,
|
|
notification_type=NotificationType.SYSTEM_ALERT,
|
|
title=f"Alert {i}",
|
|
message=f"Message {i}",
|
|
)
|
|
db.commit()
|
|
|
|
recent = notification_service.get_recent_notifications(db, limit=5)
|
|
assert len(recent) == 5
|
|
|
|
|
|
@pytest.mark.unit
|
|
class TestAdminNotificationServiceActions:
|
|
"""Tests for notification action methods."""
|
|
|
|
def test_mark_as_read(self, db, notification_service, test_admin):
|
|
"""Test marking notification as read."""
|
|
notification = notification_service.create_notification(
|
|
db=db,
|
|
notification_type=NotificationType.SYSTEM_ALERT,
|
|
title="Test",
|
|
message="Test message",
|
|
)
|
|
db.commit()
|
|
|
|
assert notification.is_read is False
|
|
|
|
result = notification_service.mark_as_read(db, notification.id, test_admin.id)
|
|
db.commit()
|
|
|
|
assert result.is_read is True
|
|
assert result.read_at is not None
|
|
assert result.read_by_user_id == test_admin.id
|
|
|
|
def test_mark_as_read_already_read(self, db, notification_service, test_admin):
|
|
"""Test marking already-read notification."""
|
|
notification = notification_service.create_notification(
|
|
db=db,
|
|
notification_type=NotificationType.SYSTEM_ALERT,
|
|
title="Test",
|
|
message="Test message",
|
|
)
|
|
db.commit()
|
|
|
|
notification_service.mark_as_read(db, notification.id, test_admin.id)
|
|
db.commit()
|
|
|
|
original_read_at = notification.read_at
|
|
|
|
# Try to mark as read again
|
|
result = notification_service.mark_as_read(db, notification.id, test_admin.id)
|
|
db.commit()
|
|
|
|
# Should not update timestamp
|
|
assert result.read_at == original_read_at
|
|
|
|
def test_mark_as_read_not_found(self, db, notification_service, test_admin):
|
|
"""Test marking nonexistent notification."""
|
|
result = notification_service.mark_as_read(db, 99999, test_admin.id)
|
|
assert result is None
|
|
|
|
def test_mark_all_as_read(self, db, notification_service, test_admin):
|
|
"""Test marking all notifications as read."""
|
|
for i in range(5):
|
|
notification_service.create_notification(
|
|
db=db,
|
|
notification_type=NotificationType.SYSTEM_ALERT,
|
|
title=f"Alert {i}",
|
|
message=f"Message {i}",
|
|
)
|
|
db.commit()
|
|
|
|
assert notification_service.get_unread_count(db) == 5
|
|
|
|
count = notification_service.mark_all_as_read(db, test_admin.id)
|
|
db.commit()
|
|
|
|
assert count == 5
|
|
assert notification_service.get_unread_count(db) == 0
|
|
|
|
def test_delete_notification(self, db, notification_service):
|
|
"""Test deleting notification."""
|
|
notification = notification_service.create_notification(
|
|
db=db,
|
|
notification_type=NotificationType.SYSTEM_ALERT,
|
|
title="Test",
|
|
message="Test message",
|
|
)
|
|
db.commit()
|
|
|
|
notification_id = notification.id
|
|
result = notification_service.delete_notification(db, notification_id)
|
|
db.commit()
|
|
|
|
assert result is True
|
|
|
|
# Verify deleted
|
|
notifications, total, _ = notification_service.get_notifications(db)
|
|
assert total == 0
|
|
|
|
def test_delete_notification_not_found(self, db, notification_service):
|
|
"""Test deleting nonexistent notification."""
|
|
result = notification_service.delete_notification(db, 99999)
|
|
assert result is False
|
|
|
|
def test_delete_old_notifications(self, db, notification_service, test_admin):
|
|
"""Test deleting old read notifications."""
|
|
# Create old read notification
|
|
old_notification = notification_service.create_notification(
|
|
db=db,
|
|
notification_type=NotificationType.SYSTEM_ALERT,
|
|
title="Old",
|
|
message="Old message",
|
|
)
|
|
db.commit()
|
|
|
|
notification_service.mark_as_read(db, old_notification.id, test_admin.id)
|
|
db.commit()
|
|
|
|
# Manually backdate it
|
|
old_notification.created_at = datetime.utcnow() - timedelta(days=45)
|
|
db.commit()
|
|
|
|
# Create recent notification
|
|
notification_service.create_notification(
|
|
db=db,
|
|
notification_type=NotificationType.SYSTEM_ALERT,
|
|
title="Recent",
|
|
message="Recent message",
|
|
)
|
|
db.commit()
|
|
|
|
# Delete old notifications
|
|
count = notification_service.delete_old_notifications(db, days=30)
|
|
db.commit()
|
|
|
|
assert count == 1
|
|
notifications, total, _ = notification_service.get_notifications(db)
|
|
assert total == 1
|
|
assert notifications[0].title == "Recent"
|
|
|
|
|
|
# ============================================================================
|
|
# PLATFORM ALERT SERVICE TESTS
|
|
# ============================================================================
|
|
|
|
|
|
@pytest.mark.unit
|
|
class TestPlatformAlertServiceCreate:
|
|
"""Tests for platform alert creation."""
|
|
|
|
def test_create_alert_basic(self, db, alert_service):
|
|
"""Test creating a basic platform alert."""
|
|
alert = alert_service.create_alert(
|
|
db=db,
|
|
alert_type=AlertType.SYSTEM,
|
|
severity=Severity.WARNING,
|
|
title="Test Alert",
|
|
)
|
|
db.commit()
|
|
|
|
assert alert.id is not None
|
|
assert alert.alert_type == AlertType.SYSTEM
|
|
assert alert.severity == Severity.WARNING
|
|
assert alert.title == "Test Alert"
|
|
assert alert.is_resolved is False
|
|
assert alert.occurrence_count == 1
|
|
|
|
def test_create_alert_with_details(self, db, alert_service):
|
|
"""Test creating alert with full details."""
|
|
alert = alert_service.create_alert(
|
|
db=db,
|
|
alert_type=AlertType.DATABASE,
|
|
severity=Severity.CRITICAL,
|
|
title="Database Connection Issue",
|
|
description="Connection pool exhausted",
|
|
affected_stores=[1, 2, 3],
|
|
affected_systems=["api", "worker"],
|
|
auto_generated=True,
|
|
)
|
|
db.commit()
|
|
|
|
assert alert.description == "Connection pool exhausted"
|
|
assert alert.affected_stores == [1, 2, 3]
|
|
assert alert.affected_systems == ["api", "worker"]
|
|
assert alert.auto_generated is True
|
|
|
|
|
|
@pytest.mark.unit
|
|
class TestPlatformAlertServiceQuery:
|
|
"""Tests for platform alert query methods."""
|
|
|
|
def test_get_alerts_empty(self, db, alert_service):
|
|
"""Test getting alerts when none exist."""
|
|
alerts, total, active, critical = alert_service.get_alerts(db)
|
|
|
|
assert alerts == []
|
|
assert total == 0
|
|
assert active == 0
|
|
assert critical == 0
|
|
|
|
def test_get_alerts_with_data(self, db, alert_service):
|
|
"""Test getting alerts with data."""
|
|
alert_service.create_alert(
|
|
db=db,
|
|
alert_type=AlertType.SYSTEM,
|
|
severity=Severity.WARNING,
|
|
title="Warning Alert",
|
|
)
|
|
alert_service.create_alert(
|
|
db=db,
|
|
alert_type=AlertType.SECURITY,
|
|
severity=Severity.CRITICAL,
|
|
title="Critical Alert",
|
|
)
|
|
db.commit()
|
|
|
|
alerts, total, active, critical = alert_service.get_alerts(db)
|
|
|
|
assert total == 2
|
|
assert active == 2
|
|
assert critical == 1
|
|
|
|
def test_get_alerts_filter_severity(self, db, alert_service):
|
|
"""Test filtering by severity."""
|
|
alert_service.create_alert(
|
|
db=db,
|
|
alert_type=AlertType.SYSTEM,
|
|
severity=Severity.WARNING,
|
|
title="Warning",
|
|
)
|
|
alert_service.create_alert(
|
|
db=db,
|
|
alert_type=AlertType.SYSTEM,
|
|
severity=Severity.CRITICAL,
|
|
title="Critical",
|
|
)
|
|
db.commit()
|
|
|
|
alerts, total, _, _ = alert_service.get_alerts(db, severity=Severity.CRITICAL)
|
|
assert total == 1
|
|
assert alerts[0].title == "Critical"
|
|
|
|
def test_get_alerts_filter_type(self, db, alert_service):
|
|
"""Test filtering by alert type."""
|
|
alert_service.create_alert(
|
|
db=db,
|
|
alert_type=AlertType.SECURITY,
|
|
severity=Severity.WARNING,
|
|
title="Security",
|
|
)
|
|
alert_service.create_alert(
|
|
db=db,
|
|
alert_type=AlertType.DATABASE,
|
|
severity=Severity.WARNING,
|
|
title="Database",
|
|
)
|
|
db.commit()
|
|
|
|
alerts, total, _, _ = alert_service.get_alerts(db, alert_type=AlertType.SECURITY)
|
|
assert total == 1
|
|
assert alerts[0].title == "Security"
|
|
|
|
def test_get_statistics(self, db, alert_service, test_admin):
|
|
"""Test getting alert statistics."""
|
|
# Create some alerts
|
|
alert_service.create_alert(
|
|
db=db,
|
|
alert_type=AlertType.SYSTEM,
|
|
severity=Severity.WARNING,
|
|
title="Active Warning",
|
|
)
|
|
alert_service.create_alert(
|
|
db=db,
|
|
alert_type=AlertType.SECURITY,
|
|
severity=Severity.CRITICAL,
|
|
title="Active Critical",
|
|
)
|
|
alert3 = alert_service.create_alert(
|
|
db=db,
|
|
alert_type=AlertType.DATABASE,
|
|
severity=Severity.INFO,
|
|
title="Resolved",
|
|
)
|
|
db.commit()
|
|
|
|
# Resolve one today
|
|
alert_service.resolve_alert(db, alert3.id, test_admin.id)
|
|
db.commit()
|
|
|
|
stats = alert_service.get_statistics(db)
|
|
|
|
assert stats["total_alerts"] == 3
|
|
assert stats["active_alerts"] == 2
|
|
assert stats["critical_alerts"] == 1
|
|
assert stats["resolved_today"] == 1
|
|
|
|
|
|
@pytest.mark.unit
|
|
class TestPlatformAlertServiceActions:
|
|
"""Tests for platform alert action methods."""
|
|
|
|
def test_resolve_alert(self, db, alert_service, test_admin):
|
|
"""Test resolving an alert."""
|
|
alert = alert_service.create_alert(
|
|
db=db,
|
|
alert_type=AlertType.SYSTEM,
|
|
severity=Severity.WARNING,
|
|
title="Test Alert",
|
|
)
|
|
db.commit()
|
|
|
|
result = alert_service.resolve_alert(
|
|
db,
|
|
alert.id,
|
|
test_admin.id,
|
|
resolution_notes="Fixed by restarting service",
|
|
)
|
|
db.commit()
|
|
|
|
assert result.is_resolved is True
|
|
assert result.resolved_at is not None
|
|
assert result.resolved_by_user_id == test_admin.id
|
|
assert result.resolution_notes == "Fixed by restarting service"
|
|
|
|
def test_resolve_alert_already_resolved(self, db, alert_service, test_admin):
|
|
"""Test resolving already-resolved alert."""
|
|
alert = alert_service.create_alert(
|
|
db=db,
|
|
alert_type=AlertType.SYSTEM,
|
|
severity=Severity.WARNING,
|
|
title="Test Alert",
|
|
)
|
|
db.commit()
|
|
|
|
alert_service.resolve_alert(db, alert.id, test_admin.id)
|
|
db.commit()
|
|
|
|
original_resolved_at = alert.resolved_at
|
|
|
|
# Try resolving again
|
|
result = alert_service.resolve_alert(db, alert.id, test_admin.id)
|
|
|
|
# Should not update
|
|
assert result.resolved_at == original_resolved_at
|
|
|
|
def test_resolve_alert_not_found(self, db, alert_service, test_admin):
|
|
"""Test resolving nonexistent alert."""
|
|
result = alert_service.resolve_alert(db, 99999, test_admin.id)
|
|
assert result is None
|
|
|
|
def test_increment_occurrence(self, db, alert_service):
|
|
"""Test incrementing alert occurrence count."""
|
|
alert = alert_service.create_alert(
|
|
db=db,
|
|
alert_type=AlertType.SYSTEM,
|
|
severity=Severity.WARNING,
|
|
title="Recurring Alert",
|
|
)
|
|
db.commit()
|
|
|
|
original_occurred_at = alert.last_occurred_at
|
|
assert alert.occurrence_count == 1
|
|
|
|
result = alert_service.increment_occurrence(db, alert.id)
|
|
db.commit()
|
|
|
|
assert result.occurrence_count == 2
|
|
assert result.last_occurred_at > original_occurred_at
|
|
|
|
def test_find_similar_active_alert(self, db, alert_service, test_admin):
|
|
"""Test finding similar active alert."""
|
|
alert = alert_service.create_alert(
|
|
db=db,
|
|
alert_type=AlertType.SYSTEM,
|
|
severity=Severity.WARNING,
|
|
title="Duplicate Alert",
|
|
)
|
|
db.commit()
|
|
|
|
# Should find the active alert
|
|
found = alert_service.find_similar_active_alert(
|
|
db, AlertType.SYSTEM, "Duplicate Alert"
|
|
)
|
|
assert found is not None
|
|
assert found.id == alert.id
|
|
|
|
# Resolve the alert
|
|
alert_service.resolve_alert(db, alert.id, test_admin.id)
|
|
db.commit()
|
|
|
|
# Should not find resolved alert
|
|
found = alert_service.find_similar_active_alert(
|
|
db, AlertType.SYSTEM, "Duplicate Alert"
|
|
)
|
|
assert found is None
|
|
|
|
def test_create_or_increment_alert_new(self, db, alert_service):
|
|
"""Test creating new alert when none exists."""
|
|
alert = alert_service.create_or_increment_alert(
|
|
db=db,
|
|
alert_type=AlertType.PERFORMANCE,
|
|
severity=Severity.WARNING,
|
|
title="Performance Issue",
|
|
description="High CPU usage",
|
|
)
|
|
db.commit()
|
|
|
|
assert alert.occurrence_count == 1
|
|
assert alert.title == "Performance Issue"
|
|
|
|
def test_create_or_increment_alert_existing(self, db, alert_service):
|
|
"""Test incrementing existing alert."""
|
|
# Create initial alert
|
|
alert = alert_service.create_or_increment_alert(
|
|
db=db,
|
|
alert_type=AlertType.PERFORMANCE,
|
|
severity=Severity.WARNING,
|
|
title="Performance Issue",
|
|
)
|
|
db.commit()
|
|
alert_id = alert.id
|
|
|
|
# Call again with same type/title
|
|
result = alert_service.create_or_increment_alert(
|
|
db=db,
|
|
alert_type=AlertType.PERFORMANCE,
|
|
severity=Severity.WARNING,
|
|
title="Performance Issue",
|
|
)
|
|
db.commit()
|
|
|
|
# Should be same alert with incremented count
|
|
assert result.id == alert_id
|
|
assert result.occurrence_count == 2
|