websocket 使用Polly进行重新连接和超时

ffdz8vbo  于 2023-06-06  发布在  其他
关注(0)|答案(1)|浏览(468)

我在尝试完成以下操作时使用Polly时遇到问题:

  • 重新连接逻辑-我尝试创建一个Polly策略,当您尝试在没有Internet连接的情况下执行StartAsync时,该策略可以正常工作。但是,当它到达ReceiveLoop时,策略不再影响该方法,如果我们的连接在该点停止,它永远不会尝试重新连接。它只是抛出以下异常:Disconnected: The remote party closed the WebSocket connection without completing the close handshake.。也许我应该有两个策略:一个在StartAsync中,另一个在ReceiveLoop中,但出于某种原因,我感觉不对,所以这就是我问这个问题的原因。
  • 超时-我想为每个ClientWebSocket方法调用添加超时。ConnectAsync、SendAsync等我对波莉不太熟悉,但我相信这项政策会自动为我们做到这一点。不过,我需要有人来证实。这里的timeout是指类似于_webSocket.ConnectAsync(_url, CancellationToken.None).TimeoutAfter(timeoutMilliseconds)的逻辑,TimeoutAfter的实现可以在这里找到。一个例子如何其他回购做到这一点可以在这里找到。

简单地说,我想让这个类具有弹性,这意味着无论原因是什么,它都应该快速失败-> 10秒后重试->快速失败->再次重试,以此类推,而不是尝试连接到一个死的Web Socket服务器30秒没有成功。这个 * 等待和重试逻辑 * 应该一直重复,直到我们调用StopAsync或释放示例。
您可以在GitHub上找到WebSocketDuplexPipe类。

public sealed class Client : IDisposable
{
    private const int RetrySeconds = 10;
    private readonly WebSocketDuplexPipe _webSocketPipe;
    private readonly string _url;

    public Client(string url)
    {
        _url = url;
        _webSocketPipe = new WebSocketDuplexPipe();
    }

    public Task StartAsync(CancellationToken cancellationToken = default)
    {
        var retryPolicy = Policy
            .Handle<Exception>(e => !cancellationToken.IsCancellationRequested)
            .WaitAndRetryForeverAsync(_ => TimeSpan.FromSeconds(RetrySeconds),
                (exception, calculatedWaitDuration) =>
                {
                    Console.WriteLine($"{exception.Message}. Retry in {calculatedWaitDuration.TotalSeconds} seconds.");
                });

        return retryPolicy.ExecuteAsync(async () =>
        {
            await _webSocketPipe.StartAsync(_url, cancellationToken).ConfigureAwait(false);
            _ = ReceiveLoop();
        });
    }

    public Task StopAsync()
    {
        return _webSocketPipe.StopAsync();
    }

    public async Task SendAsync(string data, CancellationToken cancellationToken = default)
    {
        var encoded = Encoding.UTF8.GetBytes(data);
        var bufferSend = new ArraySegment<byte>(encoded, 0, encoded.Length);
        await _webSocketPipe.Output.WriteAsync(bufferSend, cancellationToken).ConfigureAwait(false);
    }

    private async Task ReceiveLoop()
    {
        var input = _webSocketPipe.Input;

        try
        {
            while (true)
            {
                var result = await input.ReadAsync().ConfigureAwait(false);
                var buffer = result.Buffer;

                try
                {
                    if (result.IsCanceled)
                    {
                        break;
                    }

                    if (!buffer.IsEmpty)
                    {
                        while (MessageParser.TryParse(ref buffer, out var payload))
                        {
                            var message = Encoding.UTF8.GetString(payload);

                            _messageReceivedSubject.OnNext(message);
                        }
                    }

                    if (result.IsCompleted)
                    {
                        break;
                    }
                }
                finally
                {
                    input.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Disconnected: {ex.Message}");
        }
    }
}
qvk1mo1f

qvk1mo1f1#

让我通过评论来捕捉我们谈话的本质。

ReceiveLoop带重试

在等待input.ReadAsync完成时,您的重试策略将从ExecuteAsync退出并成功。原因是你没有等待ReceiveLoop,而是你只是把它踢掉在火和忘记的方式。
换句话说,重试逻辑只适用于StartAsyncReceiveLoop中await之前的代码。
修复方法是将重试逻辑移动到ReceiveLoop内部。

超时

Polly的超时策略可以使用乐观或悲观策略。前者严重依赖于CancellationToken

  • 所以,如果你传递CancellationToken.NoneExecuteAsync,那么你基本上说让TimeoutPolicy处理取消过程。
  • 如果传递一个已经存在的令牌,则可以通过TimeoutPolicy或提供的令牌取消修饰的Task。

请记住,它将抛出TimeoutRejectedException而不是OperationCanceledException

onTimeoutAsync

TimeoutAsync有几个重载可以接受两个onTimeoutAsync委托之一

Func<Context, TimeSpan, Task, Task> onTimeoutAsync

Func<Context, TimeSpan, Task, Exception, Task> onTimeoutAsync

如果您有一个在TimeoutRejectedException上触发的外部策略(例如重试),那么这对于记录发生超时的事实非常有用。

链接策略

我建议使用Policy.WrapAsync静态方法代替AsyncPolicyWrapAsync示例方法。

var timeoutPolicy = Policy.TimeoutAsync(TimeSpan.FromMilliseconds(timeoutMs), TimeoutStrategy.Optimistic,
    (context, timeSpan, task, ex) =>
    {
        Console.WriteLine($"Timeout {timeSpan.TotalSeconds} seconds");
        return Task.CompletedTask;
    });

var retryPolicy = Policy
    .Handle<Exception>(ex =>
    {
        Console.WriteLine($"Exception tralal: {ex.Message}");
        return true;
    })
    .WaitAndRetryForeverAsync(_ => TimeSpan.FromMilliseconds(retryBackOffMs),
    (ex, retryCount, calculatedWaitDuration) =>
    {
        Console.WriteLine(
            $"Retrying in {calculatedWaitDuration.TotalSeconds} seconds (Reason: {ex.Message}) (Retry count: {retryCount})");
    });

var resilientStrategy = Policy.WrapAsync(retryPolicy, timeoutPolicy);

使用这种方法,重试策略的定义不会显式引用超时策略。相反,您有两个单独的策略和一个链接的策略。

相关问题