Java -并发链接队列-全部轮询

kcugc4gi  于 2022-12-28  发布在  Java
关注(0)|答案(3)|浏览(168)

假设我有一个ConcurrentLinkedQueue类型的类字段,这个类的一些方法正在向这个队列提供新元素,还有一些方法需要轮询队列中的所有元素。
我不能在循环中使用poll(),因为当循环还未完成时,可能会有一些元素被提供给这个队列。如果新元素提供的速度比我轮询它们的速度快,我想它甚至可能是一个无限循环。所以我需要某种pollAll()
有没有办法可以做到这一点?也许有适合这一点的收藏?

rmbxnbpk

rmbxnbpk1#

如果你可以改变你的应用程序来使用BlockingQueue的实现,那么drainTo方法看起来就像你想要的那样,它移除队列中的当前内容并将它们转移到目标集合。
BlockingQueue有多种实现;奇怪的是,没有指定drainTo是原子的,尽管在我检查的实现(ArrayBlockingQueueLinkedBlockingQueue)中是这样的。

vhipe2zx

vhipe2zx2#

看起来你需要一些"暂停"的时刻。一种方法是:

AbstractQueue<Object> queue = new ConcurrentLinkedQueue<>();
    int size = queue.size();

    for (int i = 0; i < size; i++) {
        Object object = queue.poll();
        if (object == null) {
            // Collection has shronk break
            break;
        }
        // Do processing here
    }

通过将大小存储到一个局部变量中,大小将不会改变,并且您可以使用该数量的元素来处理。如果在处理过程中添加元素,它将不会受到无限循环的影响。

    • UPDATE**:.iterator()可能比我的第一个示例更好用:

返回的迭代器是一个"弱一致性"迭代器,它永远不会抛出ConcurrentModificationException,并保证遍历迭代器构造时存在的元素,**可以(但不保证)反映构造后的任何修改。

    • 更新2**:这是一个方法,它将把所有的元素也删除一次,并处理它。
AbstractQueue<Object> queue = new ConcurrentLinkedQueue<>();
        queue.add("Test1");
        queue.add("Test2");
        queue.add("Test3");

        Object[] objList = queue.toArray();

        queue.remove("Test2");

        for (Object obj : objList) {
            // Make sure you delete it, because we don't use .poll
            // Put it at top, to reproduce the poll as much as possible
            queue.remove(obj);

            // Do processing
        }

这将输出:

Test1
Test2
Test3

由于复制到新的objList[],它也将显示"Test2"。如果重复项添加到列表中,则此示例可能会导致.remove()方法出现问题。因为如果添加的元素与.remove()中的对象重复,则该对象将立即删除。
还要注意,这种方法比较慢,因为.remove()需要遍历元素才能找到它,这是O(N1),其中.poll是即时O(1)。

xqk2d5yq

xqk2d5yq3#

虽然我知道OP不想使用poll()方法,但如果异步任务向列表提供记录的速度比poll()方法消耗记录的速度快,我会感到惊讶。如果是这种情况,那么可能需要一个不同的高吞吐量集群架构(Kinesis Firehose或类似的)。
我在装有Java11的MacBookProM1上运行了一个测试,发现下面显示的循环可以以每秒150万到200万条记录的速度消耗记录。
我建议采取以下简单的排水方法:

public List<MyRecord> getRecords(){
        List<MyRecord> result = new ArrayList<>();
        while (!records.isEmpty())
            result.add(records.poll());
        return result;
    }

相关问题