.net 使代码并行运行,而不是等待每个数据处理程序

c90pui9n  于 2023-01-27  发布在  .NET
关注(0)|答案(2)|浏览(72)

这是WebSocket接收消息事件处理程序。但是,这些订阅正在等待,并且它们当前没有并行运行,因此存在处理时间提示消息。其想法是使这些数据处理程序并行运行,这意味着应该使用Task.WhenAll。数据处理程序的类型为Func<T, ValueTask>,因此并行执行是有意义的。这样,如果其中一个数据处理程序包含await Task.Delay(5000),它就不会阻塞其他数据处理程序。

private async ValueTask OnDataReceived(DataReceivedEventArgs e)
{
    var timestamp = DateTimeOffset.Now;

    var messageEvent = new MessageEvent(e.Message, timestamp);

    foreach (var subscription in _subscriptions
                    .GetAll()
                    .Where(subscription => MessageMatchesHandler(messageEvent.Data, subscription.Request)))
    {
        var userProcessTime = await MeasureUserProcessTime(async () => await subscription.DataHandler(messageEvent));

        if (userProcessTime.TotalMilliseconds > 500)
        {
            _logger.LogTrace("Detected slow data handler ({UserProcessTimeMs} ms user code), consider offloading data handling to another thread. Data from this socket may arrive late or not at all if message processing is continuously slow.",
                userProcessTime.TotalMilliseconds);
        }
    }
}

下面的版本呢?

private async ValueTask OnDataReceived(DataReceivedEventArgs e)
{
    var timestamp = DateTimeOffset.Now;

    var messageEvent = new MessageEvent(e.Message, timestamp);

    var handlers = _subscriptions
        .GetAll()
        .Where(subscription => MessageMatchesHandler(messageEvent.Data, subscription.Request))
        .Select(subscription => subscription.DataHandler(messageEvent).AsTask());

    _ = Task.WhenAll(handlers);
}

private async ValueTask OnDataReceived(DataReceivedEventArgs e)
{
    var timestamp = DateTimeOffset.Now;

    var messageEvent = new MessageEvent(e.Message, timestamp);

    var handlers = _subscriptions
        .GetAll()
        .Where(subscription => MessageMatchesHandler(messageEvent.Data, subscription.Request))
        .Select(subscription => Task.Run(() => subscription.DataHandler(messageEvent)));

    await Task.WhenAll(handlers);
}
nfs0ujit

nfs0ujit1#

带有await Task.WhenAll的版本就可以了,不过我想您仍然希望保留MeasureUserProcessTime,因为这将为您提供有用的长处理程序警告。

67up9zun

67up9zun2#

我想您在某些地方有一些与SemaphoreSlim实体同步的机制,但是无论如何......如果您不对任务的结果做任何事情,为什么不简单地触发并忘记呢?

private  void OnDataReceived(DataReceivedEventArgs e)
    {
        if (e is null)
        {
            _logger.Warning("!!!!!!!");
            return;
        }
        var timestamp = DateTimeOffset.Now;

        var messageEvent = new MessageEvent(e.Message, timestamp);

        foreach (var subscription in _subscriptions
              .GetAll()
              .Where(subscription => MessageMatchesHandler(messageEvent.Data, subscription.Request)))
        {
            Task.Run(() =>
            {
                try
                {
                    var handlers = _subscriptions
                        .GetAll()
                        .Where(subscription => MessageMatchesHandler(messageEvent.Data, subscription.Request))
                        .Select(subscription => subscription.DataHandler(messageEvent).AsTask());
                    var userProcessTime = await MeasureUserProcessTime(async () => await subscription.DataHandler(messageEvent));

                    if (userProcessTime.TotalMilliseconds > 500)
                    {
                        _logger.LogTrace("Detected slow data handler ({UserProcessTimeMs} ms user code), consider offloading data handling to another thread. Data from this socket may arrive late or not at all if message processing is continuously slow.",
                            userProcessTime.TotalMilliseconds);
                    }
                }
                catch (Exception ex)
                {
                }
            });
        }             
    }

相关问题