我是Django的新手,也是WebSockets的新手,所以我不太确定在处理消息时应该如何正确地将两者集成在一起。
我有一个API视图来向用户发送消息,该视图将该消息添加到数据库中,并创建一个WebSocket消息。我也有一个API视图来获取2个用户之间的对话。
我的逻辑是:当用户在前端打开对话时,先调用get_conversation API视图获取他们之前的历史消息;随后前端通过WebSocket动态更新屏幕,显示他们当前通过send_message API视图发送的新消息。
我目前的逻辑/实现是否是通常采用的正确做法?或者是否有其他更合适的实现方式?
这是我用来发送消息的API视图:
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def send_message(request, receiver_id):
    """Store a message from the requesting user and broadcast it to the chat group.

    Args:
        request: The DRF request; ``request.user`` is the sender and
            ``request.data['content']`` is the message body.
        receiver_id: Primary key of the ``User`` that should receive the message.

    Returns:
        * 201 with the serialized ``Message`` on success.
        * 400 if the content is missing/empty or the sender targets themselves.
        * 404 if no user with ``receiver_id`` exists.
        * 500 if creating the message or notifying the channel layer fails.
    """
    # Function-scope import: the module's import block is not visible from here.
    import logging

    sender = request.user
    content = request.data.get('content')

    # Ensures there is content within the message
    if not content:
        return Response({"error": "Message content is required"}, status=status.HTTP_400_BAD_REQUEST)

    # Ensures the receiving user exists
    try:
        receiver = User.objects.get(id=receiver_id)
    except User.DoesNotExist:
        return Response({"error": "Receiver not found"}, status=status.HTTP_404_NOT_FOUND)

    # Ensures the user is not sending a message to themselves
    if sender == receiver:
        return Response({"error": "Cannot send message to yourself"}, status=status.HTTP_400_BAD_REQUEST)

    # The group name is order-independent (smaller id first) so both
    # participants of a conversation resolve to the same group.
    room_group_name = f"group_{min(sender.id, receiver.id)}_{max(sender.id, receiver.id)}"

    try:
        # The insert and the broadcast share one atomic block: an exception
        # raised by group_send propagates out of the block and rolls the
        # insert back, so a message is never stored without being announced.
        # NOTE(review): the broadcast is sent before the transaction commits;
        # consider transaction.on_commit if consumers must only ever see
        # committed rows — confirm the desired failure semantics first.
        with transaction.atomic():
            message = Message.objects.create(
                sender=sender,
                receiver=receiver,
                content=content,
                is_delivered=True,
            )

            # Notify WebSocket group about the new message
            channel_layer = get_channel_layer()
            async_to_sync(channel_layer.group_send)(
                room_group_name,
                {
                    "type": "chat.message",
                    "content": content,
                    # Use message's ID as unique_identifier
                    "unique_identifier": str(message.id),
                },
            )
    except Exception:
        # Request boundary: keep the generic 500 for the client, but record
        # the traceback instead of silently discarding the cause.
        logging.getLogger(__name__).exception(
            "Failed to send message from user %s to user %s", sender.id, receiver.id
        )
        return Response({"error": "An error occurred while sending the message"},
                        status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    serializer = MessageSerializer(message)
    return Response(serializer.data, status=status.HTTP_201_CREATED)
这是我用来获取两个用户之间的消息对话的API视图:
# ``@api_view`` must be the outermost decorator: without it the function is a
# plain Django view, ``@permission_classes`` has no effect and the DRF
# ``Response`` objects returned below are never rendered.
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def get_conversation(request, user_id):
    """Return one page of the conversation between the requester and ``user_id``.

    Also marks every unread message sent by ``user_id`` to the requesting user
    as read.

    Args:
        request: The DRF request; pagination values are read from it by
            ``get_pagination_indeces``.
        user_id: Primary key of the other participant in the conversation.

    Returns:
        * 200 with ``messages`` (the serialized page) and
          ``most_recent_sender_status`` (``{"id", "is_read"}`` of the first
          message of the conversation when the requester sent it, else ``None``).
        * 404 if no user with ``user_id`` exists.
        * Whatever validation response ``get_pagination_indeces`` produces.
    """
    # Obtain the user the requesting user has the conversation with
    try:
        user = User.objects.get(id=user_id)
    except User.DoesNotExist:
        return Response({"error": "User not found"}, status=status.HTTP_404_NOT_FOUND)

    # Set a default page size of 20 returned datasets per page
    default_page_size = 20

    # Utility function to get current page number and page size from the
    # request's query parameters and calculate the pagination slicing indices
    start_index, end_index, validation_response = get_pagination_indeces(request, default_page_size)
    if validation_response:
        return validation_response

    # All messages between the requesting user and the other participant
    conversation = Message.objects.filter(
        Q(sender=request.user, receiver=user) | Q(sender=user, receiver=request.user))
    # Only the requested page is serialized
    messages = conversation[start_index:end_index]

    # Take the first message from the unsliced queryset: using the sliced page
    # would yield the first row of *that page* and raises TypeError when the
    # model has no default ordering (a sliced queryset cannot be reordered).
    # NOTE(review): this is "most recent" only if Message's default ordering
    # is newest-first — confirm against the model's Meta.ordering.
    most_recent_message = conversation.first()

    # Determine the status of the most recent message for the sender (if the
    # receiver has seen their message or not). If the requesting user is the
    # receiver they don't need a status, so it stays None.
    most_recent_sender_status = None
    if most_recent_message and most_recent_message.sender == request.user:
        most_recent_sender_status = {
            "id": most_recent_message.id,
            "is_read": most_recent_message.is_read
        }

    # Update unread messages sent by `user`, since the requesting user has
    # viewed them by calling this API
    Message.objects.filter(sender=user, receiver=request.user, is_read=False).update(is_read=True)

    serializer = MessageSerializer(messages, many=True)
    return Response({
        "messages": serializer.data,
        "most_recent_sender_status": most_recent_sender_status,
    }, status=status.HTTP_200_OK)
这是我的WebSocket消费者:
# WebSocket consumer to handle live messaging between users
class MessageConsumer(AsyncWebsocketConsumer):
    """Relay chat messages between the two participants of a conversation.

    On connect the client is authenticated through the token in its
    ``Authorization`` header and joined to a group named after both user ids.
    Frames received from the client are re-broadcast to that group, and the
    referenced message is marked as read when the connected user is its
    receiver.
    """

    # Retrieve a user from the database by their authentication token.
    # Wrapped with @database_sync_to_async to allow database access in an
    # asynchronous context.
    @database_sync_to_async
    def get_user_by_token(self, token):
        """Return the ``User`` owning ``token``, or ``None`` if there is none."""
        try:
            return User.objects.get(auth_token=token)
        except User.DoesNotExist:
            return None

    # Retrieve a message from the database by its ID
    @database_sync_to_async
    def get_message(self, message_id):
        """Return the ``Message`` with ``message_id``, or ``None`` if missing."""
        try:
            return Message.objects.get(id=message_id)
        except Message.DoesNotExist:
            return None

    @database_sync_to_async
    def _mark_read_in_db(self, message_id):
        """Set ``is_read`` on the message if the connected user is its receiver.

        All ORM access (lookup, the ``receiver`` relation and ``save``) lives
        in this synchronous helper so none of it runs on the event loop.
        """
        try:
            message = Message.objects.get(id=message_id)
        except Message.DoesNotExist:
            return
        # Only the receiver reading the message may flip the flag
        if message.receiver == self.auth_user and not message.is_read:
            message.is_read = True
            message.save()

    # Initiates a WebSocket connection for live messaging.
    async def connect(self):
        """Authenticate the client and join the conversation group."""
        # Get headers from the connection's scope
        headers = dict(self.scope["headers"])

        # If user is not authenticated, send an authentication required
        # message and close the connection
        if b"authorization" not in headers:
            await self.accept()
            await self.send(text_data=json.dumps({
                "type": "authentication_required",
                "message": "Authentication is required to access notifications."
            }))
            await self.close()
            return

        # Extract the token (second word of the header value). A header that
        # is not valid UTF-8 or has no second word is rejected instead of
        # raising inside connect().
        try:
            token = headers[b"authorization"].decode("utf-8").split()[1]
        except (UnicodeDecodeError, IndexError):
            await self.close()
            return

        sender = await self.get_user_by_token(token)
        if sender is None:
            await self.close()
            return
        sender_id = int(sender.id)

        # Store the authenticated user to be used when marking messages as read
        self.auth_user = sender

        # Extract receiver ID from the WebSocket URL parameter
        receiver_id = int(self.scope['url_route']['kwargs']['receiver_id'])

        # Create a unique room group name using both sender and receiver IDs
        # (smaller id first, so both participants get the same name)
        self.room_group_name = f"group_{min(sender_id, receiver_id)}_{max(sender_id, receiver_id)}"

        # Join the conversation group
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)

        # Accept the WebSocket connection
        await self.accept()

    # Handles disconnection from the messaging session.
    async def disconnect(self, close_code):
        """Leave the conversation group, if this connection ever joined one."""
        # ``room_group_name`` is only set once connect() authenticated the
        # client, so its presence tells us whether there is a group to leave.
        if hasattr(self, 'room_group_name'):
            await self.channel_layer.group_discard(
                self.room_group_name,
                self.channel_name
            )

    # Marks a message in the live WebSocket conversation as read if the user
    # reading the message is the receiver
    async def mark_message_as_read(self, unique_identifier):
        """Mark the message identified by ``unique_identifier`` as read.

        Identifiers that are not integers, and ids with no matching message,
        are ignored.
        """
        try:
            message_id = int(unique_identifier)
        except (TypeError, ValueError):
            return
        await self._mark_read_in_db(message_id)

    # Receives incoming messages from the WebSocket connection and relays the
    # messages to other users in the same chat group.
    async def receive(self, text_data=None, bytes_data=None):
        """Handle a client frame: mark the message read and re-broadcast it.

        The frame must be a JSON object with ``content`` and
        ``unique_identifier`` keys; anything else is answered with an
        ``error`` frame instead of raising inside the consumer.
        """
        try:
            payload = json.loads(text_data) if text_data is not None else None
        except json.JSONDecodeError:
            payload = None

        if (
            not isinstance(payload, dict)
            or "content" not in payload
            or "unique_identifier" not in payload
        ):
            await self.send(text_data=json.dumps({
                "type": "error",
                "message": "Expected a JSON object with 'content' and 'unique_identifier'."
            }))
            return

        content = payload["content"]
        unique_identifier = payload["unique_identifier"]

        # Update is_read status for the received message
        await self.mark_message_as_read(unique_identifier)

        # Send the received message to chat room group
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                "type": "chat.message",
                "content": content,
                "unique_identifier": unique_identifier,
            }
        )

    # Relays messages to users in the chat group and sends the message to the
    # original consumer (WebSocket connection).
    async def chat_message(self, event):
        """Forward a ``chat.message`` group event to this client."""
        content = event["content"]
        unique_identifier = event["unique_identifier"]
        await self.send(text_data=json.dumps({
            "type": "message",
            "content": content,
            "unique_identifier": unique_identifier,
        }))

    # Send a WebSocket message to the client indicating that a message should
    # be removed
    async def remove_message(self, event):
        """Forward a ``remove_message`` group event to this client."""
        unique_identifier = event["unique_identifier"]
        await self.send(text_data=json.dumps({
            "type": "remove_message",
            "unique_identifier": unique_identifier,
        }))
1条答案
按热度按时间oaxa6hgo1#
您当前使用WebSocket和Django API视图进行消息传递是正确的。您正在使用WebSocket进行实时更新,并且每个对话都有唯一的房间组名称。您正在处理消息的发送和接收,而WebSocket Consumer结构适用于使用WebSocket的消息传递系统。但是,您的方法可能需要根据应用程序的具体要求进行调整。