.net 一个很好的方法来并行处理“流”的元素,同时保持输出的顺序

zfciruhq  于 2023-03-24  发布在  .NET
关注(0)|答案(2)|浏览(143)

我有一个从Kafka接收XML事件流的应用程序。这些事件必须被反序列化/解析或转换,然后才能按顺序传递给一些业务逻辑。(这个逻辑然后在输出端发出其他事件)。
解析/转换代码是无状态的,而域代码是有状态的,必须按顺序接收事件。这两个步骤通过使用System.Threading通道解耦,以便解析步骤获得完整的'thread'/'cpu'(异步任务)。
我的挑战是解析是CPU密集型的,并且它在一个核心上占用100%的CPU,从而成为服务吞吐量的瓶颈。我尝试使用多线程/并行处理,这在一定程度上提高了吞吐量。然而,我的方法似乎并不优雅,并且可能有很多开销。
在解析步骤中,我使用Task.Run()为每个“item”生成一个Task,然后将Task添加到输出队列中,确保Task是根据输入顺序添加的。然后消费者从Channel中一次提取一个任务,并等待它完成并获得结果,然后继续。
这意味着我正在创建和提交大量的任务,并且通常看起来我在热路径中使用了很多线程协调操作。
希望这里有人能有一个好的方法来处理项目的顺序,同时尊重输出的顺序。

lf5gs5x2

lf5gs5x21#

所以你有一个Channel<Task<T>>作为传送带,生产者用channel.Writer.TryWrite(Task.Run(() => Parse(item)))添加任务,消费者读取任务并一个接一个地等待它们:

await foreach (Task<T> task in channel.Reader.ReadAllAsync())
{
    T result = await task;
    // Do something with the result
}

这是一个非常好的设置。缺点是您无法控制并行度。因此在某些时候,您可能会有太多的Task.Run操作并行运行,导致ThreadPool饥饿,这可能会对应用程序的其他部分产生负面影响。您可以通过使用更高级的Task.Factory.StartNew而不是Task.Run来调度工作来解决此问题。并使用共享ConcurrentExclusiveSchedulerPair示例的ConcurrentScheduler属性配置scheduler参数。
另一种方法是用TPL Dataflow库中的TransformBlock<TInput,TOutput>替换通道。该组件结合了输入缓冲区,输出缓冲区和将TInput转换为TOutput的处理器。它配备了开箱即用的并行功能和顺序保留。下面是一个示例:

TransformBlock<Item, Result> block = new(item =>
{
    return Parse(item);
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 2, // Configurable, the default is 1
    EnsureOrdered = true, // This is the default
});

生产者使用block.Post(item)提供块,消费者使用ReceiveAllAsync方法枚举块的输出缓冲区:

await foreach (var result in block.ReceiveAllAsync())
{
    // Do something with the result
}
await block.Completion;

最后的await block.Completion;是需要的,因为ReceiveAllAsync方法当前有a bug,并且不会将可能的异常作为枚举的一部分传播。
我的期望是TransformBlock方法应该比您当前的设置具有更少的开销,并且消耗更少的内存。Microsoft宣称TPL Dataflow library适合“* 粗粒度数据流和流水线任务”*。这意味着您的Parse方法应该是矮胖的。如果它是轻量级的,比如解析单个数字,并行化的好处很可能会被同步开销抵消。在这种情况下,解决方案可能是使用BatchBlock<T>将工作分块。
TPL Dataflow库并不完全是尖端技术。它早于ValueTask s,因此它没有利用它们。它还带有一些怪癖,比如吞下OperationCanceledException s,这可能是由transform委托抛出的。它也很难扩展。虽然它应该比你已经拥有的更好,但它不是绝对最佳的解决方案,但也许能满足你的需求

but5z9lq

but5z9lq2#

我理解的要求是:

  • 从表示必须转换的XML事件的数据流中读取。
  • 将每个XML事件交给多个任务中的一个来执行转换。
  • 您希望按原始顺序使用转换后的流

这个Resequencer类就是为了实现这个目的而设计的,它在序列号(或其他排序标准)从生产者(在本例中是XML事件的来源)进入时维护一个队列,在字典中缓冲来自消费者(XML转换器)的结果,并提供一个按顺序读取结果的方法。缓冲区中元素的数量不会超过您使用的消费者任务的数量。
这是一个简单的演示,可以在许多方面进行增强,包括更强大的错误处理,它可以实现IEnumerable<TData>
该实现立即按顺序检查字典中的下一个键,如果失败,则在每次一个使用者完成时再检查一次。

