.net 如何批量使用BlockingCollection< T>

z4bn682m  于 2023-11-20  发布在  .NET
关注(0)|答案(3)|浏览(169)

我已经提出了一些代码来消耗队列中的所有等待项,而不是逐个处理这些项,将所有等待项作为一个集合来处理是有意义的。
我已经像这样声明了我的队列。

private BlockingCollection<Item> items = 
    new BlockingCollection<Item>(new ConcurrentQueue<Item>);

字符串
然后,在一个消费者线程上,我计划像这样批量读取这些项目,

Item nextItem;
while (this.items.TryTake(out nextItem, -1))
{
    var workToDo = new List<Item>();
    workToDo.Add(nextItem);

    while(this.items.TryTake(out nextItem))
    {
        workToDo.Add(nextItem);
    }

    // process workToDo, then go back to the queue.
}


这种方法缺乏GetConsumingEnumerable的实用性,我不禁想知道我是否错过了更好的方法,或者我的方法是否有缺陷。
有没有更好的方法来批量消耗BlockingCollection<T>

yhuiod9q

yhuiod9q1#

一个解决方案是使用System.Threading.Tasks.Dataflow中的BufferBlock<T>(包含在.net core 3+中)。它不使用GetConsumingEnumerable(),但它仍然允许您使用相同的实用程序,主要是:

  • 允许多个(对称和/或不对称)消费者和生产者并行处理
  • 线程安全(考虑到上述情况)-无需担心竞态条件
  • 可以通过取消令牌和/或收集完成来取消
  • 消费者阻塞,直到数据可用,避免在轮询上浪费CPU周期

还有一个BatchBlock<T>,但这限制了你固定大小的批次。

var buffer = new BufferBlock<Item>();
while (await buffer.OutputAvailableAsync())
{
    if (buffer.TryReceiveAll(out var items))
        //process items
}

字符串
下面是一个工作示例,演示了以下内容:

  • 并行处理可变长度批次的多个对称消费者
  • 多个对称的生产者(在这个例子中不是真正的并行操作)
  • 当生产者完成收集时,
  • 为了使示例简短,我没有演示CancellationToken的使用
  • 能够等待生产者和/或消费者完成
  • 能够从不允许JavaScript的区域(如构造函数)进行调用
  • Thread.Sleep()调用不是必需的,但有助于模拟在更繁重的场景中可能发生的一些处理时间
  • Task.WaitAll()Thread.Sleep()都可以选择性地转换为它们的等价物
  • 无需使用任何外部库
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class Program
{
    static void Main()
    {
        var buffer = new BufferBlock<string>();

        // Kick off consumer task(s)
        List<Task> consumers = new List<Task>();
        for (int i = 0; i < 3; i++)
        {
            consumers.Add(Task.Factory.StartNew(async () =>
            {
                // need to copy this due to lambda variable capture
                var num = i; 
                while (await buffer.OutputAvailableAsync())
                {
                    if (buffer.TryReceiveAll(out var items))
                        Console.WriteLine($"Consumer {num}:    " + 
                            items.Aggregate((a, b) => a + ", " + b));

                        // real life processing would take some time
                        await Task.Delay(500); 
                }

                Console.WriteLine($"Consumer {num} complete");
            }));

            // give consumer tasks time to activate for a better demo
            Thread.Sleep(100); 
        }

        // Kick off producer task(s)
        List<Task> producers = new List<Task>();
        for (int i = 0; i < 3; i++)
        {
            producers.Add(Task.Factory.StartNew(() =>
            {
                for (int j = 0 + (1000 * i); j < 500 + (1000 * i); j++)
                    buffer.Post(j.ToString());
            }));

            // space out the producers for a better demo
            Thread.Sleep(10); 
        }

        // may also use the async equivalent
        Task.WaitAll(producers.ToArray());
        Console.WriteLine("Finished waiting on producers");

        // demo being able to complete the collection
        buffer.Complete(); 

        // may also use the async equivalent
        Task.WaitAll(consumers.ToArray()); 
        Console.WriteLine("Finished waiting on consumers");

        Console.ReadLine();
    }
}


