feat: add messaging system database models and core services

- Add Conversation, ConversationParticipant, Message, MessageAttachment models
- Add ConversationType enum (admin_vendor, vendor_customer, admin_customer)
- Add ParticipantType enum (admin, vendor, customer)
- Add Alembic migration for messaging tables
- Add MessagingService for conversation/message operations
- Add MessageAttachmentService for file upload handling
- Add message-related exceptions (ConversationNotFoundException, etc.)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-21 14:08:31 +01:00
parent 807cee57b2
commit 8b7d2fe312
9 changed files with 1806 additions and 0 deletions

View File

@@ -0,0 +1,339 @@
"""add_messaging_tables
Revision ID: e3f4a5b6c7d8
Revises: c9e22eadf533
Create Date: 2025-12-21
This migration adds the messaging system tables:
- conversations: Threaded conversation threads
- conversation_participants: Links users/customers to conversations
- messages: Individual messages within conversations
- message_attachments: File attachments for messages
Supports three communication channels:
- Admin <-> Vendor
- Vendor <-> Customer
- Admin <-> Customer
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy import inspect
# revision identifiers, used by Alembic.
revision: str = "e3f4a5b6c7d8"
down_revision: Union[str, None] = "c9e22eadf533"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def table_exists(table_name: str) -> bool:
"""Check if a table exists in the database."""
bind = op.get_bind()
inspector = inspect(bind)
return table_name in inspector.get_table_names()
def index_exists(index_name: str, table_name: str) -> bool:
"""Check if an index exists on a table."""
bind = op.get_bind()
inspector = inspect(bind)
try:
indexes = inspector.get_indexes(table_name)
return any(idx["name"] == index_name for idx in indexes)
except Exception:
return False
def upgrade() -> None:
# =========================================================================
# Step 1: Create conversations table
# =========================================================================
if not table_exists("conversations"):
op.create_table(
"conversations",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"conversation_type",
sa.Enum(
"admin_vendor",
"vendor_customer",
"admin_customer",
name="conversationtype",
),
nullable=False,
),
sa.Column("subject", sa.String(length=500), nullable=False),
sa.Column("vendor_id", sa.Integer(), nullable=True),
sa.Column("is_closed", sa.Boolean(), nullable=False, server_default="0"),
sa.Column("closed_at", sa.DateTime(), nullable=True),
sa.Column(
"closed_by_type",
sa.Enum("admin", "vendor", "customer", name="participanttype"),
nullable=True,
),
sa.Column("closed_by_id", sa.Integer(), nullable=True),
sa.Column("last_message_at", sa.DateTime(), nullable=True),
sa.Column("message_count", sa.Integer(), nullable=False, server_default="0"),
sa.Column(
"created_at",
sa.DateTime(),
nullable=False,
server_default=sa.text("CURRENT_TIMESTAMP"),
),
sa.Column(
"updated_at",
sa.DateTime(),
nullable=False,
server_default=sa.text("CURRENT_TIMESTAMP"),
),
sa.ForeignKeyConstraint(
["vendor_id"],
["vendors.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_conversations_id"), "conversations", ["id"], unique=False
)
op.create_index(
op.f("ix_conversations_conversation_type"),
"conversations",
["conversation_type"],
unique=False,
)
op.create_index(
op.f("ix_conversations_vendor_id"),
"conversations",
["vendor_id"],
unique=False,
)
op.create_index(
op.f("ix_conversations_last_message_at"),
"conversations",
["last_message_at"],
unique=False,
)
op.create_index(
"ix_conversations_type_vendor",
"conversations",
["conversation_type", "vendor_id"],
unique=False,
)
# =========================================================================
# Step 2: Create conversation_participants table
# =========================================================================
if not table_exists("conversation_participants"):
op.create_table(
"conversation_participants",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("conversation_id", sa.Integer(), nullable=False),
sa.Column(
"participant_type",
sa.Enum("admin", "vendor", "customer", name="participanttype"),
nullable=False,
),
sa.Column("participant_id", sa.Integer(), nullable=False),
sa.Column("vendor_id", sa.Integer(), nullable=True),
sa.Column("unread_count", sa.Integer(), nullable=False, server_default="0"),
sa.Column("last_read_at", sa.DateTime(), nullable=True),
sa.Column(
"email_notifications", sa.Boolean(), nullable=False, server_default="1"
),
sa.Column("muted", sa.Boolean(), nullable=False, server_default="0"),
sa.Column(
"created_at",
sa.DateTime(),
nullable=False,
server_default=sa.text("CURRENT_TIMESTAMP"),
),
sa.Column(
"updated_at",
sa.DateTime(),
nullable=False,
server_default=sa.text("CURRENT_TIMESTAMP"),
),
sa.ForeignKeyConstraint(
["conversation_id"],
["conversations.id"],
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["vendor_id"],
["vendors.id"],
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint(
"conversation_id",
"participant_type",
"participant_id",
name="uq_conversation_participant",
),
)
op.create_index(
op.f("ix_conversation_participants_id"),
"conversation_participants",
["id"],
unique=False,
)
op.create_index(
op.f("ix_conversation_participants_conversation_id"),
"conversation_participants",
["conversation_id"],
unique=False,
)
op.create_index(
op.f("ix_conversation_participants_participant_id"),
"conversation_participants",
["participant_id"],
unique=False,
)
op.create_index(
"ix_participant_lookup",
"conversation_participants",
["participant_type", "participant_id"],
unique=False,
)
# =========================================================================
# Step 3: Create messages table
# =========================================================================
if not table_exists("messages"):
op.create_table(
"messages",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("conversation_id", sa.Integer(), nullable=False),
sa.Column(
"sender_type",
sa.Enum("admin", "vendor", "customer", name="participanttype"),
nullable=False,
),
sa.Column("sender_id", sa.Integer(), nullable=False),
sa.Column("content", sa.Text(), nullable=False),
sa.Column(
"is_system_message", sa.Boolean(), nullable=False, server_default="0"
),
sa.Column("is_deleted", sa.Boolean(), nullable=False, server_default="0"),
sa.Column("deleted_at", sa.DateTime(), nullable=True),
sa.Column(
"deleted_by_type",
sa.Enum("admin", "vendor", "customer", name="participanttype"),
nullable=True,
),
sa.Column("deleted_by_id", sa.Integer(), nullable=True),
sa.Column(
"created_at",
sa.DateTime(),
nullable=False,
server_default=sa.text("CURRENT_TIMESTAMP"),
),
sa.Column(
"updated_at",
sa.DateTime(),
nullable=False,
server_default=sa.text("CURRENT_TIMESTAMP"),
),
sa.ForeignKeyConstraint(
["conversation_id"],
["conversations.id"],
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f("ix_messages_id"), "messages", ["id"], unique=False)
op.create_index(
op.f("ix_messages_conversation_id"),
"messages",
["conversation_id"],
unique=False,
)
op.create_index(
op.f("ix_messages_sender_id"), "messages", ["sender_id"], unique=False
)
op.create_index(
"ix_messages_conversation_created",
"messages",
["conversation_id", "created_at"],
unique=False,
)
# =========================================================================
# Step 4: Create message_attachments table
# =========================================================================
if not table_exists("message_attachments"):
op.create_table(
"message_attachments",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("message_id", sa.Integer(), nullable=False),
sa.Column("filename", sa.String(length=255), nullable=False),
sa.Column("original_filename", sa.String(length=255), nullable=False),
sa.Column("file_path", sa.String(length=1000), nullable=False),
sa.Column("file_size", sa.Integer(), nullable=False),
sa.Column("mime_type", sa.String(length=100), nullable=False),
sa.Column("is_image", sa.Boolean(), nullable=False, server_default="0"),
sa.Column("image_width", sa.Integer(), nullable=True),
sa.Column("image_height", sa.Integer(), nullable=True),
sa.Column("thumbnail_path", sa.String(length=1000), nullable=True),
sa.Column(
"created_at",
sa.DateTime(),
nullable=False,
server_default=sa.text("CURRENT_TIMESTAMP"),
),
sa.Column(
"updated_at",
sa.DateTime(),
nullable=False,
server_default=sa.text("CURRENT_TIMESTAMP"),
),
sa.ForeignKeyConstraint(
["message_id"],
["messages.id"],
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_message_attachments_id"),
"message_attachments",
["id"],
unique=False,
)
op.create_index(
op.f("ix_message_attachments_message_id"),
"message_attachments",
["message_id"],
unique=False,
)
# =========================================================================
# Step 5: Add platform setting for attachment size limit
# =========================================================================
# Note: This will be added via seed script or manually
# Key: message_attachment_max_size_mb
# Value: 10
# Category: messaging
def downgrade() -> None:
# Drop tables in reverse order (respecting foreign keys)
if table_exists("message_attachments"):
op.drop_table("message_attachments")
if table_exists("messages"):
op.drop_table("messages")
if table_exists("conversation_participants"):
op.drop_table("conversation_participants")
if table_exists("conversations"):
op.drop_table("conversations")
# Note: Enum types are not dropped automatically
# They can be manually dropped with:
# op.execute("DROP TYPE IF EXISTS conversationtype")
# op.execute("DROP TYPE IF EXISTS participanttype")

View File

@@ -101,6 +101,15 @@ from .inventory import (
NegativeInventoryException,
)
# Message exceptions
from .message import (
ConversationClosedException,
ConversationNotFoundException,
MessageAttachmentException,
MessageNotFoundException,
UnauthorizedConversationAccessException,
)
# Marketplace import job exceptions
from .marketplace_import_job import (
ImportJobAlreadyProcessingException,
@@ -374,4 +383,10 @@ __all__ = [
"ScanParseException",
"ViolationOperationException",
"InvalidViolationStatusException",
# Message exceptions
"ConversationNotFoundException",
"MessageNotFoundException",
"ConversationClosedException",
"MessageAttachmentException",
"UnauthorizedConversationAccessException",
]

63
app/exceptions/message.py Normal file
View File

@@ -0,0 +1,63 @@
# app/exceptions/message.py
"""
Messaging specific exceptions.
"""
from .base import BusinessLogicException, ResourceNotFoundException, ValidationException
class ConversationNotFoundException(ResourceNotFoundException):
"""Raised when a conversation is not found."""
def __init__(self, conversation_identifier: str):
super().__init__(
resource_type="Conversation",
identifier=conversation_identifier,
message=f"Conversation '{conversation_identifier}' not found",
error_code="CONVERSATION_NOT_FOUND",
)
class MessageNotFoundException(ResourceNotFoundException):
"""Raised when a message is not found."""
def __init__(self, message_identifier: str):
super().__init__(
resource_type="Message",
identifier=message_identifier,
message=f"Message '{message_identifier}' not found",
error_code="MESSAGE_NOT_FOUND",
)
class ConversationClosedException(BusinessLogicException):
"""Raised when trying to send message to a closed conversation."""
def __init__(self, conversation_id: int):
super().__init__(
message=f"Cannot send message to closed conversation {conversation_id}",
error_code="CONVERSATION_CLOSED",
details={"conversation_id": conversation_id},
)
class MessageAttachmentException(ValidationException):
"""Raised when attachment validation fails."""
def __init__(self, message: str, details: dict | None = None):
super().__init__(
message=message,
error_code="MESSAGE_ATTACHMENT_INVALID",
details=details,
)
class UnauthorizedConversationAccessException(BusinessLogicException):
"""Raised when user tries to access a conversation they don't have access to."""
def __init__(self, conversation_id: int):
super().__init__(
message=f"You do not have access to conversation {conversation_id}",
error_code="CONVERSATION_ACCESS_DENIED",
details={"conversation_id": conversation_id},
)

