- Refactor messaging API endpoints for admin, shop, and vendor
- Add message-specific exceptions (ConversationNotFoundException, etc.)
- Enhance messaging service with additional helper methods
- Add comprehensive test fixtures for messaging
- Add integration tests for admin and vendor messaging APIs
- Add unit tests for messaging and attachment services

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
685 lines
22 KiB
Python
685 lines
22 KiB
Python
# app/services/messaging_service.py
"""
Messaging service for conversation and message management.

Provides functionality for:
- Creating conversations between different participant types
- Sending messages with attachments
- Managing read status and unread counts
- Conversation listing with filters
- Multi-tenant data isolation
"""
|
|
|
|
import logging
|
|
from datetime import UTC, datetime
|
|
from typing import Any
|
|
|
|
from sqlalchemy import and_, func, or_
|
|
from sqlalchemy.orm import Session, joinedload
|
|
|
|
from models.database.customer import Customer
|
|
from models.database.message import (
|
|
Conversation,
|
|
ConversationParticipant,
|
|
ConversationType,
|
|
Message,
|
|
MessageAttachment,
|
|
ParticipantType,
|
|
)
|
|
from models.database.user import User
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MessagingService:
|
|
"""Service for managing conversations and messages."""
|
|
|
|
# =========================================================================
|
|
# CONVERSATION MANAGEMENT
|
|
# =========================================================================
|
|
|
|
def create_conversation(
|
|
self,
|
|
db: Session,
|
|
conversation_type: ConversationType,
|
|
subject: str,
|
|
initiator_type: ParticipantType,
|
|
initiator_id: int,
|
|
recipient_type: ParticipantType,
|
|
recipient_id: int,
|
|
vendor_id: int | None = None,
|
|
initial_message: str | None = None,
|
|
) -> Conversation:
|
|
"""
|
|
Create a new conversation between two participants.
|
|
|
|
Args:
|
|
db: Database session
|
|
conversation_type: Type of conversation channel
|
|
subject: Conversation subject line
|
|
initiator_type: Type of initiating participant
|
|
initiator_id: ID of initiating participant
|
|
recipient_type: Type of receiving participant
|
|
recipient_id: ID of receiving participant
|
|
vendor_id: Required for vendor_customer/admin_customer types
|
|
initial_message: Optional first message content
|
|
|
|
Returns:
|
|
Created Conversation object
|
|
"""
|
|
# Validate vendor_id requirement
|
|
if conversation_type in [
|
|
ConversationType.VENDOR_CUSTOMER,
|
|
ConversationType.ADMIN_CUSTOMER,
|
|
]:
|
|
if not vendor_id:
|
|
raise ValueError(
|
|
f"vendor_id required for {conversation_type.value} conversations"
|
|
)
|
|
|
|
# Create conversation
|
|
conversation = Conversation(
|
|
conversation_type=conversation_type,
|
|
subject=subject,
|
|
vendor_id=vendor_id,
|
|
)
|
|
db.add(conversation)
|
|
db.flush()
|
|
|
|
# Add participants
|
|
initiator_vendor_id = (
|
|
vendor_id if initiator_type == ParticipantType.VENDOR else None
|
|
)
|
|
recipient_vendor_id = (
|
|
vendor_id if recipient_type == ParticipantType.VENDOR else None
|
|
)
|
|
|
|
initiator = ConversationParticipant(
|
|
conversation_id=conversation.id,
|
|
participant_type=initiator_type,
|
|
participant_id=initiator_id,
|
|
vendor_id=initiator_vendor_id,
|
|
unread_count=0, # Initiator has read their own message
|
|
)
|
|
recipient = ConversationParticipant(
|
|
conversation_id=conversation.id,
|
|
participant_type=recipient_type,
|
|
participant_id=recipient_id,
|
|
vendor_id=recipient_vendor_id,
|
|
unread_count=1 if initial_message else 0,
|
|
)
|
|
|
|
db.add(initiator)
|
|
db.add(recipient)
|
|
db.flush()
|
|
|
|
# Add initial message if provided
|
|
if initial_message:
|
|
self.send_message(
|
|
db=db,
|
|
conversation_id=conversation.id,
|
|
sender_type=initiator_type,
|
|
sender_id=initiator_id,
|
|
content=initial_message,
|
|
_skip_unread_update=True, # Already set above
|
|
)
|
|
|
|
logger.info(
|
|
f"Created {conversation_type.value} conversation {conversation.id}: "
|
|
f"{initiator_type.value}:{initiator_id} -> {recipient_type.value}:{recipient_id}"
|
|
)
|
|
|
|
return conversation
|
|
|
|
def get_conversation(
|
|
self,
|
|
db: Session,
|
|
conversation_id: int,
|
|
participant_type: ParticipantType,
|
|
participant_id: int,
|
|
) -> Conversation | None:
|
|
"""
|
|
Get conversation if participant has access.
|
|
|
|
Validates that the requester is a participant.
|
|
"""
|
|
conversation = (
|
|
db.query(Conversation)
|
|
.options(
|
|
joinedload(Conversation.participants),
|
|
joinedload(Conversation.messages).joinedload(Message.attachments),
|
|
)
|
|
.filter(Conversation.id == conversation_id)
|
|
.first()
|
|
)
|
|
|
|
if not conversation:
|
|
return None
|
|
|
|
# Verify participant access
|
|
has_access = any(
|
|
p.participant_type == participant_type
|
|
and p.participant_id == participant_id
|
|
for p in conversation.participants
|
|
)
|
|
|
|
if not has_access:
|
|
logger.warning(
|
|
f"Access denied to conversation {conversation_id} for "
|
|
f"{participant_type.value}:{participant_id}"
|
|
)
|
|
return None
|
|
|
|
return conversation
|
|
|
|
def list_conversations(
|
|
self,
|
|
db: Session,
|
|
participant_type: ParticipantType,
|
|
participant_id: int,
|
|
vendor_id: int | None = None,
|
|
conversation_type: ConversationType | None = None,
|
|
is_closed: bool | None = None,
|
|
skip: int = 0,
|
|
limit: int = 20,
|
|
) -> tuple[list[Conversation], int, int]:
|
|
"""
|
|
List conversations for a participant with filters.
|
|
|
|
Returns:
|
|
Tuple of (conversations, total_count, total_unread)
|
|
"""
|
|
# Base query: conversations where user is a participant
|
|
query = (
|
|
db.query(Conversation)
|
|
.join(ConversationParticipant)
|
|
.filter(
|
|
and_(
|
|
ConversationParticipant.participant_type == participant_type,
|
|
ConversationParticipant.participant_id == participant_id,
|
|
)
|
|
)
|
|
)
|
|
|
|
# Multi-tenant filter for vendor users
|
|
if participant_type == ParticipantType.VENDOR and vendor_id:
|
|
query = query.filter(ConversationParticipant.vendor_id == vendor_id)
|
|
|
|
# Customer vendor isolation
|
|
if participant_type == ParticipantType.CUSTOMER and vendor_id:
|
|
query = query.filter(Conversation.vendor_id == vendor_id)
|
|
|
|
# Type filter
|
|
if conversation_type:
|
|
query = query.filter(Conversation.conversation_type == conversation_type)
|
|
|
|
# Status filter
|
|
if is_closed is not None:
|
|
query = query.filter(Conversation.is_closed == is_closed)
|
|
|
|
# Get total count
|
|
total = query.count()
|
|
|
|
# Get total unread across all conversations
|
|
unread_query = db.query(
|
|
func.sum(ConversationParticipant.unread_count)
|
|
).filter(
|
|
and_(
|
|
ConversationParticipant.participant_type == participant_type,
|
|
ConversationParticipant.participant_id == participant_id,
|
|
)
|
|
)
|
|
|
|
if participant_type == ParticipantType.VENDOR and vendor_id:
|
|
unread_query = unread_query.filter(
|
|
ConversationParticipant.vendor_id == vendor_id
|
|
)
|
|
|
|
total_unread = unread_query.scalar() or 0
|
|
|
|
# Get paginated results, ordered by last activity
|
|
conversations = (
|
|
query.options(joinedload(Conversation.participants))
|
|
.order_by(Conversation.last_message_at.desc().nullslast())
|
|
.offset(skip)
|
|
.limit(limit)
|
|
.all()
|
|
)
|
|
|
|
return conversations, total, total_unread
|
|
|
|
def close_conversation(
|
|
self,
|
|
db: Session,
|
|
conversation_id: int,
|
|
closer_type: ParticipantType,
|
|
closer_id: int,
|
|
) -> Conversation | None:
|
|
"""Close a conversation thread."""
|
|
conversation = self.get_conversation(
|
|
db, conversation_id, closer_type, closer_id
|
|
)
|
|
|
|
if not conversation:
|
|
return None
|
|
|
|
conversation.is_closed = True
|
|
conversation.closed_at = datetime.now(UTC)
|
|
conversation.closed_by_type = closer_type
|
|
conversation.closed_by_id = closer_id
|
|
|
|
# Add system message
|
|
self.send_message(
|
|
db=db,
|
|
conversation_id=conversation_id,
|
|
sender_type=closer_type,
|
|
sender_id=closer_id,
|
|
content="Conversation closed",
|
|
is_system_message=True,
|
|
)
|
|
|
|
db.flush()
|
|
return conversation
|
|
|
|
def reopen_conversation(
|
|
self,
|
|
db: Session,
|
|
conversation_id: int,
|
|
opener_type: ParticipantType,
|
|
opener_id: int,
|
|
) -> Conversation | None:
|
|
"""Reopen a closed conversation."""
|
|
conversation = self.get_conversation(
|
|
db, conversation_id, opener_type, opener_id
|
|
)
|
|
|
|
if not conversation:
|
|
return None
|
|
|
|
conversation.is_closed = False
|
|
conversation.closed_at = None
|
|
conversation.closed_by_type = None
|
|
conversation.closed_by_id = None
|
|
|
|
# Add system message
|
|
self.send_message(
|
|
db=db,
|
|
conversation_id=conversation_id,
|
|
sender_type=opener_type,
|
|
sender_id=opener_id,
|
|
content="Conversation reopened",
|
|
is_system_message=True,
|
|
)
|
|
|
|
db.flush()
|
|
return conversation
|
|
|
|
# =========================================================================
|
|
# MESSAGE MANAGEMENT
|
|
# =========================================================================
|
|
|
|
def send_message(
|
|
self,
|
|
db: Session,
|
|
conversation_id: int,
|
|
sender_type: ParticipantType,
|
|
sender_id: int,
|
|
content: str,
|
|
attachments: list[dict[str, Any]] | None = None,
|
|
is_system_message: bool = False,
|
|
_skip_unread_update: bool = False,
|
|
) -> Message:
|
|
"""
|
|
Send a message in a conversation.
|
|
|
|
Args:
|
|
db: Database session
|
|
conversation_id: Target conversation ID
|
|
sender_type: Type of sender
|
|
sender_id: ID of sender
|
|
content: Message text content
|
|
attachments: List of attachment dicts with file metadata
|
|
is_system_message: Whether this is a system-generated message
|
|
_skip_unread_update: Internal flag to skip unread increment
|
|
|
|
Returns:
|
|
Created Message object
|
|
"""
|
|
# Create message
|
|
message = Message(
|
|
conversation_id=conversation_id,
|
|
sender_type=sender_type,
|
|
sender_id=sender_id,
|
|
content=content,
|
|
is_system_message=is_system_message,
|
|
)
|
|
db.add(message)
|
|
db.flush()
|
|
|
|
# Add attachments if any
|
|
if attachments:
|
|
for att_data in attachments:
|
|
attachment = MessageAttachment(
|
|
message_id=message.id,
|
|
filename=att_data["filename"],
|
|
original_filename=att_data["original_filename"],
|
|
file_path=att_data["file_path"],
|
|
file_size=att_data["file_size"],
|
|
mime_type=att_data["mime_type"],
|
|
is_image=att_data.get("is_image", False),
|
|
image_width=att_data.get("image_width"),
|
|
image_height=att_data.get("image_height"),
|
|
thumbnail_path=att_data.get("thumbnail_path"),
|
|
)
|
|
db.add(attachment)
|
|
|
|
# Update conversation metadata
|
|
conversation = (
|
|
db.query(Conversation).filter(Conversation.id == conversation_id).first()
|
|
)
|
|
|
|
if conversation:
|
|
conversation.last_message_at = datetime.now(UTC)
|
|
conversation.message_count += 1
|
|
|
|
# Update unread counts for other participants
|
|
if not _skip_unread_update:
|
|
db.query(ConversationParticipant).filter(
|
|
and_(
|
|
ConversationParticipant.conversation_id == conversation_id,
|
|
or_(
|
|
ConversationParticipant.participant_type != sender_type,
|
|
ConversationParticipant.participant_id != sender_id,
|
|
),
|
|
)
|
|
).update(
|
|
{
|
|
ConversationParticipant.unread_count: ConversationParticipant.unread_count
|
|
+ 1
|
|
}
|
|
)
|
|
|
|
db.flush()
|
|
|
|
logger.info(
|
|
f"Message {message.id} sent in conversation {conversation_id} "
|
|
f"by {sender_type.value}:{sender_id}"
|
|
)
|
|
|
|
return message
|
|
|
|
def delete_message(
|
|
self,
|
|
db: Session,
|
|
message_id: int,
|
|
deleter_type: ParticipantType,
|
|
deleter_id: int,
|
|
) -> Message | None:
|
|
"""Soft delete a message (for moderation)."""
|
|
message = db.query(Message).filter(Message.id == message_id).first()
|
|
|
|
if not message:
|
|
return None
|
|
|
|
# Verify deleter has access to conversation
|
|
conversation = self.get_conversation(
|
|
db, message.conversation_id, deleter_type, deleter_id
|
|
)
|
|
if not conversation:
|
|
return None
|
|
|
|
message.is_deleted = True
|
|
message.deleted_at = datetime.now(UTC)
|
|
message.deleted_by_type = deleter_type
|
|
message.deleted_by_id = deleter_id
|
|
|
|
db.flush()
|
|
return message
|
|
|
|
def mark_conversation_read(
|
|
self,
|
|
db: Session,
|
|
conversation_id: int,
|
|
reader_type: ParticipantType,
|
|
reader_id: int,
|
|
) -> bool:
|
|
"""Mark all messages in conversation as read for participant."""
|
|
result = (
|
|
db.query(ConversationParticipant)
|
|
.filter(
|
|
and_(
|
|
ConversationParticipant.conversation_id == conversation_id,
|
|
ConversationParticipant.participant_type == reader_type,
|
|
ConversationParticipant.participant_id == reader_id,
|
|
)
|
|
)
|
|
.update(
|
|
{
|
|
ConversationParticipant.unread_count: 0,
|
|
ConversationParticipant.last_read_at: datetime.now(UTC),
|
|
}
|
|
)
|
|
)
|
|
db.flush()
|
|
return result > 0
|
|
|
|
def get_unread_count(
|
|
self,
|
|
db: Session,
|
|
participant_type: ParticipantType,
|
|
participant_id: int,
|
|
vendor_id: int | None = None,
|
|
) -> int:
|
|
"""Get total unread message count for a participant."""
|
|
query = db.query(func.sum(ConversationParticipant.unread_count)).filter(
|
|
and_(
|
|
ConversationParticipant.participant_type == participant_type,
|
|
ConversationParticipant.participant_id == participant_id,
|
|
)
|
|
)
|
|
|
|
if vendor_id:
|
|
query = query.filter(ConversationParticipant.vendor_id == vendor_id)
|
|
|
|
return query.scalar() or 0
|
|
|
|
# =========================================================================
|
|
# PARTICIPANT HELPERS
|
|
# =========================================================================
|
|
|
|
def get_participant_info(
|
|
self,
|
|
db: Session,
|
|
participant_type: ParticipantType,
|
|
participant_id: int,
|
|
) -> dict[str, Any] | None:
|
|
"""Get display info for a participant (name, email, avatar)."""
|
|
if participant_type in [ParticipantType.ADMIN, ParticipantType.VENDOR]:
|
|
user = db.query(User).filter(User.id == participant_id).first()
|
|
if user:
|
|
return {
|
|
"id": user.id,
|
|
"type": participant_type.value,
|
|
"name": f"{user.first_name or ''} {user.last_name or ''}".strip()
|
|
or user.username,
|
|
"email": user.email,
|
|
"avatar_url": None, # Could add avatar support later
|
|
}
|
|
elif participant_type == ParticipantType.CUSTOMER:
|
|
customer = db.query(Customer).filter(Customer.id == participant_id).first()
|
|
if customer:
|
|
return {
|
|
"id": customer.id,
|
|
"type": participant_type.value,
|
|
"name": f"{customer.first_name or ''} {customer.last_name or ''}".strip()
|
|
or customer.email,
|
|
"email": customer.email,
|
|
"avatar_url": None,
|
|
}
|
|
return None
|
|
|
|
def get_other_participant(
|
|
self,
|
|
conversation: Conversation,
|
|
my_type: ParticipantType,
|
|
my_id: int,
|
|
) -> ConversationParticipant | None:
|
|
"""Get the other participant in a conversation."""
|
|
for p in conversation.participants:
|
|
if p.participant_type != my_type or p.participant_id != my_id:
|
|
return p
|
|
return None
|
|
|
|
# =========================================================================
|
|
# NOTIFICATION PREFERENCES
|
|
# =========================================================================
|
|
|
|
def update_notification_preferences(
|
|
self,
|
|
db: Session,
|
|
conversation_id: int,
|
|
participant_type: ParticipantType,
|
|
participant_id: int,
|
|
email_notifications: bool | None = None,
|
|
muted: bool | None = None,
|
|
) -> bool:
|
|
"""Update notification preferences for a participant in a conversation."""
|
|
updates = {}
|
|
if email_notifications is not None:
|
|
updates[ConversationParticipant.email_notifications] = email_notifications
|
|
if muted is not None:
|
|
updates[ConversationParticipant.muted] = muted
|
|
|
|
if not updates:
|
|
return False
|
|
|
|
result = (
|
|
db.query(ConversationParticipant)
|
|
.filter(
|
|
and_(
|
|
ConversationParticipant.conversation_id == conversation_id,
|
|
ConversationParticipant.participant_type == participant_type,
|
|
ConversationParticipant.participant_id == participant_id,
|
|
)
|
|
)
|
|
.update(updates)
|
|
)
|
|
db.flush()
|
|
return result > 0
|
|
|
|
# =========================================================================
|
|
# RECIPIENT QUERIES
|
|
# =========================================================================
|
|
|
|
def get_vendor_recipients(
|
|
self,
|
|
db: Session,
|
|
vendor_id: int | None = None,
|
|
search: str | None = None,
|
|
skip: int = 0,
|
|
limit: int = 50,
|
|
) -> tuple[list[dict], int]:
|
|
"""
|
|
Get list of vendor users as potential recipients.
|
|
|
|
Args:
|
|
db: Database session
|
|
vendor_id: Optional vendor ID filter
|
|
search: Search term for name/email
|
|
skip: Pagination offset
|
|
limit: Max results
|
|
|
|
Returns:
|
|
Tuple of (recipients list, total count)
|
|
"""
|
|
from models.database.vendor import VendorUser
|
|
|
|
query = (
|
|
db.query(User, VendorUser)
|
|
.join(VendorUser, User.id == VendorUser.user_id)
|
|
.filter(User.is_active == True) # noqa: E712
|
|
)
|
|
|
|
if vendor_id:
|
|
query = query.filter(VendorUser.vendor_id == vendor_id)
|
|
|
|
if search:
|
|
search_pattern = f"%{search}%"
|
|
query = query.filter(
|
|
(User.username.ilike(search_pattern))
|
|
| (User.email.ilike(search_pattern))
|
|
| (User.first_name.ilike(search_pattern))
|
|
| (User.last_name.ilike(search_pattern))
|
|
)
|
|
|
|
total = query.count()
|
|
results = query.offset(skip).limit(limit).all()
|
|
|
|
recipients = []
|
|
for user, vendor_user in results:
|
|
name = f"{user.first_name or ''} {user.last_name or ''}".strip() or user.username
|
|
recipients.append({
|
|
"id": user.id,
|
|
"type": ParticipantType.VENDOR,
|
|
"name": name,
|
|
"email": user.email,
|
|
"vendor_id": vendor_user.vendor_id,
|
|
"vendor_name": vendor_user.vendor.name if vendor_user.vendor else None,
|
|
})
|
|
|
|
return recipients, total
|
|
|
|
def get_customer_recipients(
|
|
self,
|
|
db: Session,
|
|
vendor_id: int | None = None,
|
|
search: str | None = None,
|
|
skip: int = 0,
|
|
limit: int = 50,
|
|
) -> tuple[list[dict], int]:
|
|
"""
|
|
Get list of customers as potential recipients.
|
|
|
|
Args:
|
|
db: Database session
|
|
vendor_id: Optional vendor ID filter (required for vendor users)
|
|
search: Search term for name/email
|
|
skip: Pagination offset
|
|
limit: Max results
|
|
|
|
Returns:
|
|
Tuple of (recipients list, total count)
|
|
"""
|
|
query = db.query(Customer).filter(Customer.is_active == True) # noqa: E712
|
|
|
|
if vendor_id:
|
|
query = query.filter(Customer.vendor_id == vendor_id)
|
|
|
|
if search:
|
|
search_pattern = f"%{search}%"
|
|
query = query.filter(
|
|
(Customer.email.ilike(search_pattern))
|
|
| (Customer.first_name.ilike(search_pattern))
|
|
| (Customer.last_name.ilike(search_pattern))
|
|
)
|
|
|
|
total = query.count()
|
|
results = query.offset(skip).limit(limit).all()
|
|
|
|
recipients = []
|
|
for customer in results:
|
|
name = f"{customer.first_name or ''} {customer.last_name or ''}".strip()
|
|
recipients.append({
|
|
"id": customer.id,
|
|
"type": ParticipantType.CUSTOMER,
|
|
"name": name or customer.email,
|
|
"email": customer.email,
|
|
"vendor_id": customer.vendor_id,
|
|
})
|
|
|
|
return recipients, total
|
|
|
|
|
|
# Singleton instance
|
|
# Created once at import time; MessagingService defines no __init__ and keeps no per-instance state.
messaging_service = MessagingService()
|