我已经提出了一些代码来消耗队列中的所有等待项,而不是逐个处理这些项,将所有等待项作为一个集合来处理是有意义的。
我已经像这样声明了我的队列。
private BlockingCollection<Item> items =
new BlockingCollection<Item>(new ConcurrentQueue<Item>);
字符串
然后,在一个消费者线程上,我计划像这样批量读取这些项目,
Item nextItem;
while (this.items.TryTake(out nextItem, -1))
{
var workToDo = new List<Item>();
workToDo.Add(nextItem);
while(this.items.TryTake(out nextItem))
{
workToDo.Add(nextItem);
}
// process workToDo, then go back to the queue.
}
型
这种方法缺乏GetConsumingEnumerable
的实用性,我不禁想知道我是否错过了更好的方法,或者我的方法是否有缺陷。
有没有更好的方法来批量消耗BlockingCollection<T>
?
3条答案
按热度按时间yhuiod9q1#
一个解决方案是使用System.Threading.Tasks.Dataflow中的
BufferBlock<T>
(包含在.net core 3+中)。它不使用GetConsumingEnumerable()
,但它仍然允许您使用相同的实用程序,主要是:还有一个
BatchBlock<T>
,但这限制了你固定大小的批次。字符串
下面是一个工作示例,演示了以下内容:
CancellationToken
的使用Thread.Sleep()
调用不是必需的,但有助于模拟在更繁重的场景中可能发生的一些处理时间Task.WaitAll()
和Thread.Sleep()
都可以选择性地转换为它们的等价物型
这里是一个现代化的和简化的版本的代码。
型
tyu7yeag2#
虽然在某些方面不如
ConcurrentQueue<T>
好,但我自己的LLQueue<T>
允许使用AtomicDequeueAll方法进行批处理出队,在该方法中,(原子和线程安全)操作,然后在一个非线程安全的集合中由单个线程使用。虽然这不是阻塞,但它可以很容易地用于创建一个阻塞集合:
字符串
这是一个不做以下事情的起点:
1.处理一个待处理的等待读卡器。
1.担心多个读取器之间的潜在竞争,这两个读取器都是由一个阅读时发生的写入触发的(它只是认为偶尔的空结果是可以接受的)。
1.将任何上限放在写作上。
所有这些都可以添加,但我想保持最低限度的一些实际用途,希望在上面定义的限制内没有bug。
c86crjj03#
不,没有更好的办法了,你的方法基本上是正确的。
为了方便使用,可以将“consume-in-batches”功能 Package 在扩展方法中。下面的实现在整个枚举过程中使用相同的
List<T>
作为缓冲区,目的是防止在每次迭代时分配新的缓冲区。它还包括一个可选的maxSize
参数,用于限制发出的批处理的大小:字符串
使用示例:
型
每当发出的批小于缓冲区容量的四分之一时,缓冲区就会缩小一半。这将使缓冲区处于控制之中,以防它在枚举期间的某个时候变得过大。
if (source.IsCompleted) yield break
行的目的是复制内置GetConsumingEnumerable
方法的行为,当它提供了一个已经取消的令牌,并且集合为空并完成时。在取消的情况下,没有缓冲的消息有丢失的危险。只有当
buffer
为空时,cancellationToken
才会被检查。在这个答案的first revision中可以找到一个没有内存管理功能的更简单的实现。