这是WebSocket接收消息事件处理程序。但是,这些订阅正在等待,并且它们当前没有并行运行,因此存在处理时间提示消息。其想法是使这些数据处理程序并行运行,这意味着应该使用Task.WhenAll
。数据处理程序的类型为Func<T, ValueTask>
,因此并行执行是有意义的。这样,如果其中一个数据处理程序包含await Task.Delay(5000)
,它就不会阻塞其他数据处理程序。
private async ValueTask OnDataReceived(DataReceivedEventArgs e)
{
var timestamp = DateTimeOffset.Now;
var messageEvent = new MessageEvent(e.Message, timestamp);
foreach (var subscription in _subscriptions
.GetAll()
.Where(subscription => MessageMatchesHandler(messageEvent.Data, subscription.Request)))
{
var userProcessTime = await MeasureUserProcessTime(async () => await subscription.DataHandler(messageEvent));
if (userProcessTime.TotalMilliseconds > 500)
{
_logger.LogTrace("Detected slow data handler ({UserProcessTimeMs} ms user code), consider offloading data handling to another thread. Data from this socket may arrive late or not at all if message processing is continuously slow.",
userProcessTime.TotalMilliseconds);
}
}
}
下面的版本呢?
private async ValueTask OnDataReceived(DataReceivedEventArgs e)
{
var timestamp = DateTimeOffset.Now;
var messageEvent = new MessageEvent(e.Message, timestamp);
var handlers = _subscriptions
.GetAll()
.Where(subscription => MessageMatchesHandler(messageEvent.Data, subscription.Request))
.Select(subscription => subscription.DataHandler(messageEvent).AsTask());
_ = Task.WhenAll(handlers);
}
或
private async ValueTask OnDataReceived(DataReceivedEventArgs e)
{
var timestamp = DateTimeOffset.Now;
var messageEvent = new MessageEvent(e.Message, timestamp);
var handlers = _subscriptions
.GetAll()
.Where(subscription => MessageMatchesHandler(messageEvent.Data, subscription.Request))
.Select(subscription => Task.Run(() => subscription.DataHandler(messageEvent)));
await Task.WhenAll(handlers);
}
2条答案
按热度按时间nfs0ujit1#
带有
await Task.WhenAll
的版本就可以了,不过我想您仍然希望保留MeasureUserProcessTime
,因为这将为您提供有用的长处理程序警告。67up9zun2#
我想您在某些地方有一些与SemaphoreSlim实体同步的机制,但是无论如何......如果您不对任务的结果做任何事情,为什么不简单地触发并忘记呢?