.net 具有动态MaxDegreeOfParallelism的自定义数据流转换块

cqoc49vn  于 2022-12-14  发布在  .NET
关注(0)|答案(1)|浏览(100)

我接收对象序列(例如ItemGroup[]),每个对象包含多个作业(例如Item[])和最大并行度值,例如:

public record Item(string Name);
public record ItemGroup(Item[] Items, int MaxDegreeOfParallelism);

必须按顺序处理ItemGroup示例序列,但每个ItemGroup的最大并行度可能大于1。例如,管道将按顺序处理A*项组,然后并发处理B*项组:

var groups = new[]
{
    new ItemGroup(new[] { new Item("A0"), new Item("A1"), new Item("A2") }, 1),
    new ItemGroup(new[] { new Item("B0"), new Item("B1"), new Item("B2") }, 3)
};

我认为从IPropagatorBlock<ItemGroup, Item>派生的定制TransformManyBlock实现是一个不错的选择,但是我不清楚如何正确地等待内部动态创建的TransformManyBlock,因为生产者将ItemGroup示例发送到它。
有人能带我去吗?

uhry853o

uhry853o1#

您可以为接收到的每个ItemGroup创建一个内部TransformBlock<Item, Item>。下面是一个具有TInputTChildTOutput泛型参数的通用解决方案。TInput对应于ItemGroupTChild对应于Item,并且TOutput也是Item,因为您传播了这些项而没有对其进行转换:

public static IPropagatorBlock<TInput, TOutput> CreateTransformManyDynamicBlock
    <TInput, TChild, TOutput>(
    Func<TInput, IEnumerable<TChild>> childrenSelector,
    Func<TInput, int> degreeOfParallelismSelector,
    Func<TChild, TOutput> transformChild)
{
    ArgumentNullException.ThrowIfNull(childrenSelector);
    ArgumentNullException.ThrowIfNull(degreeOfParallelismSelector);
    ArgumentNullException.ThrowIfNull(transformChild);

    return new TransformManyBlock<TInput, TOutput>(async input =>
    {
        TransformBlock<TChild, TOutput> innerBlock = new(transformChild, new()
        {
            MaxDegreeOfParallelism = degreeOfParallelismSelector(input)
        });
        foreach (var child in childrenSelector(input))
        {
            bool accepted = innerBlock.Post(child);
            if (!accepted) break; // The innerBlock has failed
        }
        innerBlock.Complete();

        // Propagate the results
        List<TOutput> results = new();
        while (await innerBlock.OutputAvailableAsync().ConfigureAwait(false))
            while (innerBlock.TryReceive(out TOutput result))
                results.Add(result);
        try { await innerBlock.Completion.ConfigureAwait(false); }
        catch when (innerBlock.Completion.IsCanceled) { throw; }
        catch { innerBlock.Completion.Wait(); } // Propagate AggregateException
        return results;
    });
}

用法示例:

IPropagatorBlock<ItemGroup, Item> block =
    CreateTransformManyDynamicBlock<ItemGroup, Item, Item>(
        x => x.Items, x => x.MaxDegreeOfParallelism, x => x);

**注意:**上述代码尚未经过测试。
**更新:**我最初的实现(revision 1)是基于.NET 6 API ReceiveAllAsync,以及接受Func<TInput,IAsyncEnumerable<TOutput>>参数的TransformManyBlock构造函数(.NET 7)。问题是ReceiveAllAsync不传播枚举数据流块的异常,所以我改为手动收集和传播结果,方法是填充List<TOutput>,如本答案所示。

相关问题