自定义LINQ运算符未并发运行的任务

tquggr8v  于 2023-01-28  发布在  其他
关注(0)|答案(1)|浏览(137)

我正在尝试创建一个SelectAwait(和其他)的并发版本,作为System.Linq.Async的一部分,它提供IAsyncEnumerable的扩展方法。

private async IAsyncEnumerable<TOut> SelectParallelAsync<T, TOut>(
    this IAsyncEnumerable<T> enumerable, Func<T, Task<TOut>> predicate)
{
    var sem = new SemaphoreSlim(1, 10);
    
    var retVal = enumerable.Select(item => {
        var task = Task.Run(async () => {
            await sem.WaitAsync();
            var retVal = await predicate(item);
            sem.Release();

            return retVal;
        });

        return task;
    });

    await foreach (var item in retVal)
        yield return await item;
}

Enumerable是0-1000之间的简单可枚举值。代码被调用为

.SelectParallelAsync(async i =>
{
    Console.WriteLine($"In Select : {i}");
    await Task.Delay(1000);
    return i + 5;
});

我希望所有的任务都能立即启动,一次运行10个。然而,它们一个接一个地被触发。有什么方法可以让我实现这样的事情吗?非常感谢。
编辑:我使用信号量而不是Parallel.ForEach.AsParallel().WithMaxDegreeOfParallelism,因为我想在多个方法之间共享这个信号量。此外,PLINQ的可扩展性不是很好,我不能向它添加我自己的扩展方法。

1tuwyuhd

1tuwyuhd1#

IAsyncEnumerable<T> enumerable的枚举由结果AsyncEnumerable<TOut>的枚举驱动。当结果序列的消费者请求序列的第一个TOut元素时,此时将从源IAsyncEnumerable<T> enumerable请求T值。然后将该值投影到Task<TOut>,然后等待此任务。最后任务的结果将返回给消费者。2所有的事情都是顺序发生的。3没有并发性。4在消费者请求一个元素之前,以及在元素被交付给消费者之后,没有内部活动。
向LINQ运算符添加并发性比乍看上去要复杂得多。这意味着,当使用者请求第一个元素时,必须同时启动10个任务。当其中任何一个任务完成时,必须在其位置自动启动另一个任务,而不需要使用者请求它。并且必须限制可以在内部存储的任务数量。消费者还没有请求的任务。当达到这个限制时,不应该再启动任务,直到消费者获取一个任务并创建一个空槽。你必须考虑如何处理主动启动任务并监视其完成的内部机制,以防消费者决定它已经足够了,并且不会请求更多的元素(通过退出消费循环)。你还必须考虑如何处理存储的任务,以防将要交付给消费者的任务失败。如果多个任务失败了呢?如果枚举被CancellationToken取消了呢?
只使用像TaskCompletionSource s和SemaphoreSlim s这样的基本工具,而不使用像Channel<T>这样的高级工具,正确地完成所有这些任务是极其困难的。如果您不熟悉Channel<T>,我的建议是花一些时间熟悉它。这是一个相当简单的机制。如果您了解BlockingCollection<T>类,Channel<T>是它的异步版本。
在另一个问题中,我已经发布了一个AwaitResults方法,它可以很容易地实现SelectParallelAsync操作符:

private IAsyncEnumerable<TOut> SelectParallelAsync<T, TOut>(
    this IAsyncEnumerable<T> enumerable, Func<T, Task<TOut>> predicate)
{
    return enumerable
        .Select(item => predicate(item))
        .AwaitResults(maxConcurrency: 10);
}

您可以研究该实现,并对其进行更改以满足您的需要。

相关问题