# app/modules/messaging/services/admin_notification_service.py """ Admin notification service. Provides functionality for: - Creating and managing admin notifications - Managing platform alerts - Notification statistics and queries """ from __future__ import annotations import logging from datetime import datetime, timedelta from typing import Any from sqlalchemy import and_, case from sqlalchemy.orm import Session from app.modules.messaging.models.admin_notification import AdminNotification from app.modules.tenancy.schemas.admin import ( AdminNotificationCreate, PlatformAlertCreate, ) logger = logging.getLogger(__name__) def _get_platform_alert_model(): """Deferred import for PlatformAlert model (lives in tenancy, consumed by messaging).""" from app.modules.tenancy.models import PlatformAlert return PlatformAlert # ============================================================================ # NOTIFICATION TYPES # ============================================================================ class NotificationType: """Notification type constants.""" SYSTEM_ALERT = "system_alert" IMPORT_FAILURE = "import_failure" EXPORT_FAILURE = "export_failure" ORDER_SYNC_FAILURE = "order_sync_failure" STORE_ISSUE = "store_issue" CUSTOMER_MESSAGE = "customer_message" STORE_MESSAGE = "store_message" SECURITY_ALERT = "security_alert" PERFORMANCE_ALERT = "performance_alert" ORDER_EXCEPTION = "order_exception" CRITICAL_ERROR = "critical_error" class Priority: """Priority level constants.""" LOW = "low" NORMAL = "normal" HIGH = "high" CRITICAL = "critical" class AlertType: """Platform alert type constants.""" SECURITY = "security" PERFORMANCE = "performance" CAPACITY = "capacity" INTEGRATION = "integration" DATABASE = "database" SYSTEM = "system" class Severity: """Alert severity constants.""" INFO = "info" WARNING = "warning" ERROR = "error" CRITICAL = "critical" # ============================================================================ # ADMIN NOTIFICATION SERVICE # ============================================================================ class AdminNotificationService: """Service for managing admin notifications.""" def create_notification( self, db: Session, notification_type: str, title: str, message: str, priority: str = Priority.NORMAL, action_required: bool = False, action_url: str | None = None, metadata: dict[str, Any] | None = None, ) -> AdminNotification: """ Create a new admin notification. Args: db: Database session notification_type: Type of notification title: Notification title message: Notification message priority: Priority level (low, normal, high, critical) action_required: Whether action is required action_url: URL to relevant admin page metadata: Additional contextual data Returns: Created AdminNotification """ notification = AdminNotification( type=notification_type, title=title, message=message, priority=priority, action_required=action_required, action_url=action_url, notification_metadata=metadata, ) db.add(notification) db.flush() logger.info( f"Created notification: {notification_type} - {title} (priority: {priority})" ) return notification def create_from_schema( self, db: Session, data: AdminNotificationCreate, ) -> AdminNotification: """Create notification from Pydantic schema.""" return self.create_notification( db=db, notification_type=data.type, title=data.title, message=data.message, priority=data.priority, action_required=data.action_required, action_url=data.action_url, metadata=data.metadata, ) def get_notifications( self, db: Session, priority: str | None = None, is_read: bool | None = None, notification_type: str | None = None, skip: int = 0, limit: int = 50, ) -> tuple[list[AdminNotification], int, int]: """ Get paginated admin notifications. Returns: Tuple of (notifications, total_count, unread_count) """ query = db.query(AdminNotification) # Apply filters if priority: query = query.filter(AdminNotification.priority == priority) if is_read is not None: query = query.filter(AdminNotification.is_read == is_read) if notification_type: query = query.filter(AdminNotification.type == notification_type) # Get counts total = query.count() unread_count = ( db.query(AdminNotification) .filter(AdminNotification.is_read == False) # noqa: E712 .count() ) # Get paginated results ordered by priority and date priority_order = case( (AdminNotification.priority == "critical", 1), (AdminNotification.priority == "high", 2), (AdminNotification.priority == "normal", 3), (AdminNotification.priority == "low", 4), else_=5, ) notifications = ( query.order_by( AdminNotification.is_read, # Unread first priority_order, AdminNotification.created_at.desc(), ) .offset(skip) .limit(limit) .all() ) return notifications, total, unread_count def get_unread_count(self, db: Session) -> int: """Get count of unread notifications.""" return ( db.query(AdminNotification) .filter(AdminNotification.is_read == False) # noqa: E712 .count() ) def get_recent_notifications( self, db: Session, limit: int = 5, ) -> list[AdminNotification]: """Get recent unread notifications for header dropdown.""" priority_order = case( (AdminNotification.priority == "critical", 1), (AdminNotification.priority == "high", 2), (AdminNotification.priority == "normal", 3), (AdminNotification.priority == "low", 4), else_=5, ) return ( db.query(AdminNotification) .filter(AdminNotification.is_read == False) # noqa: E712 .order_by(priority_order, AdminNotification.created_at.desc()) .limit(limit) .all() ) def mark_as_read( self, db: Session, notification_id: int, user_id: int, ) -> AdminNotification | None: """Mark a notification as read.""" notification = ( db.query(AdminNotification) .filter(AdminNotification.id == notification_id) .first() ) if notification and not notification.is_read: notification.is_read = True notification.read_at = datetime.utcnow() notification.read_by_user_id = user_id db.flush() return notification def mark_all_as_read( self, db: Session, user_id: int, ) -> int: """Mark all unread notifications as read. Returns count of updated.""" now = datetime.utcnow() count = ( db.query(AdminNotification) .filter(AdminNotification.is_read == False) # noqa: E712 .update( { AdminNotification.is_read: True, AdminNotification.read_at: now, AdminNotification.read_by_user_id: user_id, } ) ) db.flush() return count def delete_old_notifications( self, db: Session, days: int = 30, ) -> int: """Delete notifications older than specified days.""" cutoff = datetime.utcnow() - timedelta(days=days) count = ( db.query(AdminNotification) .filter( and_( AdminNotification.is_read == True, # noqa: E712 AdminNotification.created_at < cutoff, ) ) .delete() ) db.flush() return count def delete_notification( self, db: Session, notification_id: int, ) -> bool: """ Delete a notification by ID. Returns: True if notification was deleted, False if not found. """ notification = ( db.query(AdminNotification) .filter(AdminNotification.id == notification_id) .first() ) if notification: db.delete(notification) db.flush() logger.info(f"Deleted notification {notification_id}") return True return False # ========================================================================= # CONVENIENCE METHODS FOR CREATING SPECIFIC NOTIFICATION TYPES # ========================================================================= def notify_import_failure( self, db: Session, store_name: str, job_id: int, error_message: str, store_id: int | None = None, ) -> AdminNotification: """Create notification for import job failure.""" return self.create_notification( db=db, notification_type=NotificationType.IMPORT_FAILURE, title=f"Import Failed: {store_name}", message=error_message, priority=Priority.HIGH, action_required=True, action_url=f"/admin/marketplace/letzshop?store_id={store_id}&tab=jobs" if store_id else "/admin/marketplace", metadata={"store_name": store_name, "job_id": job_id, "store_id": store_id}, ) def notify_order_sync_failure( self, db: Session, store_name: str, error_message: str, store_id: int | None = None, ) -> AdminNotification: """Create notification for order sync failure.""" return self.create_notification( db=db, notification_type=NotificationType.ORDER_SYNC_FAILURE, title=f"Order Sync Failed: {store_name}", message=error_message, priority=Priority.HIGH, action_required=True, action_url=f"/admin/marketplace/letzshop?store_id={store_id}&tab=jobs" if store_id else "/admin/marketplace/letzshop", metadata={"store_name": store_name, "store_id": store_id}, ) def notify_order_exception( self, db: Session, store_name: str, order_number: str, exception_count: int, store_id: int | None = None, ) -> AdminNotification: """Create notification for order item exceptions.""" return self.create_notification( db=db, notification_type=NotificationType.ORDER_EXCEPTION, title=f"Order Exception: {order_number}", message=f"{exception_count} item(s) need attention for order {order_number} ({store_name})", priority=Priority.NORMAL, action_required=True, action_url=f"/admin/marketplace/letzshop?store_id={store_id}&tab=exceptions" if store_id else "/admin/marketplace/letzshop", metadata={ "store_name": store_name, "order_number": order_number, "exception_count": exception_count, "store_id": store_id, }, ) def notify_critical_error( self, db: Session, error_type: str, error_message: str, details: dict[str, Any] | None = None, ) -> AdminNotification: """Create notification for critical application errors.""" return self.create_notification( db=db, notification_type=NotificationType.CRITICAL_ERROR, title=f"Critical Error: {error_type}", message=error_message, priority=Priority.CRITICAL, action_required=True, action_url="/admin/logs", metadata=details, ) def notify_store_issue( self, db: Session, store_name: str, issue_type: str, message: str, store_id: int | None = None, ) -> AdminNotification: """Create notification for store-related issues.""" return self.create_notification( db=db, notification_type=NotificationType.STORE_ISSUE, title=f"Store Issue: {store_name}", message=message, priority=Priority.HIGH, action_required=True, action_url=f"/admin/stores/{store_id}" if store_id else "/admin/stores", metadata={ "store_name": store_name, "issue_type": issue_type, "store_id": store_id, }, ) def notify_security_alert( self, db: Session, title: str, message: str, details: dict[str, Any] | None = None, ) -> AdminNotification: """Create notification for security-related alerts.""" return self.create_notification( db=db, notification_type=NotificationType.SECURITY_ALERT, title=title, message=message, priority=Priority.CRITICAL, action_required=True, action_url="/admin/audit", metadata=details, ) # ============================================================================ # PLATFORM ALERT SERVICE # ============================================================================ class PlatformAlertService: """Service for managing platform-wide alerts.""" def create_alert( self, db: Session, alert_type: str, severity: str, title: str, description: str | None = None, affected_stores: list[int] | None = None, affected_systems: list[str] | None = None, auto_generated: bool = True, ) -> PlatformAlert: """Create a new platform alert.""" PlatformAlert = _get_platform_alert_model() now = datetime.utcnow() alert = PlatformAlert( alert_type=alert_type, severity=severity, title=title, description=description, affected_stores=affected_stores, affected_systems=affected_systems, auto_generated=auto_generated, first_occurred_at=now, last_occurred_at=now, ) db.add(alert) db.flush() logger.info(f"Created platform alert: {alert_type} - {title} ({severity})") return alert def create_from_schema( self, db: Session, data: PlatformAlertCreate, ) -> PlatformAlert: """Create alert from Pydantic schema.""" return self.create_alert( db=db, alert_type=data.alert_type, severity=data.severity, title=data.title, description=data.description, affected_stores=data.affected_stores, affected_systems=data.affected_systems, auto_generated=data.auto_generated, ) def get_alerts( self, db: Session, severity: str | None = None, alert_type: str | None = None, is_resolved: bool | None = None, skip: int = 0, limit: int = 50, ) -> tuple[list[PlatformAlert], int, int, int]: """ Get paginated platform alerts. Returns: Tuple of (alerts, total_count, active_count, critical_count) """ PlatformAlert = _get_platform_alert_model() query = db.query(PlatformAlert) # Apply filters if severity: query = query.filter(PlatformAlert.severity == severity) if alert_type: query = query.filter(PlatformAlert.alert_type == alert_type) if is_resolved is not None: query = query.filter(PlatformAlert.is_resolved == is_resolved) # Get counts total = query.count() active_count = ( db.query(PlatformAlert) .filter(PlatformAlert.is_resolved == False) # noqa: E712 .count() ) critical_count = ( db.query(PlatformAlert) .filter( and_( PlatformAlert.is_resolved == False, # noqa: E712 PlatformAlert.severity == Severity.CRITICAL, ) ) .count() ) # Get paginated results severity_order = case( (PlatformAlert.severity == "critical", 1), (PlatformAlert.severity == "error", 2), (PlatformAlert.severity == "warning", 3), (PlatformAlert.severity == "info", 4), else_=5, ) alerts = ( query.order_by( PlatformAlert.is_resolved, # Unresolved first severity_order, PlatformAlert.last_occurred_at.desc(), ) .offset(skip) .limit(limit) .all() ) return alerts, total, active_count, critical_count def resolve_alert( self, db: Session, alert_id: int, user_id: int, resolution_notes: str | None = None, ) -> PlatformAlert | None: """Resolve a platform alert.""" PlatformAlert = _get_platform_alert_model() alert = db.query(PlatformAlert).filter(PlatformAlert.id == alert_id).first() if alert and not alert.is_resolved: alert.is_resolved = True alert.resolved_at = datetime.utcnow() alert.resolved_by_user_id = user_id alert.resolution_notes = resolution_notes db.flush() logger.info(f"Resolved platform alert {alert_id}") return alert def get_statistics(self, db: Session) -> dict[str, int]: """Get alert statistics.""" PlatformAlert = _get_platform_alert_model() today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) total = db.query(PlatformAlert).count() active = ( db.query(PlatformAlert) .filter(PlatformAlert.is_resolved == False) # noqa: E712 .count() ) critical = ( db.query(PlatformAlert) .filter( and_( PlatformAlert.is_resolved == False, # noqa: E712 PlatformAlert.severity == Severity.CRITICAL, ) ) .count() ) resolved_today = ( db.query(PlatformAlert) .filter( and_( PlatformAlert.is_resolved == True, # noqa: E712 PlatformAlert.resolved_at >= today_start, ) ) .count() ) return { "total_alerts": total, "active_alerts": active, "critical_alerts": critical, "resolved_today": resolved_today, } def increment_occurrence( self, db: Session, alert_id: int, ) -> PlatformAlert | None: """Increment occurrence count for repeated alert.""" PlatformAlert = _get_platform_alert_model() alert = db.query(PlatformAlert).filter(PlatformAlert.id == alert_id).first() if alert: alert.occurrence_count += 1 alert.last_occurred_at = datetime.utcnow() db.flush() return alert def find_similar_active_alert( self, db: Session, alert_type: str, title: str, ) -> PlatformAlert | None: """Find an active alert with same type and title.""" PlatformAlert = _get_platform_alert_model() return ( db.query(PlatformAlert) .filter( and_( PlatformAlert.alert_type == alert_type, PlatformAlert.title == title, PlatformAlert.is_resolved == False, # noqa: E712 ) ) .first() ) def create_or_increment_alert( self, db: Session, alert_type: str, severity: str, title: str, description: str | None = None, affected_stores: list[int] | None = None, affected_systems: list[str] | None = None, ) -> PlatformAlert: """Create alert or increment occurrence if similar exists.""" existing = self.find_similar_active_alert(db, alert_type, title) if existing: self.increment_occurrence(db, existing.id) return existing return self.create_alert( db=db, alert_type=alert_type, severity=severity, title=title, description=description, affected_stores=affected_stores, affected_systems=affected_systems, ) # Singleton instances admin_notification_service = AdminNotificationService() platform_alert_service = PlatformAlertService()