View File

@@ -0,0 +1,225 @@
# app/services/message_attachment_service.py
"""
Attachment handling service for messaging system.
Handles file upload, validation, storage, and retrieval.
"""
import logging
import os
import uuid
from datetime import datetime
from pathlib import Path
from fastapi import UploadFile
from sqlalchemy.orm import Session
from app.services.admin_settings_service import admin_settings_service
logger = logging.getLogger(__name__)
# Allowed MIME types for attachments
ALLOWED_MIME_TYPES = {
# Images
"image/jpeg",
"image/png",
"image/gif",
"image/webp",
# Documents
"application/pdf",
"application/msword",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"application/vnd.ms-excel",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
# Archives
"application/zip",
# Text
"text/plain",
"text/csv",
}
IMAGE_MIME_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"}
# Default max file size in MB
DEFAULT_MAX_FILE_SIZE_MB = 10
class MessageAttachmentService:
"""Service for handling message attachments."""
def __init__(self, storage_base: str = "uploads/messages"):
self.storage_base = storage_base
def get_max_file_size_bytes(self, db: Session) -> int:
"""Get maximum file size from platform settings."""
max_mb = admin_settings_service.get_setting_value(
db,
"message_attachment_max_size_mb",
default=DEFAULT_MAX_FILE_SIZE_MB,
)
try:
max_mb = int(max_mb)
except (TypeError, ValueError):
max_mb = DEFAULT_MAX_FILE_SIZE_MB
return max_mb * 1024 * 1024 # Convert to bytes
def validate_file_type(self, mime_type: str) -> bool:
"""Check if file type is allowed."""
return mime_type in ALLOWED_MIME_TYPES
def is_image(self, mime_type: str) -> bool:
"""Check if file is an image."""
return mime_type in IMAGE_MIME_TYPES
async def validate_and_store(
self,
db: Session,
file: UploadFile,
conversation_id: int,
) -> dict:
"""
Validate and store an uploaded file.
Returns dict with file metadata for MessageAttachment creation.
Raises:
ValueError: If file type or size is invalid
"""
# Validate MIME type
content_type = file.content_type or "application/octet-stream"
if not self.validate_file_type(content_type):
raise ValueError(
f"File type '{content_type}' not allowed. "
"Allowed types: images (JPEG, PNG, GIF, WebP), "
"PDF, Office documents, ZIP, text files."
)
# Read file content
content = await file.read()
file_size = len(content)
# Validate file size
max_size = self.get_max_file_size_bytes(db)
if file_size > max_size:
raise ValueError(
f"File size {file_size / 1024 / 1024:.1f}MB exceeds "
f"maximum allowed size of {max_size / 1024 / 1024:.1f}MB"
)
# Generate unique filename
original_filename = file.filename or "attachment"
ext = Path(original_filename).suffix.lower()
unique_filename = f"{uuid.uuid4().hex}{ext}"
# Create storage path: uploads/messages/YYYY/MM/conversation_id/filename
now = datetime.utcnow()
relative_path = os.path.join(
self.storage_base,
str(now.year),
f"{now.month:02d}",
str(conversation_id),
)
# Ensure directory exists
os.makedirs(relative_path, exist_ok=True)
# Full file path
file_path = os.path.join(relative_path, unique_filename)
# Write file
with open(file_path, "wb") as f:
f.write(content)
# Prepare metadata
is_image = self.is_image(content_type)
metadata = {
"filename": unique_filename,
"original_filename": original_filename,
"file_path": file_path,
"file_size": file_size,
"mime_type": content_type,
"is_image": is_image,
}
# Generate thumbnail for images
if is_image:
thumbnail_data = self._create_thumbnail(content, file_path)
metadata.update(thumbnail_data)
logger.info(
f"Stored attachment {unique_filename} for conversation {conversation_id} "
f"({file_size} bytes, type: {content_type})"
)
return metadata
def _create_thumbnail(self, content: bytes, original_path: str) -> dict:
"""Create thumbnail for image attachments."""
try:
from PIL import Image
import io
img = Image.open(io.BytesIO(content))
width, height = img.size
# Create thumbnail
img.thumbnail((200, 200))
thumb_path = original_path.replace(".", "_thumb.")
img.save(thumb_path)
return {
"image_width": width,
"image_height": height,
"thumbnail_path": thumb_path,
}
except ImportError:
logger.warning("PIL not installed, skipping thumbnail generation")
return {}
except Exception as e:
logger.error(f"Failed to create thumbnail: {e}")
return {}
def delete_attachment(
self, file_path: str, thumbnail_path: str | None = None
) -> bool:
"""Delete attachment files from storage."""
try:
if os.path.exists(file_path):
os.remove(file_path)
logger.info(f"Deleted attachment file: {file_path}")
if thumbnail_path and os.path.exists(thumbnail_path):
os.remove(thumbnail_path)
logger.info(f"Deleted thumbnail: {thumbnail_path}")
return True
except Exception as e:
logger.error(f"Failed to delete attachment {file_path}: {e}")
return False
def get_download_url(self, file_path: str) -> str:
"""
Get download URL for an attachment.
For local storage, returns a relative path that can be served
by the static file handler or a dedicated download endpoint.
"""
# Convert local path to URL path
# Assumes files are served from /static/uploads or similar
return f"/static/{file_path}"
def get_file_content(self, file_path: str) -> bytes | None:
"""Read file content from storage."""
try:
if os.path.exists(file_path):
with open(file_path, "rb") as f:
return f.read()
return None
except Exception as e:
logger.error(f"Failed to read file {file_path}: {e}")
return None
# Singleton instance
message_attachment_service = MessageAttachmentService()

