From 8b7d2fe312b4ca6b007a69490bf46f5f70862a35 Mon Sep 17 00:00:00 2001 From: Samir Boulahtit Date: Sun, 21 Dec 2025 14:08:31 +0100 Subject: [PATCH] feat: add messaging system database models and core services MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add Conversation, ConversationParticipant, Message, MessageAttachment models - Add ConversationType enum (admin_vendor, vendor_customer, admin_customer) - Add ParticipantType enum (admin, vendor, customer) - Add Alembic migration for messaging tables - Add MessagingService for conversation/message operations - Add MessageAttachmentService for file upload handling - Add message-related exceptions (ConversationNotFoundException, etc.) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../e3f4a5b6c7d8_add_messaging_tables.py | 339 +++++++++++ app/exceptions/__init__.py | 15 + app/exceptions/message.py | 63 ++ app/services/message_attachment_service.py | 225 +++++++ app/services/messaging_service.py | 572 ++++++++++++++++++ models/database/__init__.py | 15 + models/database/message.py | 267 ++++++++ models/schema/__init__.py | 2 + models/schema/message.py | 308 ++++++++++ 9 files changed, 1806 insertions(+) create mode 100644 alembic/versions/e3f4a5b6c7d8_add_messaging_tables.py create mode 100644 app/exceptions/message.py create mode 100644 app/services/message_attachment_service.py create mode 100644 app/services/messaging_service.py create mode 100644 models/database/message.py create mode 100644 models/schema/message.py diff --git a/alembic/versions/e3f4a5b6c7d8_add_messaging_tables.py b/alembic/versions/e3f4a5b6c7d8_add_messaging_tables.py new file mode 100644 index 00000000..62b9937d --- /dev/null +++ b/alembic/versions/e3f4a5b6c7d8_add_messaging_tables.py @@ -0,0 +1,339 @@ +"""add_messaging_tables + +Revision ID: e3f4a5b6c7d8 +Revises: c9e22eadf533 +Create Date: 2025-12-21 + +This migration adds the messaging system tables: +- conversations: Threaded conversation threads +- conversation_participants: Links users/customers to conversations +- messages: Individual messages within conversations +- message_attachments: File attachments for messages + +Supports three communication channels: +- Admin <-> Vendor +- Vendor <-> Customer +- Admin <-> Customer +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy import inspect + + +# revision identifiers, used by Alembic. +revision: str = "e3f4a5b6c7d8" +down_revision: Union[str, None] = "c9e22eadf533" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def table_exists(table_name: str) -> bool: + """Check if a table exists in the database.""" + bind = op.get_bind() + inspector = inspect(bind) + return table_name in inspector.get_table_names() + + +def index_exists(index_name: str, table_name: str) -> bool: + """Check if an index exists on a table.""" + bind = op.get_bind() + inspector = inspect(bind) + try: + indexes = inspector.get_indexes(table_name) + return any(idx["name"] == index_name for idx in indexes) + except Exception: + return False + + +def upgrade() -> None: + # ========================================================================= + # Step 1: Create conversations table + # ========================================================================= + if not table_exists("conversations"): + op.create_table( + "conversations", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column( + "conversation_type", + sa.Enum( + "admin_vendor", + "vendor_customer", + "admin_customer", + name="conversationtype", + ), + nullable=False, + ), + sa.Column("subject", sa.String(length=500), nullable=False), + sa.Column("vendor_id", sa.Integer(), nullable=True), + sa.Column("is_closed", sa.Boolean(), nullable=False, server_default="0"), + sa.Column("closed_at", sa.DateTime(), nullable=True), + sa.Column( + "closed_by_type", + sa.Enum("admin", "vendor", "customer", name="participanttype"), + nullable=True, + ), + sa.Column("closed_by_id", sa.Integer(), nullable=True), + sa.Column("last_message_at", sa.DateTime(), nullable=True), + sa.Column("message_count", sa.Integer(), nullable=False, server_default="0"), + sa.Column( + "created_at", + sa.DateTime(), + nullable=False, + server_default=sa.text("CURRENT_TIMESTAMP"), + ), + sa.Column( + "updated_at", + sa.DateTime(), + nullable=False, + server_default=sa.text("CURRENT_TIMESTAMP"), + ), + sa.ForeignKeyConstraint( + ["vendor_id"], + ["vendors.id"], + ), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index( + op.f("ix_conversations_id"), "conversations", ["id"], unique=False + ) + op.create_index( + op.f("ix_conversations_conversation_type"), + "conversations", + ["conversation_type"], + unique=False, + ) + op.create_index( + op.f("ix_conversations_vendor_id"), + "conversations", + ["vendor_id"], + unique=False, + ) + op.create_index( + op.f("ix_conversations_last_message_at"), + "conversations", + ["last_message_at"], + unique=False, + ) + op.create_index( + "ix_conversations_type_vendor", + "conversations", + ["conversation_type", "vendor_id"], + unique=False, + ) + + # ========================================================================= + # Step 2: Create conversation_participants table + # ========================================================================= + if not table_exists("conversation_participants"): + op.create_table( + "conversation_participants", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("conversation_id", sa.Integer(), nullable=False), + sa.Column( + "participant_type", + sa.Enum("admin", "vendor", "customer", name="participanttype"), + nullable=False, + ), + sa.Column("participant_id", sa.Integer(), nullable=False), + sa.Column("vendor_id", sa.Integer(), nullable=True), + sa.Column("unread_count", sa.Integer(), nullable=False, server_default="0"), + sa.Column("last_read_at", sa.DateTime(), nullable=True), + sa.Column( + "email_notifications", sa.Boolean(), nullable=False, server_default="1" + ), + sa.Column("muted", sa.Boolean(), nullable=False, server_default="0"), + sa.Column( + "created_at", + sa.DateTime(), + nullable=False, + server_default=sa.text("CURRENT_TIMESTAMP"), + ), + sa.Column( + "updated_at", + sa.DateTime(), + nullable=False, + server_default=sa.text("CURRENT_TIMESTAMP"), + ), + sa.ForeignKeyConstraint( + ["conversation_id"], + ["conversations.id"], + ondelete="CASCADE", + ), + sa.ForeignKeyConstraint( + ["vendor_id"], + ["vendors.id"], + ), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint( + "conversation_id", + "participant_type", + "participant_id", + name="uq_conversation_participant", + ), + ) + op.create_index( + op.f("ix_conversation_participants_id"), + "conversation_participants", + ["id"], + unique=False, + ) + op.create_index( + op.f("ix_conversation_participants_conversation_id"), + "conversation_participants", + ["conversation_id"], + unique=False, + ) + op.create_index( + op.f("ix_conversation_participants_participant_id"), + "conversation_participants", + ["participant_id"], + unique=False, + ) + op.create_index( + "ix_participant_lookup", + "conversation_participants", + ["participant_type", "participant_id"], + unique=False, + ) + + # ========================================================================= + # Step 3: Create messages table + # ========================================================================= + if not table_exists("messages"): + op.create_table( + "messages", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("conversation_id", sa.Integer(), nullable=False), + sa.Column( + "sender_type", + sa.Enum("admin", "vendor", "customer", name="participanttype"), + nullable=False, + ), + sa.Column("sender_id", sa.Integer(), nullable=False), + sa.Column("content", sa.Text(), nullable=False), + sa.Column( + "is_system_message", sa.Boolean(), nullable=False, server_default="0" + ), + sa.Column("is_deleted", sa.Boolean(), nullable=False, server_default="0"), + sa.Column("deleted_at", sa.DateTime(), nullable=True), + sa.Column( + "deleted_by_type", + sa.Enum("admin", "vendor", "customer", name="participanttype"), + nullable=True, + ), + sa.Column("deleted_by_id", sa.Integer(), nullable=True), + sa.Column( + "created_at", + sa.DateTime(), + nullable=False, + server_default=sa.text("CURRENT_TIMESTAMP"), + ), + sa.Column( + "updated_at", + sa.DateTime(), + nullable=False, + server_default=sa.text("CURRENT_TIMESTAMP"), + ), + sa.ForeignKeyConstraint( + ["conversation_id"], + ["conversations.id"], + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index(op.f("ix_messages_id"), "messages", ["id"], unique=False) + op.create_index( + op.f("ix_messages_conversation_id"), + "messages", + ["conversation_id"], + unique=False, + ) + op.create_index( + op.f("ix_messages_sender_id"), "messages", ["sender_id"], unique=False + ) + op.create_index( + "ix_messages_conversation_created", + "messages", + ["conversation_id", "created_at"], + unique=False, + ) + + # ========================================================================= + # Step 4: Create message_attachments table + # ========================================================================= + if not table_exists("message_attachments"): + op.create_table( + "message_attachments", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("message_id", sa.Integer(), nullable=False), + sa.Column("filename", sa.String(length=255), nullable=False), + sa.Column("original_filename", sa.String(length=255), nullable=False), + sa.Column("file_path", sa.String(length=1000), nullable=False), + sa.Column("file_size", sa.Integer(), nullable=False), + sa.Column("mime_type", sa.String(length=100), nullable=False), + sa.Column("is_image", sa.Boolean(), nullable=False, server_default="0"), + sa.Column("image_width", sa.Integer(), nullable=True), + sa.Column("image_height", sa.Integer(), nullable=True), + sa.Column("thumbnail_path", sa.String(length=1000), nullable=True), + sa.Column( + "created_at", + sa.DateTime(), + nullable=False, + server_default=sa.text("CURRENT_TIMESTAMP"), + ), + sa.Column( + "updated_at", + sa.DateTime(), + nullable=False, + server_default=sa.text("CURRENT_TIMESTAMP"), + ), + sa.ForeignKeyConstraint( + ["message_id"], + ["messages.id"], + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index( + op.f("ix_message_attachments_id"), + "message_attachments", + ["id"], + unique=False, + ) + op.create_index( + op.f("ix_message_attachments_message_id"), + "message_attachments", + ["message_id"], + unique=False, + ) + + # ========================================================================= + # Step 5: Add platform setting for attachment size limit + # ========================================================================= + # Note: This will be added via seed script or manually + # Key: message_attachment_max_size_mb + # Value: 10 + # Category: messaging + + +def downgrade() -> None: + # Drop tables in reverse order (respecting foreign keys) + if table_exists("message_attachments"): + op.drop_table("message_attachments") + + if table_exists("messages"): + op.drop_table("messages") + + if table_exists("conversation_participants"): + op.drop_table("conversation_participants") + + if table_exists("conversations"): + op.drop_table("conversations") + + # Note: Enum types are not dropped automatically + # They can be manually dropped with: + # op.execute("DROP TYPE IF EXISTS conversationtype") + # op.execute("DROP TYPE IF EXISTS participanttype") diff --git a/app/exceptions/__init__.py b/app/exceptions/__init__.py index e5b42d14..7aa6a219 100644 --- a/app/exceptions/__init__.py +++ b/app/exceptions/__init__.py @@ -101,6 +101,15 @@ from .inventory import ( NegativeInventoryException, ) +# Message exceptions +from .message import ( + ConversationClosedException, + ConversationNotFoundException, + MessageAttachmentException, + MessageNotFoundException, + UnauthorizedConversationAccessException, +) + # Marketplace import job exceptions from .marketplace_import_job import ( ImportJobAlreadyProcessingException, @@ -374,4 +383,10 @@ __all__ = [ "ScanParseException", "ViolationOperationException", "InvalidViolationStatusException", + # Message exceptions + "ConversationNotFoundException", + "MessageNotFoundException", + "ConversationClosedException", + "MessageAttachmentException", + "UnauthorizedConversationAccessException", ] diff --git a/app/exceptions/message.py b/app/exceptions/message.py new file mode 100644 index 00000000..1d0a2c32 --- /dev/null +++ b/app/exceptions/message.py @@ -0,0 +1,63 @@ +# app/exceptions/message.py +""" +Messaging specific exceptions. +""" + +from .base import BusinessLogicException, ResourceNotFoundException, ValidationException + + +class ConversationNotFoundException(ResourceNotFoundException): + """Raised when a conversation is not found.""" + + def __init__(self, conversation_identifier: str): + super().__init__( + resource_type="Conversation", + identifier=conversation_identifier, + message=f"Conversation '{conversation_identifier}' not found", + error_code="CONVERSATION_NOT_FOUND", + ) + + +class MessageNotFoundException(ResourceNotFoundException): + """Raised when a message is not found.""" + + def __init__(self, message_identifier: str): + super().__init__( + resource_type="Message", + identifier=message_identifier, + message=f"Message '{message_identifier}' not found", + error_code="MESSAGE_NOT_FOUND", + ) + + +class ConversationClosedException(BusinessLogicException): + """Raised when trying to send message to a closed conversation.""" + + def __init__(self, conversation_id: int): + super().__init__( + message=f"Cannot send message to closed conversation {conversation_id}", + error_code="CONVERSATION_CLOSED", + details={"conversation_id": conversation_id}, + ) + + +class MessageAttachmentException(ValidationException): + """Raised when attachment validation fails.""" + + def __init__(self, message: str, details: dict | None = None): + super().__init__( + message=message, + error_code="MESSAGE_ATTACHMENT_INVALID", + details=details, + ) + + +class UnauthorizedConversationAccessException(BusinessLogicException): + """Raised when user tries to access a conversation they don't have access to.""" + + def __init__(self, conversation_id: int): + super().__init__( + message=f"You do not have access to conversation {conversation_id}", + error_code="CONVERSATION_ACCESS_DENIED", + details={"conversation_id": conversation_id}, + ) diff --git a/app/services/message_attachment_service.py b/app/services/message_attachment_service.py new file mode 100644 index 00000000..7fd4cd5d --- /dev/null +++ b/app/services/message_attachment_service.py @@ -0,0 +1,225 @@ +# app/services/message_attachment_service.py +""" +Attachment handling service for messaging system. + +Handles file upload, validation, storage, and retrieval. +""" + +import logging +import os +import uuid +from datetime import datetime +from pathlib import Path + +from fastapi import UploadFile +from sqlalchemy.orm import Session + +from app.services.admin_settings_service import admin_settings_service + +logger = logging.getLogger(__name__) + +# Allowed MIME types for attachments +ALLOWED_MIME_TYPES = { + # Images + "image/jpeg", + "image/png", + "image/gif", + "image/webp", + # Documents + "application/pdf", + "application/msword", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + "application/vnd.ms-excel", + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + # Archives + "application/zip", + # Text + "text/plain", + "text/csv", +} + +IMAGE_MIME_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"} + +# Default max file size in MB +DEFAULT_MAX_FILE_SIZE_MB = 10 + + +class MessageAttachmentService: + """Service for handling message attachments.""" + + def __init__(self, storage_base: str = "uploads/messages"): + self.storage_base = storage_base + + def get_max_file_size_bytes(self, db: Session) -> int: + """Get maximum file size from platform settings.""" + max_mb = admin_settings_service.get_setting_value( + db, + "message_attachment_max_size_mb", + default=DEFAULT_MAX_FILE_SIZE_MB, + ) + try: + max_mb = int(max_mb) + except (TypeError, ValueError): + max_mb = DEFAULT_MAX_FILE_SIZE_MB + return max_mb * 1024 * 1024 # Convert to bytes + + def validate_file_type(self, mime_type: str) -> bool: + """Check if file type is allowed.""" + return mime_type in ALLOWED_MIME_TYPES + + def is_image(self, mime_type: str) -> bool: + """Check if file is an image.""" + return mime_type in IMAGE_MIME_TYPES + + async def validate_and_store( + self, + db: Session, + file: UploadFile, + conversation_id: int, + ) -> dict: + """ + Validate and store an uploaded file. + + Returns dict with file metadata for MessageAttachment creation. + + Raises: + ValueError: If file type or size is invalid + """ + # Validate MIME type + content_type = file.content_type or "application/octet-stream" + if not self.validate_file_type(content_type): + raise ValueError( + f"File type '{content_type}' not allowed. " + "Allowed types: images (JPEG, PNG, GIF, WebP), " + "PDF, Office documents, ZIP, text files." + ) + + # Read file content + content = await file.read() + file_size = len(content) + + # Validate file size + max_size = self.get_max_file_size_bytes(db) + if file_size > max_size: + raise ValueError( + f"File size {file_size / 1024 / 1024:.1f}MB exceeds " + f"maximum allowed size of {max_size / 1024 / 1024:.1f}MB" + ) + + # Generate unique filename + original_filename = file.filename or "attachment" + ext = Path(original_filename).suffix.lower() + unique_filename = f"{uuid.uuid4().hex}{ext}" + + # Create storage path: uploads/messages/YYYY/MM/conversation_id/filename + now = datetime.utcnow() + relative_path = os.path.join( + self.storage_base, + str(now.year), + f"{now.month:02d}", + str(conversation_id), + ) + + # Ensure directory exists + os.makedirs(relative_path, exist_ok=True) + + # Full file path + file_path = os.path.join(relative_path, unique_filename) + + # Write file + with open(file_path, "wb") as f: + f.write(content) + + # Prepare metadata + is_image = self.is_image(content_type) + metadata = { + "filename": unique_filename, + "original_filename": original_filename, + "file_path": file_path, + "file_size": file_size, + "mime_type": content_type, + "is_image": is_image, + } + + # Generate thumbnail for images + if is_image: + thumbnail_data = self._create_thumbnail(content, file_path) + metadata.update(thumbnail_data) + + logger.info( + f"Stored attachment {unique_filename} for conversation {conversation_id} " + f"({file_size} bytes, type: {content_type})" + ) + + return metadata + + def _create_thumbnail(self, content: bytes, original_path: str) -> dict: + """Create thumbnail for image attachments.""" + try: + from PIL import Image + import io + + img = Image.open(io.BytesIO(content)) + width, height = img.size + + # Create thumbnail + img.thumbnail((200, 200)) + + thumb_path = original_path.replace(".", "_thumb.") + img.save(thumb_path) + + return { + "image_width": width, + "image_height": height, + "thumbnail_path": thumb_path, + } + except ImportError: + logger.warning("PIL not installed, skipping thumbnail generation") + return {} + except Exception as e: + logger.error(f"Failed to create thumbnail: {e}") + return {} + + def delete_attachment( + self, file_path: str, thumbnail_path: str | None = None + ) -> bool: + """Delete attachment files from storage.""" + try: + if os.path.exists(file_path): + os.remove(file_path) + logger.info(f"Deleted attachment file: {file_path}") + + if thumbnail_path and os.path.exists(thumbnail_path): + os.remove(thumbnail_path) + logger.info(f"Deleted thumbnail: {thumbnail_path}") + + return True + except Exception as e: + logger.error(f"Failed to delete attachment {file_path}: {e}") + return False + + def get_download_url(self, file_path: str) -> str: + """ + Get download URL for an attachment. + + For local storage, returns a relative path that can be served + by the static file handler or a dedicated download endpoint. + """ + # Convert local path to URL path + # Assumes files are served from /static/uploads or similar + return f"/static/{file_path}" + + def get_file_content(self, file_path: str) -> bytes | None: + """Read file content from storage.""" + try: + if os.path.exists(file_path): + with open(file_path, "rb") as f: + return f.read() + return None + except Exception as e: + logger.error(f"Failed to read file {file_path}: {e}") + return None + + +# Singleton instance +message_attachment_service = MessageAttachmentService() diff --git a/app/services/messaging_service.py b/app/services/messaging_service.py new file mode 100644 index 00000000..c3afdd03 --- /dev/null +++ b/app/services/messaging_service.py @@ -0,0 +1,572 @@ +# app/services/messaging_service.py +""" +Messaging service for conversation and message management. + +Provides functionality for: +- Creating conversations between different participant types +- Sending messages with attachments +- Managing read status and unread counts +- Conversation listing with filters +- Multi-tenant data isolation +""" + +import logging +from datetime import UTC, datetime +from typing import Any + +from sqlalchemy import and_, func, or_ +from sqlalchemy.orm import Session, joinedload + +from models.database.customer import Customer +from models.database.message import ( + Conversation, + ConversationParticipant, + ConversationType, + Message, + MessageAttachment, + ParticipantType, +) +from models.database.user import User + +logger = logging.getLogger(__name__) + + +class MessagingService: + """Service for managing conversations and messages.""" + + # ========================================================================= + # CONVERSATION MANAGEMENT + # ========================================================================= + + def create_conversation( + self, + db: Session, + conversation_type: ConversationType, + subject: str, + initiator_type: ParticipantType, + initiator_id: int, + recipient_type: ParticipantType, + recipient_id: int, + vendor_id: int | None = None, + initial_message: str | None = None, + ) -> Conversation: + """ + Create a new conversation between two participants. + + Args: + db: Database session + conversation_type: Type of conversation channel + subject: Conversation subject line + initiator_type: Type of initiating participant + initiator_id: ID of initiating participant + recipient_type: Type of receiving participant + recipient_id: ID of receiving participant + vendor_id: Required for vendor_customer/admin_customer types + initial_message: Optional first message content + + Returns: + Created Conversation object + """ + # Validate vendor_id requirement + if conversation_type in [ + ConversationType.VENDOR_CUSTOMER, + ConversationType.ADMIN_CUSTOMER, + ]: + if not vendor_id: + raise ValueError( + f"vendor_id required for {conversation_type.value} conversations" + ) + + # Create conversation + conversation = Conversation( + conversation_type=conversation_type, + subject=subject, + vendor_id=vendor_id, + ) + db.add(conversation) + db.flush() + + # Add participants + initiator_vendor_id = ( + vendor_id if initiator_type == ParticipantType.VENDOR else None + ) + recipient_vendor_id = ( + vendor_id if recipient_type == ParticipantType.VENDOR else None + ) + + initiator = ConversationParticipant( + conversation_id=conversation.id, + participant_type=initiator_type, + participant_id=initiator_id, + vendor_id=initiator_vendor_id, + unread_count=0, # Initiator has read their own message + ) + recipient = ConversationParticipant( + conversation_id=conversation.id, + participant_type=recipient_type, + participant_id=recipient_id, + vendor_id=recipient_vendor_id, + unread_count=1 if initial_message else 0, + ) + + db.add(initiator) + db.add(recipient) + db.flush() + + # Add initial message if provided + if initial_message: + self.send_message( + db=db, + conversation_id=conversation.id, + sender_type=initiator_type, + sender_id=initiator_id, + content=initial_message, + _skip_unread_update=True, # Already set above + ) + + logger.info( + f"Created {conversation_type.value} conversation {conversation.id}: " + f"{initiator_type.value}:{initiator_id} -> {recipient_type.value}:{recipient_id}" + ) + + return conversation + + def get_conversation( + self, + db: Session, + conversation_id: int, + participant_type: ParticipantType, + participant_id: int, + ) -> Conversation | None: + """ + Get conversation if participant has access. + + Validates that the requester is a participant. + """ + conversation = ( + db.query(Conversation) + .options( + joinedload(Conversation.participants), + joinedload(Conversation.messages).joinedload(Message.attachments), + ) + .filter(Conversation.id == conversation_id) + .first() + ) + + if not conversation: + return None + + # Verify participant access + has_access = any( + p.participant_type == participant_type + and p.participant_id == participant_id + for p in conversation.participants + ) + + if not has_access: + logger.warning( + f"Access denied to conversation {conversation_id} for " + f"{participant_type.value}:{participant_id}" + ) + return None + + return conversation + + def list_conversations( + self, + db: Session, + participant_type: ParticipantType, + participant_id: int, + vendor_id: int | None = None, + conversation_type: ConversationType | None = None, + is_closed: bool | None = None, + skip: int = 0, + limit: int = 20, + ) -> tuple[list[Conversation], int, int]: + """ + List conversations for a participant with filters. + + Returns: + Tuple of (conversations, total_count, total_unread) + """ + # Base query: conversations where user is a participant + query = ( + db.query(Conversation) + .join(ConversationParticipant) + .filter( + and_( + ConversationParticipant.participant_type == participant_type, + ConversationParticipant.participant_id == participant_id, + ) + ) + ) + + # Multi-tenant filter for vendor users + if participant_type == ParticipantType.VENDOR and vendor_id: + query = query.filter(ConversationParticipant.vendor_id == vendor_id) + + # Customer vendor isolation + if participant_type == ParticipantType.CUSTOMER and vendor_id: + query = query.filter(Conversation.vendor_id == vendor_id) + + # Type filter + if conversation_type: + query = query.filter(Conversation.conversation_type == conversation_type) + + # Status filter + if is_closed is not None: + query = query.filter(Conversation.is_closed == is_closed) + + # Get total count + total = query.count() + + # Get total unread across all conversations + unread_query = db.query( + func.sum(ConversationParticipant.unread_count) + ).filter( + and_( + ConversationParticipant.participant_type == participant_type, + ConversationParticipant.participant_id == participant_id, + ) + ) + + if participant_type == ParticipantType.VENDOR and vendor_id: + unread_query = unread_query.filter( + ConversationParticipant.vendor_id == vendor_id + ) + + total_unread = unread_query.scalar() or 0 + + # Get paginated results, ordered by last activity + conversations = ( + query.options(joinedload(Conversation.participants)) + .order_by(Conversation.last_message_at.desc().nullslast()) + .offset(skip) + .limit(limit) + .all() + ) + + return conversations, total, total_unread + + def close_conversation( + self, + db: Session, + conversation_id: int, + closer_type: ParticipantType, + closer_id: int, + ) -> Conversation | None: + """Close a conversation thread.""" + conversation = self.get_conversation( + db, conversation_id, closer_type, closer_id + ) + + if not conversation: + return None + + conversation.is_closed = True + conversation.closed_at = datetime.now(UTC) + conversation.closed_by_type = closer_type + conversation.closed_by_id = closer_id + + # Add system message + self.send_message( + db=db, + conversation_id=conversation_id, + sender_type=closer_type, + sender_id=closer_id, + content="Conversation closed", + is_system_message=True, + ) + + db.flush() + return conversation + + def reopen_conversation( + self, + db: Session, + conversation_id: int, + opener_type: ParticipantType, + opener_id: int, + ) -> Conversation | None: + """Reopen a closed conversation.""" + conversation = self.get_conversation( + db, conversation_id, opener_type, opener_id + ) + + if not conversation: + return None + + conversation.is_closed = False + conversation.closed_at = None + conversation.closed_by_type = None + conversation.closed_by_id = None + + # Add system message + self.send_message( + db=db, + conversation_id=conversation_id, + sender_type=opener_type, + sender_id=opener_id, + content="Conversation reopened", + is_system_message=True, + ) + + db.flush() + return conversation + + # ========================================================================= + # MESSAGE MANAGEMENT + # ========================================================================= + + def send_message( + self, + db: Session, + conversation_id: int, + sender_type: ParticipantType, + sender_id: int, + content: str, + attachments: list[dict[str, Any]] | None = None, + is_system_message: bool = False, + _skip_unread_update: bool = False, + ) -> Message: + """ + Send a message in a conversation. + + Args: + db: Database session + conversation_id: Target conversation ID + sender_type: Type of sender + sender_id: ID of sender + content: Message text content + attachments: List of attachment dicts with file metadata + is_system_message: Whether this is a system-generated message + _skip_unread_update: Internal flag to skip unread increment + + Returns: + Created Message object + """ + # Create message + message = Message( + conversation_id=conversation_id, + sender_type=sender_type, + sender_id=sender_id, + content=content, + is_system_message=is_system_message, + ) + db.add(message) + db.flush() + + # Add attachments if any + if attachments: + for att_data in attachments: + attachment = MessageAttachment( + message_id=message.id, + filename=att_data["filename"], + original_filename=att_data["original_filename"], + file_path=att_data["file_path"], + file_size=att_data["file_size"], + mime_type=att_data["mime_type"], + is_image=att_data.get("is_image", False), + image_width=att_data.get("image_width"), + image_height=att_data.get("image_height"), + thumbnail_path=att_data.get("thumbnail_path"), + ) + db.add(attachment) + + # Update conversation metadata + conversation = ( + db.query(Conversation).filter(Conversation.id == conversation_id).first() + ) + + if conversation: + conversation.last_message_at = datetime.now(UTC) + conversation.message_count += 1 + + # Update unread counts for other participants + if not _skip_unread_update: + db.query(ConversationParticipant).filter( + and_( + ConversationParticipant.conversation_id == conversation_id, + or_( + ConversationParticipant.participant_type != sender_type, + ConversationParticipant.participant_id != sender_id, + ), + ) + ).update( + { + ConversationParticipant.unread_count: ConversationParticipant.unread_count + + 1 + } + ) + + db.flush() + + logger.info( + f"Message {message.id} sent in conversation {conversation_id} " + f"by {sender_type.value}:{sender_id}" + ) + + return message + + def delete_message( + self, + db: Session, + message_id: int, + deleter_type: ParticipantType, + deleter_id: int, + ) -> Message | None: + """Soft delete a message (for moderation).""" + message = db.query(Message).filter(Message.id == message_id).first() + + if not message: + return None + + # Verify deleter has access to conversation + conversation = self.get_conversation( + db, message.conversation_id, deleter_type, deleter_id + ) + if not conversation: + return None + + message.is_deleted = True + message.deleted_at = datetime.now(UTC) + message.deleted_by_type = deleter_type + message.deleted_by_id = deleter_id + + db.flush() + return message + + def mark_conversation_read( + self, + db: Session, + conversation_id: int, + reader_type: ParticipantType, + reader_id: int, + ) -> bool: + """Mark all messages in conversation as read for participant.""" + result = ( + db.query(ConversationParticipant) + .filter( + and_( + ConversationParticipant.conversation_id == conversation_id, + ConversationParticipant.participant_type == reader_type, + ConversationParticipant.participant_id == reader_id, + ) + ) + .update( + { + ConversationParticipant.unread_count: 0, + ConversationParticipant.last_read_at: datetime.now(UTC), + } + ) + ) + db.flush() + return result > 0 + + def get_unread_count( + self, + db: Session, + participant_type: ParticipantType, + participant_id: int, + vendor_id: int | None = None, + ) -> int: + """Get total unread message count for a participant.""" + query = db.query(func.sum(ConversationParticipant.unread_count)).filter( + and_( + ConversationParticipant.participant_type == participant_type, + ConversationParticipant.participant_id == participant_id, + ) + ) + + if vendor_id: + query = query.filter(ConversationParticipant.vendor_id == vendor_id) + + return query.scalar() or 0 + + # ========================================================================= + # PARTICIPANT HELPERS + # ========================================================================= + + def get_participant_info( + self, + db: Session, + participant_type: ParticipantType, + participant_id: int, + ) -> dict[str, Any] | None: + """Get display info for a participant (name, email, avatar).""" + if participant_type in [ParticipantType.ADMIN, ParticipantType.VENDOR]: + user = db.query(User).filter(User.id == participant_id).first() + if user: + return { + "id": user.id, + "type": participant_type.value, + "name": f"{user.first_name or ''} {user.last_name or ''}".strip() + or user.username, + "email": user.email, + "avatar_url": None, # Could add avatar support later + } + elif participant_type == ParticipantType.CUSTOMER: + customer = db.query(Customer).filter(Customer.id == participant_id).first() + if customer: + return { + "id": customer.id, + "type": participant_type.value, + "name": f"{customer.first_name or ''} {customer.last_name or ''}".strip() + or customer.email, + "email": customer.email, + "avatar_url": None, + } + return None + + def get_other_participant( + self, + conversation: Conversation, + my_type: ParticipantType, + my_id: int, + ) -> ConversationParticipant | None: + """Get the other participant in a conversation.""" + for p in conversation.participants: + if p.participant_type != my_type or p.participant_id != my_id: + return p + return None + + # ========================================================================= + # NOTIFICATION PREFERENCES + # ========================================================================= + + def update_notification_preferences( + self, + db: Session, + conversation_id: int, + participant_type: ParticipantType, + participant_id: int, + email_notifications: bool | None = None, + muted: bool | None = None, + ) -> bool: + """Update notification preferences for a participant in a conversation.""" + updates = {} + if email_notifications is not None: + updates[ConversationParticipant.email_notifications] = email_notifications + if muted is not None: + updates[ConversationParticipant.muted] = muted + + if not updates: + return False + + result = ( + db.query(ConversationParticipant) + .filter( + and_( + ConversationParticipant.conversation_id == conversation_id, + ConversationParticipant.participant_type == participant_type, + ConversationParticipant.participant_id == participant_id, + ) + ) + .update(updates) + ) + db.flush() + return result > 0 + + +# Singleton instance +messaging_service = MessagingService() diff --git a/models/database/__init__.py b/models/database/__init__.py index 520adc97..66c93616 100644 --- a/models/database/__init__.py +++ b/models/database/__init__.py @@ -26,6 +26,14 @@ from .letzshop import ( VendorLetzshopCredentials, ) from .marketplace_import_job import MarketplaceImportError, MarketplaceImportJob +from .message import ( + Conversation, + ConversationParticipant, + ConversationType, + Message, + MessageAttachment, + ParticipantType, +) from .marketplace_product import ( DigitalDeliveryMethod, MarketplaceProduct, @@ -96,4 +104,11 @@ __all__ = [ "LetzshopFulfillmentQueue", "LetzshopSyncLog", "LetzshopHistoricalImportJob", + # Messaging + "Conversation", + "ConversationParticipant", + "ConversationType", + "Message", + "MessageAttachment", + "ParticipantType", ] diff --git a/models/database/message.py b/models/database/message.py new file mode 100644 index 00000000..348dd906 --- /dev/null +++ b/models/database/message.py @@ -0,0 +1,267 @@ +# models/database/message.py +""" +Messaging system database models. + +Supports three communication channels: +- Admin <-> Vendor +- Vendor <-> Customer +- Admin <-> Customer + +Multi-tenant isolation is enforced via vendor_id for conversations +involving customers. +""" + +import enum +from datetime import datetime + +from sqlalchemy import ( + Boolean, + Column, + DateTime, + Enum, + ForeignKey, + Index, + Integer, + String, + Text, + UniqueConstraint, +) +from sqlalchemy.orm import relationship + +from app.core.database import Base +from models.database.base import TimestampMixin + + +class ConversationType(str, enum.Enum): + """Defines the three supported conversation channels.""" + + ADMIN_VENDOR = "admin_vendor" + VENDOR_CUSTOMER = "vendor_customer" + ADMIN_CUSTOMER = "admin_customer" + + +class ParticipantType(str, enum.Enum): + """Type of participant in a conversation.""" + + ADMIN = "admin" # User with role="admin" + VENDOR = "vendor" # User with role="vendor" (via VendorUser) + CUSTOMER = "customer" # Customer model + + +class Conversation(Base, TimestampMixin): + """ + Represents a threaded conversation between participants. + + Multi-tenancy: vendor_id is required for vendor_customer and admin_customer + conversations to ensure customer data isolation. + """ + + __tablename__ = "conversations" + + id = Column(Integer, primary_key=True, index=True) + + # Conversation type determines participant structure + conversation_type = Column( + Enum(ConversationType), + nullable=False, + index=True, + ) + + # Subject line for the conversation thread + subject = Column(String(500), nullable=False) + + # For vendor_customer and admin_customer conversations + # Required for multi-tenant data isolation + vendor_id = Column( + Integer, + ForeignKey("vendors.id"), + nullable=True, + index=True, + ) + + # Status flags + is_closed = Column(Boolean, default=False, nullable=False) + closed_at = Column(DateTime, nullable=True) + closed_by_type = Column(Enum(ParticipantType), nullable=True) + closed_by_id = Column(Integer, nullable=True) + + # Last activity tracking for sorting + last_message_at = Column(DateTime, nullable=True, index=True) + message_count = Column(Integer, default=0, nullable=False) + + # Relationships + vendor = relationship("Vendor", foreign_keys=[vendor_id]) + participants = relationship( + "ConversationParticipant", + back_populates="conversation", + cascade="all, delete-orphan", + ) + messages = relationship( + "Message", + back_populates="conversation", + cascade="all, delete-orphan", + order_by="Message.created_at", + ) + + # Indexes for common queries + __table_args__ = ( + Index("ix_conversations_type_vendor", "conversation_type", "vendor_id"), + ) + + def __repr__(self) -> str: + return ( + f"" + ) + + +class ConversationParticipant(Base, TimestampMixin): + """ + Links participants (users or customers) to conversations. + + Polymorphic relationship: + - participant_type="admin" or "vendor": references users.id + - participant_type="customer": references customers.id + """ + + __tablename__ = "conversation_participants" + + id = Column(Integer, primary_key=True, index=True) + conversation_id = Column( + Integer, + ForeignKey("conversations.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + + # Polymorphic participant reference + participant_type = Column(Enum(ParticipantType), nullable=False) + participant_id = Column(Integer, nullable=False, index=True) + + # For vendor participants, track which vendor they represent + vendor_id = Column( + Integer, + ForeignKey("vendors.id"), + nullable=True, + ) + + # Unread tracking per participant + unread_count = Column(Integer, default=0, nullable=False) + last_read_at = Column(DateTime, nullable=True) + + # Notification preferences for this conversation + email_notifications = Column(Boolean, default=True, nullable=False) + muted = Column(Boolean, default=False, nullable=False) + + # Relationships + conversation = relationship("Conversation", back_populates="participants") + vendor = relationship("Vendor", foreign_keys=[vendor_id]) + + __table_args__ = ( + UniqueConstraint( + "conversation_id", + "participant_type", + "participant_id", + name="uq_conversation_participant", + ), + Index( + "ix_participant_lookup", + "participant_type", + "participant_id", + ), + ) + + def __repr__(self) -> str: + return ( + f"" + ) + + +class Message(Base, TimestampMixin): + """ + Individual message within a conversation thread. + + Sender polymorphism follows same pattern as participant. + """ + + __tablename__ = "messages" + + id = Column(Integer, primary_key=True, index=True) + conversation_id = Column( + Integer, + ForeignKey("conversations.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + + # Polymorphic sender reference + sender_type = Column(Enum(ParticipantType), nullable=False) + sender_id = Column(Integer, nullable=False, index=True) + + # Message content + content = Column(Text, nullable=False) + + # System messages (e.g., "conversation closed") + is_system_message = Column(Boolean, default=False, nullable=False) + + # Soft delete for moderation + is_deleted = Column(Boolean, default=False, nullable=False) + deleted_at = Column(DateTime, nullable=True) + deleted_by_type = Column(Enum(ParticipantType), nullable=True) + deleted_by_id = Column(Integer, nullable=True) + + # Relationships + conversation = relationship("Conversation", back_populates="messages") + attachments = relationship( + "MessageAttachment", + back_populates="message", + cascade="all, delete-orphan", + ) + + __table_args__ = ( + Index("ix_messages_conversation_created", "conversation_id", "created_at"), + ) + + def __repr__(self) -> str: + return ( + f"" + ) + + +class MessageAttachment(Base, TimestampMixin): + """ + File attachments for messages. + + Files are stored in platform storage (local/S3) with references here. + """ + + __tablename__ = "message_attachments" + + id = Column(Integer, primary_key=True, index=True) + message_id = Column( + Integer, + ForeignKey("messages.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + + # File metadata + filename = Column(String(255), nullable=False) + original_filename = Column(String(255), nullable=False) + file_path = Column(String(1000), nullable=False) # Storage path + file_size = Column(Integer, nullable=False) # Size in bytes + mime_type = Column(String(100), nullable=False) + + # For image attachments + is_image = Column(Boolean, default=False, nullable=False) + image_width = Column(Integer, nullable=True) + image_height = Column(Integer, nullable=True) + thumbnail_path = Column(String(1000), nullable=True) + + # Relationships + message = relationship("Message", back_populates="attachments") + + def __repr__(self) -> str: + return f"" diff --git a/models/schema/__init__.py b/models/schema/__init__.py index 0c1ecc7d..225037ad 100644 --- a/models/schema/__init__.py +++ b/models/schema/__init__.py @@ -8,6 +8,7 @@ from . import ( inventory, marketplace_import_job, marketplace_product, + message, stats, vendor, ) @@ -19,6 +20,7 @@ __all__ = [ "base", "auth", "marketplace_product", + "message", "inventory", "vendor", "marketplace_import_job", diff --git a/models/schema/message.py b/models/schema/message.py new file mode 100644 index 00000000..7f0682e2 --- /dev/null +++ b/models/schema/message.py @@ -0,0 +1,308 @@ +# models/schema/message.py +""" +Pydantic schemas for the messaging system. + +Supports three communication channels: +- Admin <-> Vendor +- Vendor <-> Customer +- Admin <-> Customer +""" + +from datetime import datetime + +from pydantic import BaseModel, ConfigDict, Field + +from models.database.message import ConversationType, ParticipantType + + +# ============================================================================ +# Attachment Schemas +# ============================================================================ + + +class AttachmentResponse(BaseModel): + """Schema for message attachment in responses.""" + + model_config = ConfigDict(from_attributes=True) + + id: int + filename: str + original_filename: str + file_size: int + mime_type: str + is_image: bool + image_width: int | None = None + image_height: int | None = None + download_url: str | None = None + thumbnail_url: str | None = None + + @property + def file_size_display(self) -> str: + """Human-readable file size.""" + if self.file_size < 1024: + return f"{self.file_size} B" + elif self.file_size < 1024 * 1024: + return f"{self.file_size / 1024:.1f} KB" + else: + return f"{self.file_size / 1024 / 1024:.1f} MB" + + +# ============================================================================ +# Message Schemas +# ============================================================================ + + +class MessageCreate(BaseModel): + """Schema for sending a new message.""" + + content: str = Field(..., min_length=1, max_length=10000) + + +class MessageResponse(BaseModel): + """Schema for a single message in responses.""" + + model_config = ConfigDict(from_attributes=True) + + id: int + conversation_id: int + sender_type: ParticipantType + sender_id: int + content: str + is_system_message: bool + is_deleted: bool + created_at: datetime + + # Enriched sender info (populated by API) + sender_name: str | None = None + sender_email: str | None = None + + # Attachments + attachments: list[AttachmentResponse] = [] + + +# ============================================================================ +# Participant Schemas +# ============================================================================ + + +class ParticipantInfo(BaseModel): + """Schema for participant information.""" + + id: int + type: ParticipantType + name: str + email: str | None = None + avatar_url: str | None = None + + +class ParticipantResponse(BaseModel): + """Schema for conversation participant in responses.""" + + model_config = ConfigDict(from_attributes=True) + + id: int + participant_type: ParticipantType + participant_id: int + unread_count: int + last_read_at: datetime | None + email_notifications: bool + muted: bool + + # Enriched info (populated by API) + participant_info: ParticipantInfo | None = None + + +# ============================================================================ +# Conversation Schemas +# ============================================================================ + + +class ConversationCreate(BaseModel): + """Schema for creating a new conversation.""" + + conversation_type: ConversationType + subject: str = Field(..., min_length=1, max_length=500) + recipient_type: ParticipantType + recipient_id: int + vendor_id: int | None = None + initial_message: str | None = Field(None, min_length=1, max_length=10000) + + +class ConversationSummary(BaseModel): + """Schema for conversation in list views.""" + + model_config = ConfigDict(from_attributes=True) + + id: int + conversation_type: ConversationType + subject: str + vendor_id: int | None = None + is_closed: bool + closed_at: datetime | None + last_message_at: datetime | None + message_count: int + created_at: datetime + + # Unread count for current user (from participant) + unread_count: int = 0 + + # Other participant info (enriched by API) + other_participant: ParticipantInfo | None = None + + # Last message preview + last_message_preview: str | None = None + + +class ConversationDetailResponse(BaseModel): + """Schema for full conversation detail with messages.""" + + model_config = ConfigDict(from_attributes=True) + + id: int + conversation_type: ConversationType + subject: str + vendor_id: int | None = None + is_closed: bool + closed_at: datetime | None + closed_by_type: ParticipantType | None = None + closed_by_id: int | None = None + last_message_at: datetime | None + message_count: int + created_at: datetime + updated_at: datetime + + # Participants with enriched info + participants: list[ParticipantResponse] = [] + + # Messages ordered by created_at + messages: list[MessageResponse] = [] + + # Current user's unread count + unread_count: int = 0 + + # Vendor info if applicable + vendor_name: str | None = None + + +class ConversationListResponse(BaseModel): + """Schema for paginated conversation list.""" + + conversations: list[ConversationSummary] + total: int + total_unread: int + skip: int + limit: int + + +# ============================================================================ +# Unread Count Schemas +# ============================================================================ + + +class UnreadCountResponse(BaseModel): + """Schema for unread message count (for header badge).""" + + total_unread: int + + +# ============================================================================ +# Notification Preferences Schemas +# ============================================================================ + + +class NotificationPreferencesUpdate(BaseModel): + """Schema for updating notification preferences.""" + + email_notifications: bool | None = None + muted: bool | None = None + + +# ============================================================================ +# Conversation Action Schemas +# ============================================================================ + + +class CloseConversationResponse(BaseModel): + """Response after closing a conversation.""" + + success: bool + message: str + conversation_id: int + + +class ReopenConversationResponse(BaseModel): + """Response after reopening a conversation.""" + + success: bool + message: str + conversation_id: int + + +class MarkReadResponse(BaseModel): + """Response after marking conversation as read.""" + + success: bool + conversation_id: int + unread_count: int + + +# ============================================================================ +# Recipient Selection Schemas (for compose modal) +# ============================================================================ + + +class RecipientOption(BaseModel): + """Schema for a selectable recipient in compose modal.""" + + id: int + type: ParticipantType + name: str + email: str | None = None + vendor_id: int | None = None # For vendor users + vendor_name: str | None = None + + +class RecipientListResponse(BaseModel): + """Schema for list of available recipients.""" + + recipients: list[RecipientOption] + total: int + + +# ============================================================================ +# Admin-specific Schemas +# ============================================================================ + + +class AdminConversationSummary(ConversationSummary): + """Extended conversation summary with vendor info for admin views.""" + + vendor_name: str | None = None + vendor_code: str | None = None + + +class AdminConversationListResponse(BaseModel): + """Schema for admin conversation list with vendor info.""" + + conversations: list[AdminConversationSummary] + total: int + total_unread: int + skip: int + limit: int + + +class AdminMessageStats(BaseModel): + """Messaging statistics for admin dashboard.""" + + total_conversations: int = 0 + open_conversations: int = 0 + closed_conversations: int = 0 + total_messages: int = 0 + + # By type + admin_vendor_conversations: int = 0 + vendor_customer_conversations: int = 0 + admin_customer_conversations: int = 0 + + # Unread + unread_admin: int = 0