我用的是apache kafkaconsumer。我想检查消费者是否有任何未经轮询就返回的消息。如果我轮询消费者,但没有任何消息,那么我会在无限循环中得到消息“由于组正在重新平衡,尝试心跳失败”,直到超时过期,即使我有 records.isEmpty()
条款。这是我的代码片段:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
if (records.isEmpty()) {
log.info("No More Records");
consumer.close();
}
else {
records.iterator().forEachRemaining(record -> log.info("RECORD: " + record);
);
这很管用,直到 records
都是空的。一旦为空,它会多次记录“由于组正在重新平衡,尝试检测信号失败”,记录“不再记录”一次,然后继续记录检测信号错误。我能做些什么来解决这个问题,如何优雅地检查(没有任何心跳消息)没有更多的记录可以轮询?
编辑:我问了另一个问题,完整的代码和上下文在这个链接上:如何在java中从kafka消费者那里逐个获取消息?
提前谢谢!
1条答案
按热度按时间kh212irz1#
评论外:“由于我有一个用户界面,并希望通过单击“接收”按钮接收一条一条的消息,可能有一种情况,当没有更多的消息进行轮询。”
在这种情况下,您需要创建一个新的
KafkaConsumer
每次有人点击“接收”按钮,然后关闭它。如果要在客户机的生命周期中使用相同的kafkaconsumer,则需要让代理知道它仍然处于活动状态(通过发送心跳,这是通过调用poll方法隐式完成的)。否则,正如你已经经历过的那样,经纪人认为你的Kafka消费者已经死了,会开始重新平衡。由于没有其他活跃的消费者,这种再平衡不会停止。