这里是一个现代化的和简化的版本的代码。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    private static async Task Main()
    {
        var buffer = new BufferBlock<string>();

        // Kick off consumer task(s)
        var consumers = new List<Task>();
        for (var i = 0; i < 3; i++)
        {
            var id = i;
            consumers.Add(Task.Run(() => StartConsumer(id, buffer)));

            // give consumer tasks time to activate for a better demo
            await Task.Delay(100);
        }

        // Kick off producer task(s)
        var producers = new List<Task>();
        for (var i = 0; i < 3; i++)
        {
            var pid = i;
            producers.Add(Task.Run(() => StartProducer(pid, buffer)));

            // space out the producers for a better demo
            await Task.Delay(10);
        }

        // may also use the async equivalent
        await Task.WhenAll(producers);
        Console.WriteLine("Finished waiting on producers");

        // demo being able to complete the collection
        buffer.Complete();

        // may also use the async equivalent
        await Task.WhenAll(consumers);
        Console.WriteLine("Finished waiting on consumers");

        Console.ReadLine();
    }

    private static async Task StartConsumer(
            int id,
            IReceivableSourceBlock<string> buffer)
    {
        while (await buffer.OutputAvailableAsync())
        {
            if (buffer.TryReceiveAll(out var items))
            {
                Console.WriteLine($"Consumer {id}: " + 
                    items.Aggregate((a, b) => a + ", " + b));
            }

            // real life processing would take some time
            await Task.Delay(500);
        }

        Console.WriteLine($"Consumer {id} complete");
    }

    private static Task StartProducer(int pid, ITargetBlock<string> buffer)
    {
        for (var j = 0 + (1000 * pid); j < 500 + (1000 * pid); j++)
        {
            buffer.Post(j.ToString());
        }

        return Task.CompletedTask;
    }
}

tyu7yeag

tyu7yeag2#

虽然在某些方面不如ConcurrentQueue<T>好,但我自己的LLQueue<T>允许使用AtomicDequeueAll方法进行批处理出队,在该方法中,(原子和线程安全)操作,然后在一个非线程安全的集合中由单个线程使用。
虽然这不是阻塞,但它可以很容易地用于创建一个阻塞集合:

public BlockingBatchedQueue<T>
{
  private readonly AutoResetEvent _are = new AutoResetEvent(false);
  private readonly LLQueue<T> _store;
  public void Add(T item)
  {
    _store.Enqueue(item);
    _are.Set();
  }
  public IEnumerable<T> Take()
  {
    _are.WaitOne();
    return _store.AtomicDequeueAll();
  }
  public bool TryTake(out IEnumerable<T> items, int millisecTimeout)
  {
    if(_are.WaitOne(millisecTimeout))
    {
      items = _store.AtomicDequeueAll();
      return true;
    }
    items = null;
    return false;
  }
}

字符串
这是一个不做以下事情的起点:
1.处理一个待处理的等待读卡器。
1.担心多个读取器之间的潜在竞争,这两个读取器都是由一个阅读时发生的写入触发的(它只是认为偶尔的空结果是可以接受的)。
1.将任何上限放在写作上。
所有这些都可以添加,但我想保持最低限度的一些实际用途,希望在上面定义的限制内没有bug。

c86crjj0

c86crjj03#

不,没有更好的办法了,你的方法基本上是正确的。
为了方便使用,可以将“consume-in-batches”功能 Package 在扩展方法中。下面的实现在整个枚举过程中使用相同的List<T>作为缓冲区,目的是防止在每次迭代时分配新的缓冲区。它还包括一个可选的maxSize参数,用于限制发出的批处理的大小:

/// <summary>
/// Consumes the items in the collection in batches. Each batch contains all
/// the items that are immediately available, up to a specified maximum number.
/// </summary>
public static IEnumerable<T[]> GetConsumingEnumerableBatch<T>(
    this BlockingCollection<T> source, int maxSize = -1,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);
    if (maxSize == -1) maxSize = Array.MaxLength;
    if (maxSize < 1) throw new ArgumentOutOfRangeException(nameof(maxSize));
    if (source.IsCompleted) yield break;
    List<T> buffer = new();
    while (source.TryTake(out T item, Timeout.Infinite, cancellationToken))
    {
        Debug.Assert(buffer.Count == 0);
        buffer.Add(item);
        while (buffer.Count < maxSize && source.TryTake(out item))
            buffer.Add(item);
        T[] batch = buffer.ToArray();
        int batchSize = batch.Length;
        buffer.Clear();
        yield return batch;
        if (batchSize < buffer.Capacity / 4)
            buffer.Capacity = buffer.Capacity / 2; // Shrink oversized buffer
    }
}

字符串
使用示例:

foreach (Item[] batch in this.items.GetConsumingEnumerableBatch())
{
    // Process the batch
}


每当发出的批小于缓冲区容量的四分之一时,缓冲区就会缩小一半。这将使缓冲区处于控制之中,以防它在枚举期间的某个时候变得过大。
if (source.IsCompleted) yield break行的目的是复制内置GetConsumingEnumerable方法的行为,当它提供了一个已经取消的令牌,并且集合为空并完成时。
在取消的情况下,没有缓冲的消息有丢失的危险。只有当buffer为空时,cancellationToken才会被检查。
在这个答案的first revision中可以找到一个没有内存管理功能的更简单的实现。

相关问题