/// <summary>
/// This class assumes input is registered by a single producer processing data in
/// the desired order. It supports multiple consumers performing the processing
/// </summary>
public class Resequencer<TKey, TData> where TKey : notnull where TData : class
{
    bool producerDone = false;
    
    readonly ManualResetEventSlim _resetEvent = new ManualResetEventSlim();
    
    readonly ConcurrentQueue<TKey> _identifiersInOrder = new ConcurrentQueue<TKey>();
    readonly ConcurrentDictionary<TKey, TData> _processedData = 
        new ConcurrentDictionary<TKey, TData>();

    public void RegisterNextInputKey(TKey key) => 
        _identifiersInOrder.Enqueue(key);
    
    public void ProducerIsDone()
    {
        producerDone = true;
    }
    public void RegisterConsumerOutput(TKey key, TData data)
    {
        _processedData.TryAdd(key, data);
        _resetEvent.Set();
        
    }
    public TData? GetNext()
    {
        do
        {                       
            if (_identifiersInOrder.TryDequeue(out var nextUp))
            {
                while (true)
                {
                    if (_processedData.TryGetValue(nextUp, out var data))
                    {
                        return data;
                    }

                    _resetEvent.Wait();
                }
            }

        }
        while (!producerDone || _identifiersInOrder.Any());
        
        return null;
    }
}

下面的演示基于this article中的Channel演示代码。

static readonly Resequencer<int, Output> _resequencer = new Resequencer<int, Output>();

async Task Main()
{
    var channel = Channel.CreateUnbounded<Input>();

    // In this example, multiple consumers are needed to keep up with a fast producer

    Task outputTask = Task.Run(() => OutputOrderedResult());

    var producer1 = new Producer(channel.Writer, 1);
    var consumer1 = new Consumer(channel.Reader, 1);
    var consumer2 = new Consumer(channel.Reader, 2);
    var consumer3 = new Consumer(channel.Reader, 3);

    Task consumerTask1 = consumer1.ConsumeData(); // begin consuming
    Task consumerTask2 = consumer2.ConsumeData(); // begin consuming
    Task consumerTask3 = consumer3.ConsumeData(); // begin consuming

    Task producerTask1 = producer1.BeginProducing();

    await producerTask1.ContinueWith(_ => channel.Writer.Complete());
    

    await Task.WhenAll(outputTask, consumerTask1, consumerTask2, consumerTask3);
    
    Console.WriteLine($"Max queue size: {_resequencer.maxQueueSize}");
}

void OutputOrderedResult()
{
    while (true)
    {
        var next = _resequencer.GetNext();
        if (next == null)
            break;
        Console.WriteLine($"SequenceNr: {next.SequenceNr} Data: {next.TransformedData}");
    }
}

internal class Input
{
    public int SequenceNr { get; set; }
    public string OriginalData { get; set; } = string.Empty;
}

internal class Output
{
    public int SequenceNr { get; set; }
    public string TransformedData { get; set; } = string.Empty;
}

// These classes based on https://github.com/stevejgordon/ChannelSample
// See also https://www.stevejgordon.co.uk/an-introduction-to-system-threading-channels
// Modifications relevant to this post commented in ALL CAPS
internal class Producer
{
    private readonly ChannelWriter<Input> _writer;
    private readonly int _identifier;

    public Producer(ChannelWriter<Input> writer, int identifier)
    {
        _writer = writer;
        _identifier = identifier;
    }

    public async Task BeginProducing()
    {
        for (var i = 0; i < 1000; i++)
        {
            await Task.Delay(50); // simulate producer building/fetching some data

            var input = new Input()
            {
                SequenceNr = i,
                OriginalData = $"Item {i}"
            };

            _resequencer.RegisterNextInputKey(input.SequenceNr);
            await _writer.WriteAsync(input);
        }

        _resequencer.ProducerIsDone();
    }
}

internal class Consumer
{
    private readonly ChannelReader<Input> _reader;
    private readonly int _identifier;
    private readonly Random rnd = new Random();

    public Consumer(ChannelReader<Input> reader, int identifier)
    {
        _reader = reader;
        _identifier = identifier;
    }

    public async Task ConsumeData()
    {
        while (await _reader.WaitToReadAsync())
        {
            if (_reader.TryRead(out var input))
            {
                await Task.Delay(rnd.Next(50, 200)); // simulate processing time
                var output = new Output() { SequenceNr = input.SequenceNr, TransformedData = new string(input.OriginalData.Reverse().ToArray()) };
                _resequencer.RegisterConsumerOutput(output.SequenceNr, output);
            }
        }
    }
}

相关问题