我正在尝试创建一个SelectAwait
(和其他)的并发版本,作为System.Linq.Async
的一部分,它提供IAsyncEnumerable
的扩展方法。
private async IAsyncEnumerable<TOut> SelectParallelAsync<T, TOut>(
this IAsyncEnumerable<T> enumerable, Func<T, Task<TOut>> predicate)
{
var sem = new SemaphoreSlim(1, 10);
var retVal = enumerable.Select(item => {
var task = Task.Run(async () => {
await sem.WaitAsync();
var retVal = await predicate(item);
sem.Release();
return retVal;
});
return task;
});
await foreach (var item in retVal)
yield return await item;
}
Enumerable是0-1000之间的简单可枚举值。代码被调用为
.SelectParallelAsync(async i =>
{
Console.WriteLine($"In Select : {i}");
await Task.Delay(1000);
return i + 5;
});
我希望所有的任务都能立即启动,一次运行10个。然而,它们一个接一个地被触发。有什么方法可以让我实现这样的事情吗?非常感谢。
编辑:我使用信号量而不是Parallel.ForEach
或.AsParallel().WithMaxDegreeOfParallelism
,因为我想在多个方法之间共享这个信号量。此外,PLINQ的可扩展性不是很好,我不能向它添加我自己的扩展方法。
1条答案
按热度按时间1tuwyuhd1#
源
IAsyncEnumerable<T> enumerable
的枚举由结果AsyncEnumerable<TOut>
的枚举驱动。当结果序列的消费者请求序列的第一个TOut
元素时,此时将从源IAsyncEnumerable<T> enumerable
请求T
值。然后将该值投影到Task<TOut>
,然后等待此任务。最后任务的结果将返回给消费者。2所有的事情都是顺序发生的。3没有并发性。4在消费者请求一个元素之前,以及在元素被交付给消费者之后,没有内部活动。向LINQ运算符添加并发性比乍看上去要复杂得多。这意味着,当使用者请求第一个元素时,必须同时启动10个任务。当其中任何一个任务完成时,必须在其位置自动启动另一个任务,而不需要使用者请求它。并且必须限制可以在内部存储的任务数量。消费者还没有请求的任务。当达到这个限制时,不应该再启动任务,直到消费者获取一个任务并创建一个空槽。你必须考虑如何处理主动启动任务并监视其完成的内部机制,以防消费者决定它已经足够了,并且不会请求更多的元素(通过退出消费循环)。你还必须考虑如何处理存储的任务,以防将要交付给消费者的任务失败。如果多个任务失败了呢?如果枚举被
CancellationToken
取消了呢?只使用像
TaskCompletionSource
s和SemaphoreSlim
s这样的基本工具,而不使用像Channel<T>
这样的高级工具,正确地完成所有这些任务是极其困难的。如果您不熟悉Channel<T>
,我的建议是花一些时间熟悉它。这是一个相当简单的机制。如果您了解BlockingCollection<T>
类,Channel<T>
是它的异步版本。在另一个问题中,我已经发布了一个
AwaitResults
方法,它可以很容易地实现SelectParallelAsync
操作符:您可以研究该实现,并对其进行更改以满足您的需要。