我在尝试完成以下操作时使用Polly时遇到问题:
- 重新连接逻辑-我尝试创建一个Polly策略,当您尝试在没有Internet连接的情况下执行
StartAsync
时,该策略可以正常工作。但是,当它到达ReceiveLoop
时,策略不再影响该方法,如果我们的连接在该点停止,它永远不会尝试重新连接。它只是抛出以下异常:Disconnected: The remote party closed the WebSocket connection without completing the close handshake.
。也许我应该有两个策略:一个在StartAsync
中,另一个在ReceiveLoop
中,但出于某种原因,我感觉不对,所以这就是我问这个问题的原因。 - 超时-我想为每个
ClientWebSocket
方法调用添加超时。ConnectAsync、SendAsync等我对波莉不太熟悉,但我相信这项政策会自动为我们做到这一点。不过,我需要有人来证实。这里的timeout是指类似于_webSocket.ConnectAsync(_url, CancellationToken.None).TimeoutAfter(timeoutMilliseconds)
的逻辑,TimeoutAfter的实现可以在这里找到。一个例子如何其他回购做到这一点可以在这里找到。
简单地说,我想让这个类具有弹性,这意味着无论原因是什么,它都应该快速失败-> 10秒后重试->快速失败->再次重试,以此类推,而不是尝试连接到一个死的Web Socket服务器30秒没有成功。这个 * 等待和重试逻辑 * 应该一直重复,直到我们调用StopAsync
或释放示例。
您可以在GitHub上找到WebSocketDuplexPipe类。
public sealed class Client : IDisposable
{
private const int RetrySeconds = 10;
private readonly WebSocketDuplexPipe _webSocketPipe;
private readonly string _url;
public Client(string url)
{
_url = url;
_webSocketPipe = new WebSocketDuplexPipe();
}
public Task StartAsync(CancellationToken cancellationToken = default)
{
var retryPolicy = Policy
.Handle<Exception>(e => !cancellationToken.IsCancellationRequested)
.WaitAndRetryForeverAsync(_ => TimeSpan.FromSeconds(RetrySeconds),
(exception, calculatedWaitDuration) =>
{
Console.WriteLine($"{exception.Message}. Retry in {calculatedWaitDuration.TotalSeconds} seconds.");
});
return retryPolicy.ExecuteAsync(async () =>
{
await _webSocketPipe.StartAsync(_url, cancellationToken).ConfigureAwait(false);
_ = ReceiveLoop();
});
}
public Task StopAsync()
{
return _webSocketPipe.StopAsync();
}
public async Task SendAsync(string data, CancellationToken cancellationToken = default)
{
var encoded = Encoding.UTF8.GetBytes(data);
var bufferSend = new ArraySegment<byte>(encoded, 0, encoded.Length);
await _webSocketPipe.Output.WriteAsync(bufferSend, cancellationToken).ConfigureAwait(false);
}
private async Task ReceiveLoop()
{
var input = _webSocketPipe.Input;
try
{
while (true)
{
var result = await input.ReadAsync().ConfigureAwait(false);
var buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
break;
}
if (!buffer.IsEmpty)
{
while (MessageParser.TryParse(ref buffer, out var payload))
{
var message = Encoding.UTF8.GetString(payload);
_messageReceivedSubject.OnNext(message);
}
}
if (result.IsCompleted)
{
break;
}
}
finally
{
input.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.WriteLine($"Disconnected: {ex.Message}");
}
}
}
1条答案
按热度按时间qvk1mo1f1#
让我通过评论来捕捉我们谈话的本质。
ReceiveLoop
带重试在等待
input.ReadAsync
完成时,您的重试策略将从ExecuteAsync
退出并成功。原因是你没有等待ReceiveLoop
,而是你只是把它踢掉在火和忘记的方式。换句话说,重试逻辑只适用于
StartAsync
和ReceiveLoop
中await之前的代码。修复方法是将重试逻辑移动到
ReceiveLoop
内部。超时
Polly的超时策略可以使用乐观或悲观策略。前者严重依赖于
CancellationToken
。CancellationToken.None
到ExecuteAsync
,那么你基本上说让TimeoutPolicy处理取消过程。请记住,它将抛出
TimeoutRejectedException
而不是OperationCanceledException
。onTimeoutAsync
TimeoutAsync
有几个重载可以接受两个onTimeoutAsync
委托之一或
如果您有一个在
TimeoutRejectedException
上触发的外部策略(例如重试),那么这对于记录发生超时的事实非常有用。链接策略
我建议使用
Policy.WrapAsync
静态方法代替AsyncPolicy
的WrapAsync
示例方法。使用这种方法,重试策略的定义不会显式引用超时策略。相反,您有两个单独的策略和一个链接的策略。