今天,在我的spring boot和单示例kafka应用程序中,我面临以下问题:
org.apache.kafka.clients.consumer.commitfailedexception:无法完成提交,因为组已重新平衡分区并将其分配给其他成员。这意味着对poll()的后续调用之间的时间间隔长于配置的max.poll.interval.ms,这通常意味着poll循环在消息处理上花费了太多时间。您可以通过增加会话超时或使用max.poll.records减少poll()中返回的最大批大小来解决此问题。
这可能是什么原因?如何解决?据我所知-我的消费者被封锁了很长一段时间,没有回应心跳。我应该调整Kafka的属性来解决这个问题。你能告诉我我应该调整什么样的属性和位置吗,比如在Kafka一侧或在我的应用程序springKafka一侧?
1条答案
按热度按时间1cklez4t1#
默认情况下,Kafka将返回一批
fetch.min.bytes
(默认值1)最多max.poll.records
(默认值500),或fetch.max.bytes
(默认为52428800),否则将等待fetch.wait.max.ms
(默认值100)返回一批数据之前。您的消费者需要对该数据进行一些处理,然后致电poll()
再一次。您的消费者的工作预计将在max.poll.interval.ms
(v2.0之前的默认值为300000-5分钟,v2.0之后的默认值为30000-30秒)。如果在此超时过期之前未调用poll(),则认为使用者失败,组将重新平衡,以便将分区重新分配给另一个成员。因此,要解决您的问题,请减少返回的邮件数,或增加
max.poll.interval.ms
以避免超时和重新平衡。