检查kafka消费者是否没有任何要返回的记录并且在java中为空的好方法?

5n0oy7gb  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(306)

我用的是apache kafkaconsumer。我想检查消费者是否有任何未经轮询就返回的消息。如果我轮询消费者,但没有任何消息,那么我会在无限循环中得到消息“由于组正在重新平衡,尝试心跳失败”,直到超时过期,即使我有 records.isEmpty() 条款。这是我的代码片段:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
if (records.isEmpty()) {
      log.info("No More Records");
      consumer.close();
    }
else {
      records.iterator().forEachRemaining(record -> log.info("RECORD: " + record);
);

这很管用,直到 records 都是空的。一旦为空,它会多次记录“由于组正在重新平衡,尝试检测信号失败”,记录“不再记录”一次,然后继续记录检测信号错误。我能做些什么来解决这个问题,如何优雅地检查(没有任何心跳消息)没有更多的记录可以轮询?
编辑:我问了另一个问题,完整的代码和上下文在这个链接上:如何在java中从kafka消费者那里逐个获取消息?
提前谢谢!

kh212irz

kh212irz1#

评论外:“由于我有一个用户界面,并希望通过单击“接收”按钮接收一条一条的消息,可能有一种情况,当没有更多的消息进行轮询。”
在这种情况下,您需要创建一个新的 KafkaConsumer 每次有人点击“接收”按钮,然后关闭它。
如果要在客户机的生命周期中使用相同的kafkaconsumer,则需要让代理知道它仍然处于活动状态(通过发送心跳,这是通过调用poll方法隐式完成的)。否则,正如你已经经历过的那样,经纪人认为你的Kafka消费者已经死了,会开始重新平衡。由于没有其他活跃的消费者,这种再平衡不会停止。

相关问题