在.NET中使用websockets时处理接收多个消息/多个块

p3rjfoxz  于 2023-08-05  发布在  .NET
关注(0)|答案(1)|浏览(101)

我正在实现一个WebSocket服务器,并使用二进制数据(而不是文本)在服务器和客户端之间进行通信。当等待消息接收并处理时,我是否应该关心在多个块中接收到的消息,以及当我使用以下代码时同时接收到的两条消息?

var buffer = new byte[256];
var result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);

字符串

  • 我确定所有由(有效)客户端/服务器发送的消息都在256字节长的缓冲区内,并且在使用WebSocket.SendAsync()时EndOfMessage标志设置为true;

如果是的话,我希望看到一个最佳实践代码,当有一个“await Process(byte[] data)”方法一次只处理一条消息时,它能正确地处理接收消息。
我使用下面的代码来处理多个块的消息,但不确定它是否过于工程化。

while (socket.State == WebSocketState.Open)
{
    var buffer = new byte[256];
    await using var dataStream = new MemoryStream();
    WebSocketReceiveResult result;
    do
    {
        result = await socket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
        if (result.MessageType == WebSocketMessageType.Close)
        {
            await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
            return;
        }
        await dataStream.WriteAsync(buffer);
    } while (!result.EndOfMessage);
    await Process(dataStream.ToArray()); //Process functions do not detect multiple messages, and just can process one message at a time
    dataStream.SetLength(0); //Clear memory stream data
}

nhjlsmyf

nhjlsmyf1#

如果能看到正确处理接收消息的最佳实践代码,我将不胜感激
不确定最佳实践,但对于上面的评论,我会做一些重构:

var buffer = new Memory<byte>(new byte[256]);    

while (socket.State == WebSocketState.Open)
{
    ValueWebSocketReceiveResult result;
    do
    {
        result = await socket.ReceiveAsync(buffer, CancellationToken.None);
        if (result.MessageType == WebSocketMessageType.Close)
        {
            await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
            return;
        }            
    } 
    while (!result.EndOfMessage);        
    //Process functions do not detect multiple messages, and just can process one message at a time
    await Process(buffer[..result.Count]); // range op for Memory<byte> is quite cheap         
}

字符串
因此,所做的更改:

  1. byte[]替换为Memory<byte>(),因为Range运算符使用(buffer[..result.Count])不会导致该类型的内存分配。
  2. socket.ReceiveAsync()的其他重载用于返回ValueTask<WebSocketReceiveResult>(而不是Task<>),因此,每次调用内存分配时节省8字节。
    1.如果消息的大小不完全是256,福尔斯该范围内,则必须考虑result.Count
    1.有趣的部分:由于数据是按顺序处理的,我们可以进一步使用内存,在MemoryStream中根本不需要。
    4.2.如果我们想为这段逻辑获得更多的带宽,我们不能摆脱MemoryStream,必须使用额外的分配来复制Process(..)方法调用的数据。
    代码会有点不同:
var buffer = new Memory<byte>(new byte[256]);
await using var dataStream = new MemoryStream();

while (socket.State == WebSocketState.Open)
{
    ValueWebSocketReceiveResult result;
    do
    {
        result = await socket.ReceiveAsync(buffer, CancellationToken.None);
        if (result.MessageType == WebSocketMessageType.Close)
        {
            await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
            return;
        }
        await dataStream.WriteAsync(buffer[..result.Count]);
    } while (!result.EndOfMessage);
    //Process asynchronously, so we don't wait 
    //but have to handle all of the branches there: normal/error/cancelled
    // have to make extra allocation for data copy
    _ = Process(dataStream.ToArray());
    dataStream.SetLength(0); //Clear memory stream data
}

PS:如果你知道你的消息大小(在这个简单的情况下-你的协议),就可以摆脱while (!result.EndOfMessage);指令。这将是相当不可靠的,但只是为了指出最佳带宽的情况。

相关问题