我有一个从Kafka接收XML事件流的应用程序。这些事件必须被反序列化/解析或转换,然后才能按顺序传递给一些业务逻辑。(这个逻辑然后在输出端发出其他事件)。
解析/转换代码是无状态的,而域代码是有状态的,必须按顺序接收事件。这两个步骤通过使用System.Threading通道解耦,以便解析步骤获得完整的'thread'/'cpu'(异步任务)。
我的挑战是解析是CPU密集型的,并且它在一个核心上占用100%的CPU,从而成为服务吞吐量的瓶颈。我尝试使用多线程/并行处理,这在一定程度上提高了吞吐量。然而,我的方法似乎并不优雅,并且可能有很多开销。
在解析步骤中,我使用Task.Run()为每个“item”生成一个Task,然后将Task添加到输出队列中,确保Task是根据输入顺序添加的。然后消费者从Channel中一次提取一个任务,并等待它完成并获得结果,然后继续。
这意味着我正在创建和提交大量的任务,并且通常看起来我在热路径中使用了很多线程协调操作。
希望这里有人能有一个好的方法来处理项目的顺序,同时尊重输出的顺序。
2条答案
按热度按时间lf5gs5x21#
所以你有一个
Channel<Task<T>>
作为传送带,生产者用channel.Writer.TryWrite(Task.Run(() => Parse(item)))
添加任务,消费者读取任务并一个接一个地等待它们:这是一个非常好的设置。缺点是您无法控制并行度。因此在某些时候,您可能会有太多的
Task.Run
操作并行运行,导致ThreadPool
饥饿,这可能会对应用程序的其他部分产生负面影响。您可以通过使用更高级的Task.Factory.StartNew
而不是Task.Run
来调度工作来解决此问题。并使用共享ConcurrentExclusiveSchedulerPair
示例的ConcurrentScheduler
属性配置scheduler
参数。另一种方法是用TPL Dataflow库中的
TransformBlock<TInput,TOutput>
替换通道。该组件结合了输入缓冲区,输出缓冲区和将TInput
转换为TOutput
的处理器。它配备了开箱即用的并行功能和顺序保留。下面是一个示例:生产者使用
block.Post(item)
提供块,消费者使用ReceiveAllAsync
方法枚举块的输出缓冲区:最后的
await block.Completion;
是需要的,因为ReceiveAllAsync
方法当前有a bug,并且不会将可能的异常作为枚举的一部分传播。我的期望是
TransformBlock
方法应该比您当前的设置具有更少的开销,并且消耗更少的内存。Microsoft宣称TPL Dataflow library适合“* 粗粒度数据流和流水线任务”*。这意味着您的Parse
方法应该是矮胖的。如果它是轻量级的,比如解析单个数字,并行化的好处很可能会被同步开销抵消。在这种情况下,解决方案可能是使用BatchBlock<T>
将工作分块。TPL Dataflow库并不完全是尖端技术。它早于
ValueTask
s,因此它没有利用它们。它还带有一些怪癖,比如吞下OperationCanceledException
s,这可能是由transform
委托抛出的。它也很难扩展。虽然它应该比你已经拥有的更好,但它不是绝对最佳的解决方案,但也许能满足你的需求but5z9lq2#
我理解的要求是:
这个
Resequencer
类就是为了实现这个目的而设计的,它在序列号(或其他排序标准)从生产者(在本例中是XML事件的来源)进入时维护一个队列,在字典中缓冲来自消费者(XML转换器)的结果,并提供一个按顺序读取结果的方法。缓冲区中元素的数量不会超过您使用的消费者任务的数量。这是一个简单的演示,可以在许多方面进行增强,包括更强大的错误处理,它可以实现
IEnumerable<TData>
。该实现立即按顺序检查字典中的下一个键,如果失败,则在每次一个使用者完成时再检查一次。
下面的演示基于this article中的Channel演示代码。