View File

@@ -0,0 +1,572 @@
# app/services/messaging_service.py
"""
Messaging service for conversation and message management.
Provides functionality for:
- Creating conversations between different participant types
- Sending messages with attachments
- Managing read status and unread counts
- Conversation listing with filters
- Multi-tenant data isolation
"""
import logging
from datetime import UTC, datetime
from typing import Any
from sqlalchemy import and_, func, or_
from sqlalchemy.orm import Session, joinedload
from models.database.customer import Customer
from models.database.message import (
Conversation,
ConversationParticipant,
ConversationType,
Message,
MessageAttachment,
ParticipantType,
)
from models.database.user import User
logger = logging.getLogger(__name__)
class MessagingService:
"""Service for managing conversations and messages."""
# =========================================================================
# CONVERSATION MANAGEMENT
# =========================================================================
def create_conversation(
self,
db: Session,
conversation_type: ConversationType,
subject: str,
initiator_type: ParticipantType,
initiator_id: int,
recipient_type: ParticipantType,
recipient_id: int,
vendor_id: int | None = None,
initial_message: str | None = None,
) -> Conversation:
"""
Create a new conversation between two participants.
Args:
db: Database session
conversation_type: Type of conversation channel
subject: Conversation subject line
initiator_type: Type of initiating participant
initiator_id: ID of initiating participant
recipient_type: Type of receiving participant
recipient_id: ID of receiving participant
vendor_id: Required for vendor_customer/admin_customer types
initial_message: Optional first message content
Returns:
Created Conversation object
"""
# Validate vendor_id requirement
if conversation_type in [
ConversationType.VENDOR_CUSTOMER,
ConversationType.ADMIN_CUSTOMER,
]:
if not vendor_id:
raise ValueError(
f"vendor_id required for {conversation_type.value} conversations"
)
# Create conversation
conversation = Conversation(
conversation_type=conversation_type,
subject=subject,
vendor_id=vendor_id,
)
db.add(conversation)
db.flush()
# Add participants
initiator_vendor_id = (
vendor_id if initiator_type == ParticipantType.VENDOR else None
)
recipient_vendor_id = (
vendor_id if recipient_type == ParticipantType.VENDOR else None
)
initiator = ConversationParticipant(
conversation_id=conversation.id,
participant_type=initiator_type,
participant_id=initiator_id,
vendor_id=initiator_vendor_id,
unread_count=0, # Initiator has read their own message
)
recipient = ConversationParticipant(
conversation_id=conversation.id,
participant_type=recipient_type,
participant_id=recipient_id,
vendor_id=recipient_vendor_id,
unread_count=1 if initial_message else 0,
)
db.add(initiator)
db.add(recipient)
db.flush()
# Add initial message if provided
if initial_message:
self.send_message(
db=db,
conversation_id=conversation.id,
sender_type=initiator_type,
sender_id=initiator_id,
content=initial_message,
_skip_unread_update=True, # Already set above
)
logger.info(
f"Created {conversation_type.value} conversation {conversation.id}: "
f"{initiator_type.value}:{initiator_id} -> {recipient_type.value}:{recipient_id}"
)
return conversation
def get_conversation(
self,
db: Session,
conversation_id: int,
participant_type: ParticipantType,
participant_id: int,
) -> Conversation | None:
"""
Get conversation if participant has access.
Validates that the requester is a participant.
"""
conversation = (
db.query(Conversation)
.options(
joinedload(Conversation.participants),
joinedload(Conversation.messages).joinedload(Message.attachments),
)
.filter(Conversation.id == conversation_id)
.first()
)
if not conversation:
return None
# Verify participant access
has_access = any(
p.participant_type == participant_type
and p.participant_id == participant_id
for p in conversation.participants
)
if not has_access:
logger.warning(
f"Access denied to conversation {conversation_id} for "
f"{participant_type.value}:{participant_id}"
)
return None
return conversation
def list_conversations(
self,
db: Session,
participant_type: ParticipantType,
participant_id: int,
vendor_id: int | None = None,
conversation_type: ConversationType | None = None,
is_closed: bool | None = None,
skip: int = 0,
limit: int = 20,
) -> tuple[list[Conversation], int, int]:
"""
List conversations for a participant with filters.
Returns:
Tuple of (conversations, total_count, total_unread)
"""
# Base query: conversations where user is a participant
query = (
db.query(Conversation)
.join(ConversationParticipant)
.filter(
and_(
ConversationParticipant.participant_type == participant_type,
ConversationParticipant.participant_id == participant_id,
)
)
)
# Multi-tenant filter for vendor users
if participant_type == ParticipantType.VENDOR and vendor_id:
query = query.filter(ConversationParticipant.vendor_id == vendor_id)
# Customer vendor isolation
if participant_type == ParticipantType.CUSTOMER and vendor_id:
query = query.filter(Conversation.vendor_id == vendor_id)
# Type filter
if conversation_type:
query = query.filter(Conversation.conversation_type == conversation_type)
# Status filter
if is_closed is not None:
query = query.filter(Conversation.is_closed == is_closed)
# Get total count
total = query.count()
# Get total unread across all conversations
unread_query = db.query(
func.sum(ConversationParticipant.unread_count)
).filter(
and_(
ConversationParticipant.participant_type == participant_type,
ConversationParticipant.participant_id == participant_id,
)
)
if participant_type == ParticipantType.VENDOR and vendor_id:
unread_query = unread_query.filter(
ConversationParticipant.vendor_id == vendor_id
)
total_unread = unread_query.scalar() or 0
# Get paginated results, ordered by last activity
conversations = (
query.options(joinedload(Conversation.participants))
.order_by(Conversation.last_message_at.desc().nullslast())
.offset(skip)
.limit(limit)
.all()
)
return conversations, total, total_unread
def close_conversation(
self,
db: Session,
conversation_id: int,
closer_type: ParticipantType,
closer_id: int,
) -> Conversation | None:
"""Close a conversation thread."""
conversation = self.get_conversation(
db, conversation_id, closer_type, closer_id
)
if not conversation:
return None
conversation.is_closed = True
conversation.closed_at = datetime.now(UTC)
conversation.closed_by_type = closer_type
conversation.closed_by_id = closer_id
# Add system message
self.send_message(
db=db,
conversation_id=conversation_id,
sender_type=closer_type,
sender_id=closer_id,
content="Conversation closed",
is_system_message=True,
)
db.flush()
return conversation
def reopen_conversation(
self,
db: Session,
conversation_id: int,
opener_type: ParticipantType,
opener_id: int,
) -> Conversation | None:
"""Reopen a closed conversation."""
conversation = self.get_conversation(
db, conversation_id, opener_type, opener_id
)
if not conversation:
return None
conversation.is_closed = False
conversation.closed_at = None
conversation.closed_by_type = None
conversation.closed_by_id = None
# Add system message
self.send_message(
db=db,
conversation_id=conversation_id,
sender_type=opener_type,
sender_id=opener_id,
content="Conversation reopened",
is_system_message=True,
)
db.flush()
return conversation
# =========================================================================
# MESSAGE MANAGEMENT
# =========================================================================
def send_message(
self,
db: Session,
conversation_id: int,
sender_type: ParticipantType,
sender_id: int,
content: str,
attachments: list[dict[str, Any]] | None = None,
is_system_message: bool = False,
_skip_unread_update: bool = False,
) -> Message:
"""
Send a message in a conversation.
Args:
db: Database session
conversation_id: Target conversation ID
sender_type: Type of sender
sender_id: ID of sender
content: Message text content
attachments: List of attachment dicts with file metadata
is_system_message: Whether this is a system-generated message
_skip_unread_update: Internal flag to skip unread increment
Returns:
Created Message object
"""
# Create message
message = Message(
conversation_id=conversation_id,
sender_type=sender_type,
sender_id=sender_id,
content=content,
is_system_message=is_system_message,
)
db.add(message)
db.flush()
# Add attachments if any
if attachments:
for att_data in attachments:
attachment = MessageAttachment(
message_id=message.id,
filename=att_data["filename"],
original_filename=att_data["original_filename"],
file_path=att_data["file_path"],
file_size=att_data["file_size"],
mime_type=att_data["mime_type"],
is_image=att_data.get("is_image", False),
image_width=att_data.get("image_width"),
image_height=att_data.get("image_height"),
thumbnail_path=att_data.get("thumbnail_path"),
)
db.add(attachment)
# Update conversation metadata
conversation = (
db.query(Conversation).filter(Conversation.id == conversation_id).first()
)
if conversation:
conversation.last_message_at = datetime.now(UTC)
conversation.message_count += 1
# Update unread counts for other participants
if not _skip_unread_update:
db.query(ConversationParticipant).filter(
and_(
ConversationParticipant.conversation_id == conversation_id,
or_(
ConversationParticipant.participant_type != sender_type,
ConversationParticipant.participant_id != sender_id,
),
)
).update(
{
ConversationParticipant.unread_count: ConversationParticipant.unread_count
+ 1
}
)
db.flush()
logger.info(
f"Message {message.id} sent in conversation {conversation_id} "
f"by {sender_type.value}:{sender_id}"
)
return message
def delete_message(
self,
db: Session,
message_id: int,
deleter_type: ParticipantType,
deleter_id: int,
) -> Message | None:
"""Soft delete a message (for moderation)."""
message = db.query(Message).filter(Message.id == message_id).first()
if not message:
return None
# Verify deleter has access to conversation
conversation = self.get_conversation(
db, message.conversation_id, deleter_type, deleter_id
)
if not conversation:
return None
message.is_deleted = True
message.deleted_at = datetime.now(UTC)
message.deleted_by_type = deleter_type
message.deleted_by_id = deleter_id
db.flush()
return message
def mark_conversation_read(
self,
db: Session,
conversation_id: int,
reader_type: ParticipantType,
reader_id: int,
) -> bool:
"""Mark all messages in conversation as read for participant."""
result = (
db.query(ConversationParticipant)
.filter(
and_(
ConversationParticipant.conversation_id == conversation_id,
ConversationParticipant.participant_type == reader_type,
ConversationParticipant.participant_id == reader_id,
)
)
.update(
{
ConversationParticipant.unread_count: 0,
ConversationParticipant.last_read_at: datetime.now(UTC),
}
)
)
db.flush()
return result > 0
def get_unread_count(
self,
db: Session,
participant_type: ParticipantType,
participant_id: int,
vendor_id: int | None = None,
) -> int:
"""Get total unread message count for a participant."""
query = db.query(func.sum(ConversationParticipant.unread_count)).filter(
and_(
ConversationParticipant.participant_type == participant_type,
ConversationParticipant.participant_id == participant_id,
)
)
if vendor_id:
query = query.filter(ConversationParticipant.vendor_id == vendor_id)
return query.scalar() or 0
# =========================================================================
# PARTICIPANT HELPERS
# =========================================================================
def get_participant_info(
self,
db: Session,
participant_type: ParticipantType,
participant_id: int,
) -> dict[str, Any] | None:
"""Get display info for a participant (name, email, avatar)."""
if participant_type in [ParticipantType.ADMIN, ParticipantType.VENDOR]:
user = db.query(User).filter(User.id == participant_id).first()
if user:
return {
"id": user.id,
"type": participant_type.value,
"name": f"{user.first_name or ''} {user.last_name or ''}".strip()
or user.username,
"email": user.email,
"avatar_url": None, # Could add avatar support later
}
elif participant_type == ParticipantType.CUSTOMER:
customer = db.query(Customer).filter(Customer.id == participant_id).first()
if customer:
return {
"id": customer.id,
"type": participant_type.value,
"name": f"{customer.first_name or ''} {customer.last_name or ''}".strip()
or customer.email,
"email": customer.email,
"avatar_url": None,
}
return None
def get_other_participant(
self,
conversation: Conversation,
my_type: ParticipantType,
my_id: int,
) -> ConversationParticipant | None:
"""Get the other participant in a conversation."""
for p in conversation.participants:
if p.participant_type != my_type or p.participant_id != my_id:
return p
return None
# =========================================================================
# NOTIFICATION PREFERENCES
# =========================================================================
def update_notification_preferences(
self,
db: Session,
conversation_id: int,
participant_type: ParticipantType,
participant_id: int,
email_notifications: bool | None = None,
muted: bool | None = None,
) -> bool:
"""Update notification preferences for a participant in a conversation."""
updates = {}
if email_notifications is not None:
updates[ConversationParticipant.email_notifications] = email_notifications
if muted is not None:
updates[ConversationParticipant.muted] = muted
if not updates:
return False
result = (
db.query(ConversationParticipant)
.filter(
and_(
ConversationParticipant.conversation_id == conversation_id,
ConversationParticipant.participant_type == participant_type,
ConversationParticipant.participant_id == participant_id,
)
)
.update(updates)
)
db.flush()
return result > 0
# Singleton instance
messaging_service = MessagingService()

