AbstractQueue<Object> queue = new ConcurrentLinkedQueue<>();
int size = queue.size();
for (int i = 0; i < size; i++) {
Object object = queue.poll();
if (object == null) {
// Collection has shronk break
break;
}
// Do processing here
}
AbstractQueue<Object> queue = new ConcurrentLinkedQueue<>();
queue.add("Test1");
queue.add("Test2");
queue.add("Test3");
Object[] objList = queue.toArray();
queue.remove("Test2");
for (Object obj : objList) {
// Make sure you delete it, because we don't use .poll
// Put it at top, to reproduce the poll as much as possible
queue.remove(obj);
// Do processing
}
3条答案
按热度按时间rmbxnbpk1#
如果你可以改变你的应用程序来使用
BlockingQueue
的实现,那么drainTo
方法看起来就像你想要的那样,它移除队列中的当前内容并将它们转移到目标集合。BlockingQueue
有多种实现;奇怪的是,没有指定drainTo
是原子的,尽管在我检查的实现(ArrayBlockingQueue
和LinkedBlockingQueue
)中是这样的。vhipe2zx2#
看起来你需要一些"暂停"的时刻。一种方法是:
通过将大小存储到一个局部变量中,大小将不会改变,并且您可以使用该数量的元素来处理。如果在处理过程中添加元素,它将不会受到无限循环的影响。
.iterator()
可能比我的第一个示例更好用:返回的迭代器是一个"弱一致性"迭代器,它永远不会抛出ConcurrentModificationException,并保证遍历迭代器构造时存在的元素,**可以(但不保证)反映构造后的任何修改。
这将输出:
由于复制到新的
objList[]
,它也将显示"Test2"。如果重复项添加到列表中,则此示例可能会导致.remove()
方法出现问题。因为如果添加的元素与.remove()
中的对象重复,则该对象将立即删除。还要注意,这种方法比较慢,因为
.remove()
需要遍历元素才能找到它,这是O(N1),其中.poll
是即时O(1)。xqk2d5yq3#
虽然我知道OP不想使用
poll()
方法,但如果异步任务向列表提供记录的速度比poll()方法消耗记录的速度快,我会感到惊讶。如果是这种情况,那么可能需要一个不同的高吞吐量集群架构(Kinesis Firehose或类似的)。我在装有Java11的MacBookProM1上运行了一个测试,发现下面显示的循环可以以每秒150万到200万条记录的速度消耗记录。
我建议采取以下简单的排水方法: