kafka recover from commit失败异常

ki1q1bka  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(569)

我有一个问题,我的提交失败,因为poll()太长(我不知道为什么会发生这种情况,没有消息,它只是在一个空队列上读取/提交,我的轮询间隔设置为小时)。然后,当它再次点击read()时,由于某种原因它不会重新平衡。但是,这只在我的代码在bluemix上运行时发生,在本地,当我复制异常时,next read()会导致重新平衡。
从失败的异常中恢复的正确方法是什么?我应该关闭()并重新创建我的消费者吗?还是调用read()应该重新平衡并让我继续?

t1qtbnec

t1qtbnec1#

@所以我相信使用默认的kafkajava客户机,消费者会每3秒心跳一次,会话超时时间是10秒,所以你的消费者应该留在组中,而不会被取出,并且会发生重新平衡。你的邮件里有什么信息 CommitFailedException ? 我想是因为你被踢出去了,所以你的承诺失败了。
其他一些问题:
您是否有多个消费者来来去去,和/或您是否有意使用消费者群体而不是单一消费者?
你说的“我的投票间隔设置为小时”是什么意思?
“在空队列上提交”是什么意思?
你能分享一段你的消费者循环代码吗,因为这可能有助于更好地解释你在做什么

ssm49v7z

ssm49v7z2#

这个 commitSync 方法将自动无限期重试,因此如果 CommitFailedException 那么它就不是一个可重试的条件,再次调用commit也不太可能有帮助。你得到这个例外,因为你的消费者已经被踢出了消费群体。
如果你用的是 commitAsync 若要提交偏移量,则重试不是自动的,您可能会收到 RetriableCommitFailedException 指示一个潜在的暂时性错误,您可以再次手动重试该错误。听起来这不是你的情况,但我包括它的完整性这个答案。
一旦你的消费者被赶出了小组,你就会得到这个 CommitFailedException 例外:您可以继续调用poll(),直到重新平衡完成,并且您被允许返回到使用者组(可能有一组比以前更新的分区),然后它将继续。
如果您的应用程序不能容忍您在中间流中接收到的分区(因此也包括密钥)发生更改的情况,那么您应该实现一个重新平衡侦听器,它将在分区分配发生更改时被调用。看到了吗http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/consumerrebalancelistener.html
如果您只是想解决偏移量每24小时过期一次的问题,那么除了定期调用poll()以保持在使用者组中之外,您还需要每天至少调用一次commit以保持偏移量的最新状态

相关问题