- 背景:**
通过阅读这么多的源代码,我了解到BlockingCollection<T>
的设计是为了摆脱检查线程之间的共享集合中是否有新数据可用的要求。如果有新数据插入到共享集合中,那么您的消费者线程将立即唤醒。因此,您不必在特定的时间间隔内检查消费者线程是否有新数据可用,通常在while
循环中。
我也有类似的要求:
- 我有一个大小为1的阻塞集合。
- 此集合将从3个位置(3个生产商)填充。
- 当前使用
while
循环检查集合是否有内容。 - 想要在阻塞集合得到一个值时立即执行
ProcessInbox()
方法并清空该集合,而不检查在特定时间间隔内(通常在while
循环中)是否有新数据可供消费者线程使用。我们如何实现?
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
namespace ConsoleApp1
{
class Program
{
private static BlockingCollection<int> _processingNotificationQueue = new(1);
private static void GetDataFromQueue(CancellationToken cancellationToken)
{
Console.WriteLine("GDFQ called");
int data;
//while (!cancellationToken.IsCancellationRequested)
while(!_processingNotificationQueue.IsCompleted)
{
try
{
if(_processingNotificationQueue.TryTake(out data))
{
Console.WriteLine("Take");
ProcessInbox();
}
}
catch (Exception ex)
{
}
}
}
private static void ProcessInbox()
{
Console.WriteLine("PI called");
}
private static void PostDataToQueue(object state)
{
Console.WriteLine("PDTQ called");
_processingNotificationQueue.TryAdd(1);
}
private void MessageInsertedToTabale()
{
PostDataToQueue(new CancellationToken());
}
private void FewMessagesareNotProcessed()
{
PostDataToQueue(new CancellationToken());
}
static void Main(string[] args)
{
Console.WriteLine("Start");
new Timer(PostDataToQueue, new CancellationToken(), TimeSpan.Zero,
TimeSpan.FromMilliseconds(100));
// new Thread(()=> PostDataToQueue()).Start();
new Thread(() => GetDataFromQueue(new CancellationToken())).Start();
Console.WriteLine("End");
Console.ReadKey();
}
}
}
1条答案
按热度按时间dzjeubhm1#
只要在上面加上
foreach
就行了。它阻塞了。只要它没有标记为completed,foreach就会在集合为空时挂起,并在添加新项时立即唤醒。请看https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.blockingcollection-1?view=net-6.0中的第一个
ConsumingEnumerableDemo
,想象消费者foreach (var item in bc.GetConsumingEnumerable())
在另一个线程中。那里的生产者在新项目之间有一个延迟,所以你应该能够很容易地修改它,看看消费者是如何“相对立即”醒来的。只有一个生产者,但我不认为多个生产者有问题,Add
是线程安全的。我不能保证在添加新项目和唤醒一个熟睡的消费者之间没有明显的延迟,因为“significant”是一个完全依赖大小写的词,可能会有一些延迟,至少在切换线程时是这样。但我怀疑该集合是否会执行任何额外的节流。我猜想它发出信号,要求在
Add
返回到生产者之前唤醒休眠的使用者。我想对于Inbox
处理的目的来说,这可能是人类的UI问题,可能100 ms的延迟不会引起注意,我也不希望添加/唤醒延迟远低于100 ms。如果“blocking foreach”部分听起来很糟糕,你可能不得不切换到不同的同步机制()。这是一个
BlockingCollection
,对吗?它不是evented collection
或其他什么。TryXXX方法是为了你出于某种原因想要限制异常的情况而提供的,并且可以像这里一样自己处理调度更新()。()差不多了。你发布的这段代码缺少了两个重要的东西。当集合为空时,你的
while
循环以最大速度忙转,这通常是致命的禁忌,特别是对于电池运行的任何东西。考虑添加一些死区时间来拥有if(hasItems) doWork; else sleep(sometime);
。另一件事是try-catch。文档说,当集合抛出时,这意味着收集已经“完成”。2不再有项目。3在一个死的收集上循环是没有意义的。4 try-catch不应该在循环内部,但是应该包含循环,这样当收集完成时循环就停止了。()我个人喜欢RX扩展。在这里,它是一个简单的主题和一个观察者。另外,async/wait/IAsyncEnumerable很诱人,可以通过休眠来帮助防止同步,但如果不小心的话,它仍然会以一个繁忙的循环结束。还有更多的选择,问题是在BlockingCollection上,所以只是供参考。