# app/api/v1/admin/messages.py """ Admin messaging endpoints. Provides endpoints for: - Viewing conversations (admin_vendor and admin_customer channels) - Sending and receiving messages - Managing conversation status - File attachments """ import logging from typing import Any from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile from pydantic import BaseModel from sqlalchemy.orm import Session from app.api.deps import get_current_admin_api from app.core.database import get_db from app.services.message_attachment_service import message_attachment_service from app.services.messaging_service import messaging_service from models.database.message import ConversationType, ParticipantType from models.database.user import User from models.schema.message import ( AdminConversationListResponse, AdminConversationSummary, CloseConversationResponse, ConversationCreate, ConversationDetailResponse, MarkReadResponse, MessageCreate, MessageResponse, NotificationPreferencesUpdate, ParticipantInfo, ParticipantResponse, RecipientListResponse, RecipientOption, ReopenConversationResponse, UnreadCountResponse, ) from models.schema.message import AttachmentResponse router = APIRouter(prefix="/messages") logger = logging.getLogger(__name__) # ============================================================================ # HELPER FUNCTIONS # ============================================================================ def _enrich_message( db: Session, message: Any, include_attachments: bool = True ) -> MessageResponse: """Enrich message with sender info and attachments.""" sender_info = messaging_service.get_participant_info( db, message.sender_type, message.sender_id ) attachments = [] if include_attachments and message.attachments: for att in message.attachments: attachments.append( AttachmentResponse( id=att.id, filename=att.filename, original_filename=att.original_filename, file_size=att.file_size, mime_type=att.mime_type, is_image=att.is_image, image_width=att.image_width, image_height=att.image_height, download_url=message_attachment_service.get_download_url( att.file_path ), thumbnail_url=( message_attachment_service.get_download_url(att.thumbnail_path) if att.thumbnail_path else None ), ) ) return MessageResponse( id=message.id, conversation_id=message.conversation_id, sender_type=message.sender_type, sender_id=message.sender_id, content=message.content, is_system_message=message.is_system_message, is_deleted=message.is_deleted, created_at=message.created_at, sender_name=sender_info["name"] if sender_info else None, sender_email=sender_info["email"] if sender_info else None, attachments=attachments, ) def _enrich_conversation_summary( db: Session, conversation: Any, current_user_id: int ) -> AdminConversationSummary: """Enrich conversation with other participant info and unread count.""" # Get current user's participant record my_participant = next( ( p for p in conversation.participants if p.participant_type == ParticipantType.ADMIN and p.participant_id == current_user_id ), None, ) unread_count = my_participant.unread_count if my_participant else 0 # Get other participant info other = messaging_service.get_other_participant( conversation, ParticipantType.ADMIN, current_user_id ) other_info = None if other: info = messaging_service.get_participant_info( db, other.participant_type, other.participant_id ) if info: other_info = ParticipantInfo( id=info["id"], type=info["type"], name=info["name"], email=info.get("email"), ) # Get last message preview last_message_preview = None if conversation.messages: last_msg = conversation.messages[-1] if conversation.messages else None if last_msg: preview = last_msg.content[:100] if len(last_msg.content) > 100: preview += "..." last_message_preview = preview # Get vendor info if applicable vendor_name = None vendor_code = None if conversation.vendor: vendor_name = conversation.vendor.name vendor_code = conversation.vendor.vendor_code return AdminConversationSummary( id=conversation.id, conversation_type=conversation.conversation_type, subject=conversation.subject, vendor_id=conversation.vendor_id, is_closed=conversation.is_closed, closed_at=conversation.closed_at, last_message_at=conversation.last_message_at, message_count=conversation.message_count, created_at=conversation.created_at, unread_count=unread_count, other_participant=other_info, last_message_preview=last_message_preview, vendor_name=vendor_name, vendor_code=vendor_code, ) # ============================================================================ # CONVERSATION LIST # ============================================================================ @router.get("", response_model=AdminConversationListResponse) def list_conversations( conversation_type: ConversationType | None = Query(None, description="Filter by type"), is_closed: bool | None = Query(None, description="Filter by status"), skip: int = Query(0, ge=0), limit: int = Query(20, ge=1, le=100), db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin_api), ) -> AdminConversationListResponse: """List conversations for admin (admin_vendor and admin_customer channels).""" conversations, total, total_unread = messaging_service.list_conversations( db=db, participant_type=ParticipantType.ADMIN, participant_id=current_admin.id, conversation_type=conversation_type, is_closed=is_closed, skip=skip, limit=limit, ) return AdminConversationListResponse( conversations=[ _enrich_conversation_summary(db, c, current_admin.id) for c in conversations ], total=total, total_unread=total_unread, skip=skip, limit=limit, ) @router.get("/unread-count", response_model=UnreadCountResponse) def get_unread_count( db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin_api), ) -> UnreadCountResponse: """Get total unread message count for header badge.""" count = messaging_service.get_unread_count( db=db, participant_type=ParticipantType.ADMIN, participant_id=current_admin.id, ) return UnreadCountResponse(total_unread=count) # ============================================================================ # RECIPIENTS # ============================================================================ @router.get("/recipients", response_model=RecipientListResponse) def get_recipients( recipient_type: ParticipantType = Query(..., description="Type of recipients to list"), search: str | None = Query(None, description="Search by name/email"), vendor_id: int | None = Query(None, description="Filter by vendor"), skip: int = Query(0, ge=0), limit: int = Query(50, ge=1, le=100), db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin_api), ) -> RecipientListResponse: """Get list of available recipients for compose modal.""" from models.database.customer import Customer from models.database.vendor import VendorUser recipients = [] if recipient_type == ParticipantType.VENDOR: # List vendor users (for admin_vendor conversations) query = ( db.query(User, VendorUser) .join(VendorUser, User.id == VendorUser.user_id) .filter(User.is_active == True) # noqa: E712 ) if vendor_id: query = query.filter(VendorUser.vendor_id == vendor_id) if search: search_pattern = f"%{search}%" query = query.filter( (User.username.ilike(search_pattern)) | (User.email.ilike(search_pattern)) | (User.first_name.ilike(search_pattern)) | (User.last_name.ilike(search_pattern)) ) total = query.count() results = query.offset(skip).limit(limit).all() for user, vendor_user in results: name = f"{user.first_name or ''} {user.last_name or ''}".strip() or user.username recipients.append( RecipientOption( id=user.id, type=ParticipantType.VENDOR, name=name, email=user.email, vendor_id=vendor_user.vendor_id, vendor_name=vendor_user.vendor.name if vendor_user.vendor else None, ) ) elif recipient_type == ParticipantType.CUSTOMER: # List customers (for admin_customer conversations) query = db.query(Customer).filter(Customer.is_active == True) # noqa: E712 if vendor_id: query = query.filter(Customer.vendor_id == vendor_id) if search: search_pattern = f"%{search}%" query = query.filter( (Customer.email.ilike(search_pattern)) | (Customer.first_name.ilike(search_pattern)) | (Customer.last_name.ilike(search_pattern)) ) total = query.count() results = query.offset(skip).limit(limit).all() for customer in results: name = f"{customer.first_name or ''} {customer.last_name or ''}".strip() recipients.append( RecipientOption( id=customer.id, type=ParticipantType.CUSTOMER, name=name or customer.email, email=customer.email, vendor_id=customer.vendor_id, ) ) else: total = 0 return RecipientListResponse(recipients=recipients, total=total) # ============================================================================ # CREATE CONVERSATION # ============================================================================ @router.post("", response_model=ConversationDetailResponse) def create_conversation( data: ConversationCreate, db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin_api), ) -> ConversationDetailResponse: """Create a new conversation.""" # Validate conversation type for admin if data.conversation_type not in [ ConversationType.ADMIN_VENDOR, ConversationType.ADMIN_CUSTOMER, ]: raise HTTPException( status_code=400, detail="Admin can only create admin_vendor or admin_customer conversations", ) # Validate recipient type matches conversation type if ( data.conversation_type == ConversationType.ADMIN_VENDOR and data.recipient_type != ParticipantType.VENDOR ): raise HTTPException( status_code=400, detail="admin_vendor conversations require a vendor recipient", ) if ( data.conversation_type == ConversationType.ADMIN_CUSTOMER and data.recipient_type != ParticipantType.CUSTOMER ): raise HTTPException( status_code=400, detail="admin_customer conversations require a customer recipient", ) # Create conversation conversation = messaging_service.create_conversation( db=db, conversation_type=data.conversation_type, subject=data.subject, initiator_type=ParticipantType.ADMIN, initiator_id=current_admin.id, recipient_type=data.recipient_type, recipient_id=data.recipient_id, vendor_id=data.vendor_id, initial_message=data.initial_message, ) db.commit() db.refresh(conversation) logger.info( f"Admin {current_admin.username} created conversation {conversation.id} " f"with {data.recipient_type.value}:{data.recipient_id}" ) # Return full detail response return _build_conversation_detail(db, conversation, current_admin.id) # ============================================================================ # CONVERSATION DETAIL # ============================================================================ def _build_conversation_detail( db: Session, conversation: Any, current_user_id: int ) -> ConversationDetailResponse: """Build full conversation detail response.""" # Get my participant for unread count my_participant = next( ( p for p in conversation.participants if p.participant_type == ParticipantType.ADMIN and p.participant_id == current_user_id ), None, ) unread_count = my_participant.unread_count if my_participant else 0 # Build participant responses participants = [] for p in conversation.participants: info = messaging_service.get_participant_info( db, p.participant_type, p.participant_id ) participants.append( ParticipantResponse( id=p.id, participant_type=p.participant_type, participant_id=p.participant_id, unread_count=p.unread_count, last_read_at=p.last_read_at, email_notifications=p.email_notifications, muted=p.muted, participant_info=( ParticipantInfo( id=info["id"], type=info["type"], name=info["name"], email=info.get("email"), ) if info else None ), ) ) # Build message responses messages = [_enrich_message(db, m) for m in conversation.messages] # Get vendor name if applicable vendor_name = None if conversation.vendor: vendor_name = conversation.vendor.name return ConversationDetailResponse( id=conversation.id, conversation_type=conversation.conversation_type, subject=conversation.subject, vendor_id=conversation.vendor_id, is_closed=conversation.is_closed, closed_at=conversation.closed_at, closed_by_type=conversation.closed_by_type, closed_by_id=conversation.closed_by_id, last_message_at=conversation.last_message_at, message_count=conversation.message_count, created_at=conversation.created_at, updated_at=conversation.updated_at, participants=participants, messages=messages, unread_count=unread_count, vendor_name=vendor_name, ) @router.get("/{conversation_id}", response_model=ConversationDetailResponse) def get_conversation( conversation_id: int, mark_read: bool = Query(True, description="Automatically mark as read"), db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin_api), ) -> ConversationDetailResponse: """Get conversation detail with messages.""" conversation = messaging_service.get_conversation( db=db, conversation_id=conversation_id, participant_type=ParticipantType.ADMIN, participant_id=current_admin.id, ) if not conversation: raise HTTPException(status_code=404, detail="Conversation not found") # Mark as read if requested if mark_read: messaging_service.mark_conversation_read( db=db, conversation_id=conversation_id, reader_type=ParticipantType.ADMIN, reader_id=current_admin.id, ) db.commit() return _build_conversation_detail(db, conversation, current_admin.id) # ============================================================================ # SEND MESSAGE # ============================================================================ @router.post("/{conversation_id}/messages", response_model=MessageResponse) async def send_message( conversation_id: int, content: str = Form(...), files: list[UploadFile] = File(default=[]), db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin_api), ) -> MessageResponse: """Send a message in a conversation, optionally with attachments.""" # Verify access conversation = messaging_service.get_conversation( db=db, conversation_id=conversation_id, participant_type=ParticipantType.ADMIN, participant_id=current_admin.id, ) if not conversation: raise HTTPException(status_code=404, detail="Conversation not found") if conversation.is_closed: raise HTTPException( status_code=400, detail="Cannot send messages to a closed conversation" ) # Process attachments attachments = [] for file in files: try: att_data = await message_attachment_service.validate_and_store( db=db, file=file, conversation_id=conversation_id ) attachments.append(att_data) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) # Send message message = messaging_service.send_message( db=db, conversation_id=conversation_id, sender_type=ParticipantType.ADMIN, sender_id=current_admin.id, content=content, attachments=attachments if attachments else None, ) db.commit() db.refresh(message) logger.info( f"Admin {current_admin.username} sent message {message.id} " f"in conversation {conversation_id}" ) return _enrich_message(db, message) # ============================================================================ # CONVERSATION ACTIONS # ============================================================================ @router.post("/{conversation_id}/close", response_model=CloseConversationResponse) def close_conversation( conversation_id: int, db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin_api), ) -> CloseConversationResponse: """Close a conversation.""" conversation = messaging_service.close_conversation( db=db, conversation_id=conversation_id, closer_type=ParticipantType.ADMIN, closer_id=current_admin.id, ) if not conversation: raise HTTPException(status_code=404, detail="Conversation not found") db.commit() logger.info( f"Admin {current_admin.username} closed conversation {conversation_id}" ) return CloseConversationResponse( success=True, message="Conversation closed", conversation_id=conversation_id, ) @router.post("/{conversation_id}/reopen", response_model=ReopenConversationResponse) def reopen_conversation( conversation_id: int, db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin_api), ) -> ReopenConversationResponse: """Reopen a closed conversation.""" conversation = messaging_service.reopen_conversation( db=db, conversation_id=conversation_id, opener_type=ParticipantType.ADMIN, opener_id=current_admin.id, ) if not conversation: raise HTTPException(status_code=404, detail="Conversation not found") db.commit() logger.info( f"Admin {current_admin.username} reopened conversation {conversation_id}" ) return ReopenConversationResponse( success=True, message="Conversation reopened", conversation_id=conversation_id, ) @router.put("/{conversation_id}/read", response_model=MarkReadResponse) def mark_read( conversation_id: int, db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin_api), ) -> MarkReadResponse: """Mark conversation as read.""" success = messaging_service.mark_conversation_read( db=db, conversation_id=conversation_id, reader_type=ParticipantType.ADMIN, reader_id=current_admin.id, ) db.commit() return MarkReadResponse( success=success, conversation_id=conversation_id, unread_count=0, ) class PreferencesUpdateResponse(BaseModel): """Response for preferences update.""" success: bool @router.put("/{conversation_id}/preferences", response_model=PreferencesUpdateResponse) def update_preferences( conversation_id: int, preferences: NotificationPreferencesUpdate, db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin_api), ) -> PreferencesUpdateResponse: """Update notification preferences for a conversation.""" success = messaging_service.update_notification_preferences( db=db, conversation_id=conversation_id, participant_type=ParticipantType.ADMIN, participant_id=current_admin.id, email_notifications=preferences.email_notifications, muted=preferences.muted, ) db.commit() return PreferencesUpdateResponse(success=success)