View File

@@ -26,6 +26,14 @@ from .letzshop import (
VendorLetzshopCredentials,
)
from .marketplace_import_job import MarketplaceImportError, MarketplaceImportJob
from .message import (
Conversation,
ConversationParticipant,
ConversationType,
Message,
MessageAttachment,
ParticipantType,
)
from .marketplace_product import (
DigitalDeliveryMethod,
MarketplaceProduct,
@@ -96,4 +104,11 @@ __all__ = [
"LetzshopFulfillmentQueue",
"LetzshopSyncLog",
"LetzshopHistoricalImportJob",
# Messaging
"Conversation",
"ConversationParticipant",
"ConversationType",
"Message",
"MessageAttachment",
"ParticipantType",
]

267
models/database/message.py Normal file
View File

@@ -0,0 +1,267 @@
# models/database/message.py
"""
Messaging system database models.
Supports three communication channels:
- Admin <-> Vendor
- Vendor <-> Customer
- Admin <-> Customer
Multi-tenant isolation is enforced via vendor_id for conversations
involving customers.
"""
import enum
from datetime import datetime
from sqlalchemy import (
Boolean,
Column,
DateTime,
Enum,
ForeignKey,
Index,
Integer,
String,
Text,
UniqueConstraint,
)
from sqlalchemy.orm import relationship
from app.core.database import Base
from models.database.base import TimestampMixin
class ConversationType(str, enum.Enum):
"""Defines the three supported conversation channels."""
ADMIN_VENDOR = "admin_vendor"
VENDOR_CUSTOMER = "vendor_customer"
ADMIN_CUSTOMER = "admin_customer"
class ParticipantType(str, enum.Enum):
"""Type of participant in a conversation."""
ADMIN = "admin" # User with role="admin"
VENDOR = "vendor" # User with role="vendor" (via VendorUser)
CUSTOMER = "customer" # Customer model
class Conversation(Base, TimestampMixin):
"""
Represents a threaded conversation between participants.
Multi-tenancy: vendor_id is required for vendor_customer and admin_customer
conversations to ensure customer data isolation.
"""
__tablename__ = "conversations"
id = Column(Integer, primary_key=True, index=True)
# Conversation type determines participant structure
conversation_type = Column(
Enum(ConversationType),
nullable=False,
index=True,
)
# Subject line for the conversation thread
subject = Column(String(500), nullable=False)
# For vendor_customer and admin_customer conversations
# Required for multi-tenant data isolation
vendor_id = Column(
Integer,
ForeignKey("vendors.id"),
nullable=True,
index=True,
)
# Status flags
is_closed = Column(Boolean, default=False, nullable=False)
closed_at = Column(DateTime, nullable=True)
closed_by_type = Column(Enum(ParticipantType), nullable=True)
closed_by_id = Column(Integer, nullable=True)
# Last activity tracking for sorting
last_message_at = Column(DateTime, nullable=True, index=True)
message_count = Column(Integer, default=0, nullable=False)
# Relationships
vendor = relationship("Vendor", foreign_keys=[vendor_id])
participants = relationship(
"ConversationParticipant",
back_populates="conversation",
cascade="all, delete-orphan",
)
messages = relationship(
"Message",
back_populates="conversation",
cascade="all, delete-orphan",
order_by="Message.created_at",
)
# Indexes for common queries
__table_args__ = (
Index("ix_conversations_type_vendor", "conversation_type", "vendor_id"),
)
def __repr__(self) -> str:
return (
f"<Conversation(id={self.id}, type='{self.conversation_type.value}', "
f"subject='{self.subject[:30]}...')>"
)
class ConversationParticipant(Base, TimestampMixin):
"""
Links participants (users or customers) to conversations.
Polymorphic relationship:
- participant_type="admin" or "vendor": references users.id
- participant_type="customer": references customers.id
"""
__tablename__ = "conversation_participants"
id = Column(Integer, primary_key=True, index=True)
conversation_id = Column(
Integer,
ForeignKey("conversations.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
# Polymorphic participant reference
participant_type = Column(Enum(ParticipantType), nullable=False)
participant_id = Column(Integer, nullable=False, index=True)
# For vendor participants, track which vendor they represent
vendor_id = Column(
Integer,
ForeignKey("vendors.id"),
nullable=True,
)
# Unread tracking per participant
unread_count = Column(Integer, default=0, nullable=False)
last_read_at = Column(DateTime, nullable=True)
# Notification preferences for this conversation
email_notifications = Column(Boolean, default=True, nullable=False)
muted = Column(Boolean, default=False, nullable=False)
# Relationships
conversation = relationship("Conversation", back_populates="participants")
vendor = relationship("Vendor", foreign_keys=[vendor_id])
__table_args__ = (
UniqueConstraint(
"conversation_id",
"participant_type",
"participant_id",
name="uq_conversation_participant",
),
Index(
"ix_participant_lookup",
"participant_type",
"participant_id",
),
)
def __repr__(self) -> str:
return (
f"<ConversationParticipant(conversation_id={self.conversation_id}, "
f"type='{self.participant_type.value}', id={self.participant_id})>"
)
class Message(Base, TimestampMixin):
"""
Individual message within a conversation thread.
Sender polymorphism follows same pattern as participant.
"""
__tablename__ = "messages"
id = Column(Integer, primary_key=True, index=True)
conversation_id = Column(
Integer,
ForeignKey("conversations.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
# Polymorphic sender reference
sender_type = Column(Enum(ParticipantType), nullable=False)
sender_id = Column(Integer, nullable=False, index=True)
# Message content
content = Column(Text, nullable=False)
# System messages (e.g., "conversation closed")
is_system_message = Column(Boolean, default=False, nullable=False)
# Soft delete for moderation
is_deleted = Column(Boolean, default=False, nullable=False)
deleted_at = Column(DateTime, nullable=True)
deleted_by_type = Column(Enum(ParticipantType), nullable=True)
deleted_by_id = Column(Integer, nullable=True)
# Relationships
conversation = relationship("Conversation", back_populates="messages")
attachments = relationship(
"MessageAttachment",
back_populates="message",
cascade="all, delete-orphan",
)
__table_args__ = (
Index("ix_messages_conversation_created", "conversation_id", "created_at"),
)
def __repr__(self) -> str:
return (
f"<Message(id={self.id}, conversation_id={self.conversation_id}, "
f"sender={self.sender_type.value}:{self.sender_id})>"
)
class MessageAttachment(Base, TimestampMixin):
"""
File attachments for messages.
Files are stored in platform storage (local/S3) with references here.
"""
__tablename__ = "message_attachments"
id = Column(Integer, primary_key=True, index=True)
message_id = Column(
Integer,
ForeignKey("messages.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
# File metadata
filename = Column(String(255), nullable=False)
original_filename = Column(String(255), nullable=False)
file_path = Column(String(1000), nullable=False) # Storage path
file_size = Column(Integer, nullable=False) # Size in bytes
mime_type = Column(String(100), nullable=False)
# For image attachments
is_image = Column(Boolean, default=False, nullable=False)
image_width = Column(Integer, nullable=True)
image_height = Column(Integer, nullable=True)
thumbnail_path = Column(String(1000), nullable=True)
# Relationships
message = relationship("Message", back_populates="attachments")
def __repr__(self) -> str:
return f"<MessageAttachment(id={self.id}, filename='{self.original_filename}')>"

View File

@@ -8,6 +8,7 @@ from . import (
inventory,
marketplace_import_job,
marketplace_product,
message,
stats,
vendor,
)
@@ -19,6 +20,7 @@ __all__ = [
"base",
"auth",
"marketplace_product",
"message",
"inventory",
"vendor",
"marketplace_import_job",

308
models/schema/message.py Normal file
View File

@@ -0,0 +1,308 @@
# models/schema/message.py
"""
Pydantic schemas for the messaging system.
Supports three communication channels:
- Admin <-> Vendor
- Vendor <-> Customer
- Admin <-> Customer
"""
from datetime import datetime
from pydantic import BaseModel, ConfigDict, Field
from models.database.message import ConversationType, ParticipantType
# ============================================================================
# Attachment Schemas
# ============================================================================
class AttachmentResponse(BaseModel):
"""Schema for message attachment in responses."""
model_config = ConfigDict(from_attributes=True)
id: int
filename: str
original_filename: str
file_size: int
mime_type: str
is_image: bool
image_width: int | None = None
image_height: int | None = None
download_url: str | None = None
thumbnail_url: str | None = None
@property
def file_size_display(self) -> str:
"""Human-readable file size."""
if self.file_size < 1024:
return f"{self.file_size} B"
elif self.file_size < 1024 * 1024:
return f"{self.file_size / 1024:.1f} KB"
else:
return f"{self.file_size / 1024 / 1024:.1f} MB"
# ============================================================================
# Message Schemas
# ============================================================================
class MessageCreate(BaseModel):
"""Schema for sending a new message."""
content: str = Field(..., min_length=1, max_length=10000)
class MessageResponse(BaseModel):
"""Schema for a single message in responses."""
model_config = ConfigDict(from_attributes=True)
id: int
conversation_id: int
sender_type: ParticipantType
sender_id: int
content: str
is_system_message: bool
is_deleted: bool
created_at: datetime
# Enriched sender info (populated by API)
sender_name: str | None = None
sender_email: str | None = None
# Attachments
attachments: list[AttachmentResponse] = []
# ============================================================================
# Participant Schemas
# ============================================================================
class ParticipantInfo(BaseModel):
"""Schema for participant information."""
id: int
type: ParticipantType
name: str
email: str | None = None
avatar_url: str | None = None
class ParticipantResponse(BaseModel):
"""Schema for conversation participant in responses."""
model_config = ConfigDict(from_attributes=True)
id: int
participant_type: ParticipantType
participant_id: int
unread_count: int
last_read_at: datetime | None
email_notifications: bool
muted: bool
# Enriched info (populated by API)
participant_info: ParticipantInfo | None = None
# ============================================================================
# Conversation Schemas
# ============================================================================
class ConversationCreate(BaseModel):
"""Schema for creating a new conversation."""
conversation_type: ConversationType
subject: str = Field(..., min_length=1, max_length=500)
recipient_type: ParticipantType
recipient_id: int
vendor_id: int | None = None
initial_message: str | None = Field(None, min_length=1, max_length=10000)
class ConversationSummary(BaseModel):
"""Schema for conversation in list views."""
model_config = ConfigDict(from_attributes=True)
id: int
conversation_type: ConversationType
subject: str
vendor_id: int | None = None
is_closed: bool
closed_at: datetime | None
last_message_at: datetime | None
message_count: int
created_at: datetime
# Unread count for current user (from participant)
unread_count: int = 0
# Other participant info (enriched by API)
other_participant: ParticipantInfo | None = None
# Last message preview
last_message_preview: str | None = None
class ConversationDetailResponse(BaseModel):
"""Schema for full conversation detail with messages."""
model_config = ConfigDict(from_attributes=True)
id: int
conversation_type: ConversationType
subject: str
vendor_id: int | None = None
is_closed: bool
closed_at: datetime | None
closed_by_type: ParticipantType | None = None
closed_by_id: int | None = None
last_message_at: datetime | None
message_count: int
created_at: datetime
updated_at: datetime
# Participants with enriched info
participants: list[ParticipantResponse] = []
# Messages ordered by created_at
messages: list[MessageResponse] = []
# Current user's unread count
unread_count: int = 0
# Vendor info if applicable
vendor_name: str | None = None
class ConversationListResponse(BaseModel):
"""Schema for paginated conversation list."""
conversations: list[ConversationSummary]
total: int
total_unread: int
skip: int
limit: int
# ============================================================================
# Unread Count Schemas
# ============================================================================
class UnreadCountResponse(BaseModel):
"""Schema for unread message count (for header badge)."""
total_unread: int
# ============================================================================
# Notification Preferences Schemas
# ============================================================================
class NotificationPreferencesUpdate(BaseModel):
"""Schema for updating notification preferences."""
email_notifications: bool | None = None
muted: bool | None = None
# ============================================================================
# Conversation Action Schemas
# ============================================================================
class CloseConversationResponse(BaseModel):
"""Response after closing a conversation."""
success: bool
message: str
conversation_id: int
class ReopenConversationResponse(BaseModel):
"""Response after reopening a conversation."""
success: bool
message: str
conversation_id: int
class MarkReadResponse(BaseModel):
"""Response after marking conversation as read."""
success: bool
conversation_id: int
unread_count: int
# ============================================================================
# Recipient Selection Schemas (for compose modal)
# ============================================================================
class RecipientOption(BaseModel):
"""Schema for a selectable recipient in compose modal."""
id: int
type: ParticipantType
name: str
email: str | None = None
vendor_id: int | None = None # For vendor users
vendor_name: str | None = None
class RecipientListResponse(BaseModel):
"""Schema for list of available recipients."""
recipients: list[RecipientOption]
total: int
# ============================================================================
# Admin-specific Schemas
# ============================================================================
class AdminConversationSummary(ConversationSummary):
"""Extended conversation summary with vendor info for admin views."""
vendor_name: str | None = None
vendor_code: str | None = None
class AdminConversationListResponse(BaseModel):
"""Schema for admin conversation list with vendor info."""
conversations: list[AdminConversationSummary]
total: int
total_unread: int
skip: int
limit: int
class AdminMessageStats(BaseModel):
"""Messaging statistics for admin dashboard."""
total_conversations: int = 0
open_conversations: int = 0
closed_conversations: int = 0
total_messages: int = 0
# By type
admin_vendor_conversations: int = 0
vendor_customer_conversations: int = 0
admin_customer_conversations: int = 0
# Unread
unread_admin: int = 0