如何将WebSocket与Django API视图集成以进行消息传递

kiayqfof  于 2023-10-20  发布在  Go
关注(0)|答案(1)|浏览(126)

我是Django的新手,也是WebSocket的新手,所以不太确定在处理消息时应该如何正确地将二者集成。
我有一个API视图来向用户发送消息,该视图将该消息添加到数据库中,并创建一个WebSocket消息。我也有一个API视图来获取2个用户之间的对话。
我的逻辑是,当用户在前端打开他们的对话时,get_conversation API视图将运行以获取他们以前的文本,然后在前端,WebSocket将用于动态更新他们的屏幕,其中包含他们当前使用send_message API视图发送的新文本。
我目前的逻辑/实现是否是通常采用的正确做法,还是有其他更合适的实现方式?
这是我用来发送消息的API视图:

@api_view(['POST'])
@permission_classes([IsAuthenticated])
def send_message(request, receiver_id):
    """Store a message from the requesting user and broadcast it to the chat group.

    Args:
        request: The DRF request; ``request.user`` is the sender and
            ``request.data['content']`` is the message text.
        receiver_id: Primary key of the user the message is addressed to.

    Returns:
        Response: 201 with the serialized message on success; 400 when the
        content is missing/empty or the sender addresses themselves; 404 when
        the receiver does not exist; 500 when saving or broadcasting fails.
    """
    # Local import: the file's import block is not visible from this snippet.
    import logging

    sender = request.user
    content = request.data.get('content')

    # Ensures there is content within the message
    if not content:
        return Response({"error": "Message content is required"}, status=status.HTTP_400_BAD_REQUEST)

    # Ensures the receiving user exists
    try:
        receiver = User.objects.get(id=receiver_id)
    except User.DoesNotExist:
        return Response({"error": "Receiver not found"}, status=status.HTTP_404_NOT_FOUND)

    # Ensures the user is not sending a message to themselves
    if sender == receiver:
        return Response({"error": "Cannot send message to yourself"}, status=status.HTTP_400_BAD_REQUEST)

    # Create a unique room group name using both sender and receiver IDs.
    # min/max makes the name independent of who is sending, so both
    # participants resolve to the same group.
    room_group_name = f"group_{min(sender.id, receiver.id)}_{max(sender.id, receiver.id)}"

    try:
        # Use an atomic transaction so that a failure while notifying the
        # WebSocket group rolls back the newly created Message row.
        with transaction.atomic():
            # Create the message
            message = Message.objects.create(sender=sender, receiver=receiver, content=content, is_delivered=True)

            # Notify WebSocket group about the new message
            channel_layer = get_channel_layer()
            async_to_sync(channel_layer.group_send)(
                room_group_name,
                {
                    "type": "chat.message",
                    "content": content,
                    "unique_identifier": str(message.id)  # Use message's ID as unique_identifier
                }
            )
    except Exception:
        # Top-level boundary for this view: record the traceback instead of
        # silently discarding it, then return the generic error to the client.
        logging.getLogger(__name__).exception(
            "Failed to send message from user %s to user %s", sender.id, receiver.id
        )
        return Response({"error": "An error occurred while sending the message"},
                        status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    serializer = MessageSerializer(message)

    return Response(serializer.data, status=status.HTTP_201_CREATED)

这是我用来获取两个用户之间的消息对话的API视图:

# @api_view must wrap the function for @permission_classes to take effect;
# without it the IsAuthenticated check is never applied and the DRF Response
# cannot be rendered.
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def get_conversation(request, user_id):
    """Return one page of the conversation between the requesting user and ``user_id``.

    Side effect: every unread message sent by ``user_id`` to the requesting
    user is marked as read.

    Args:
        request: The DRF request; ``request.user`` is the viewer. Pagination
            values are read from the request by ``get_pagination_indeces``.
        user_id: Primary key of the other participant in the conversation.

    Returns:
        Response: 200 with ``messages`` (serialized page) and
        ``most_recent_sender_status`` (``None`` unless the first message of
        the page was sent by the requesting user); 404 when the other user
        does not exist; or the validation response produced by the
        pagination helper.
    """
    # Obtain the user the requesting user has the conversation with
    try:
        user = User.objects.get(id=user_id)
    except User.DoesNotExist:
        return Response({"error": "User not found"}, status=status.HTTP_404_NOT_FOUND)

    # Set a default page size of 20 returned datasets per page
    default_page_size = 20
    # Utility function to get current page number and page size from the request's query parameters
    # and calculate the pagination slicing indices
    start_index, end_index, validation_response = get_pagination_indeces(request, default_page_size)
    if validation_response:
        return validation_response

    # Get the requested page of messages exchanged between the two users
    messages = Message.objects.filter(
        Q(sender=request.user, receiver=user) | Q(sender=user, receiver=request.user))[start_index:end_index]

    # Take the first message of the page as the "most recent" one.
    # NOTE(review): this presumably relies on Message.Meta.ordering being
    # newest-first; without a default ordering, .first() on a sliced queryset
    # cannot reorder and will fail — confirm against the model.
    most_recent_message = messages.first()

    # Determine the status of the most recent message for the sender (if the receiver has seen their message or not)
    most_recent_sender_status = None  # if the requesting user is the receiver then they don't need a status
    if most_recent_message and most_recent_message.sender == request.user:
        most_recent_sender_status = {
            "id": most_recent_message.id,
            "is_read": most_recent_message.is_read
        }

    # Update unread messages sent by the `user` since the `requesting user` has viewed them after calling this API
    Message.objects.filter(sender=user, receiver=request.user, is_read=False).update(is_read=True)

    # The page queryset is evaluated here, i.e. after the update above.
    serializer = MessageSerializer(messages, many=True)

    return Response({
        "messages": serializer.data,
        "most_recent_sender_status": most_recent_sender_status,
    }, status=status.HTTP_200_OK)

这是我的WebSocket消费者:

# WebSocket consumer to handle live messaging between users
class MessageConsumer(AsyncWebsocketConsumer):
    """Relay chat messages between the two participants of a conversation.

    On connect the client is authenticated through the token in its
    ``Authorization`` header and joined to a group named after both user ids.
    Events of type ``chat.message`` and ``remove_message`` sent to that group
    are forwarded to the client as JSON.
    """

    # Retrieve a user from the database by their authentication token.
    # wrapped with @database_sync_to_async to allow database access in an asynchronous context.
    @database_sync_to_async
    def get_user_by_token(self, token):
        """Return the user owning ``token``, or ``None`` if there is none."""
        try:
            return User.objects.get(auth_token=token)
        except User.DoesNotExist:
            return None

    # Retrieve a message from the database by its ID
    @database_sync_to_async
    def get_message(self, message_id):
        """Return the message with ``message_id``, or ``None`` if it does not exist."""
        try:
            return Message.objects.get(id=message_id)
        except Message.DoesNotExist:
            return None

    # Look up, check and save in one synchronous unit so that no ORM call
    # (including the lazy `receiver` relation and `save`) runs in the coroutine.
    @database_sync_to_async
    def _mark_read_if_receiver(self, message_id):
        """Mark the message as read when the authenticated user is its receiver."""
        try:
            message = Message.objects.get(id=message_id)
        except Message.DoesNotExist:
            return
        if message.receiver == self.auth_user and not message.is_read:
            message.is_read = True
            message.save()

    # Initiates a WebSocket connection for live messaging.
    async def connect(self):
        """Authenticate the client and join it to its conversation group."""
        # Get headers from the connection's scope
        headers = dict(self.scope["headers"])

        # If user is not authenticated, send an authentication required message and close the connection
        if b"authorization" not in headers:
            await self.accept()
            await self.send(text_data=json.dumps({
                "type": "authentication_required",
                "message": "Authentication is required to access notifications."
            }))
            await self.close()
            return

        # Extract the token from the Authorization header ("<scheme> <token>").
        # A header without a token part is rejected instead of raising IndexError.
        header_parts = headers[b"authorization"].decode("utf-8").split()
        if len(header_parts) < 2:
            await self.close()
            return
        token = header_parts[1]

        # Get the sender user associated with the token
        sender = await self.get_user_by_token(token)
        if sender is None:
            await self.close()
            return
        sender_id = int(sender.id)

        # Store the authenticated user to be used when marking messages as read
        self.auth_user = sender

        # Extract receiver ID from the WebSocket URL parameter
        receiver_id = int(self.scope['url_route']['kwargs']['receiver_id'])

        # Create a unique room group name using both sender and receiver IDs
        self.room_group_name = f"group_{min(sender_id, receiver_id)}_{max(sender_id, receiver_id)}"

        # Join the conversation group
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)

        # Accept the WebSocket connection
        await self.accept()

    # Handles disconnection from the messaging session.
    async def disconnect(self, close_code):
        """Leave the conversation group, if this connection ever joined one."""
        # room_group_name is only set once connect() has authenticated the
        # client, so rejected connections have nothing to discard.
        if hasattr(self, 'room_group_name'):
            await self.channel_layer.group_discard(
                self.room_group_name,
                self.channel_name
            )

    # Marks a message in the Live WebSocket conversation as read if the user reading the message is the receiver
    async def mark_message_as_read(self, unique_identifier):
        """Mark the identified message as read for the authenticated receiver.

        Identifiers that are not integers, and ids with no matching message,
        are ignored.
        """
        try:
            message_id = int(unique_identifier)
        except (TypeError, ValueError):
            return
        await self._mark_read_if_receiver(message_id)

    # Receives incoming messages from the WebSocket connection and relays the messages to other users in the same chat group.
    async def receive(self, text_data):
        """Handle a client frame: mark the message as read and relay it to the group.

        ``text_data`` is parsed as JSON and must contain the keys ``content``
        and ``unique_identifier``.
        """
        text_data_json = json.loads(text_data)
        content = text_data_json["content"]
        unique_identifier = text_data_json["unique_identifier"]

        # Update is_read status for the received message
        await self.mark_message_as_read(unique_identifier)

        # Send the received message to chat room group
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                "type": "chat.message",
                "content": content,
                "unique_identifier": unique_identifier,
            }
        )

    # Relays messages to users in the chat group and sends the message to the original consumer (WebSocket connection).
    async def chat_message(self, event):
        """Forward a group chat event to this client as a ``message`` frame."""
        content = event["content"]
        unique_identifier = event["unique_identifier"]

        await self.send(text_data=json.dumps({
            "type": "message",
            "content": content,
            "unique_identifier": unique_identifier,
        }))

    # Send a WebSocket message to the client indicating that a message should be removed
    async def remove_message(self, event):
        """Forward a removal event to this client as a ``remove_message`` frame."""
        unique_identifier = event["unique_identifier"]

        await self.send(text_data=json.dumps({
            "type": "remove_message",
            "unique_identifier": unique_identifier,
        }))
oaxa6hgo

oaxa6hgo1#

您目前结合使用WebSocket和Django API视图进行消息传递的思路是正确的:通过API视图持久化消息并获取历史对话,通过WebSocket进行实时更新,并且每个对话都有唯一的房间组名称。您的WebSocket Consumer结构也适用于基于WebSocket的消息传递系统。不过,具体实现可能仍需根据应用程序的实际需求进行调整。

相关问题