我接收对象序列(例如ItemGroup[]
),每个对象包含多个作业(例如Item[]
)和最大并行度值,例如:
public record Item(string Name);
public record ItemGroup(Item[] Items, int MaxDegreeOfParallelism);
必须按顺序处理ItemGroup
示例序列,但每个ItemGroup
的最大并行度可能大于1。例如,管道将按顺序处理A*
项组,然后并发处理B*
项组:
var groups = new[]
{
new ItemGroup(new[] { new Item("A0"), new Item("A1"), new Item("A2") }, 1),
new ItemGroup(new[] { new Item("B0"), new Item("B1"), new Item("B2") }, 3)
};
我认为从IPropagatorBlock<ItemGroup, Item>
派生的定制TransformManyBlock
实现是一个不错的选择,但是我不清楚如何正确地等待内部动态创建的TransformManyBlock
,因为生产者将ItemGroup
示例发送到它。
有人能带我去吗?
1条答案
按热度按时间uhry853o1#
您可以为接收到的每个
ItemGroup
创建一个内部TransformBlock<Item, Item>
。下面是一个具有TInput
、TChild
和TOutput
泛型参数的通用解决方案。TInput
对应于ItemGroup
,TChild
对应于Item
,并且TOutput
也是Item
,因为您传播了这些项而没有对其进行转换:用法示例:
**注意:**上述代码尚未经过测试。
**更新:**我最初的实现(revision 1)是基于.NET 6 API
ReceiveAllAsync
,以及接受Func<TInput,IAsyncEnumerable<TOutput>>
参数的TransformManyBlock
构造函数(.NET 7)。问题是ReceiveAllAsync
不传播枚举数据流块的异常,所以我改为手动收集和传播结果,方法是填充List<TOutput>
,如本答案所示。