kafka抛出“org.apache.kafka.clients.consumer.commitfailedexception”

yxyvkwin  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(727)

我已经使用 spring-kafka 库,并使用默认使用者配置和手动提交。
我正在运行两个应用程序示例,听两个不同的Kafka主题。在执行负载测试时,我注意到只有一个应用程序在更高负载时出现以下错误:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. 

    This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, 
    which typically implies that the poll loop is spending too much time message processing. 

    You can address this either by increasing the session timeout or by reducing the maximum size of batches 
    returned in poll() with max.poll.records.

org.apache.kafka.clients.consumer.CommitFailedException: 
    Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. 

    This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, 
    which typically implies that the poll loop is spending too much time message processing. 

You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    \n org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)

我读了几篇文章,发现如果消费者花了太多时间处理消息,而代理没有获得关于消费者活跃度的信息,那么消费者重新平衡就会发生,对于未受限制的消息,就会抛出上述异常。
我已通过将max.poll.interval.ms设置为ineger.max\u值来解决上述错误。但我想知道为什么我只在一个示例中得到上述错误,为什么另一个示例在更高的负载下按预期工作。
任何人都可以分享正确的根本原因和理想的价值观吗 max.poll.interval.ms 或适当的解决方案

gudnpqoy

gudnpqoy1#

在您喜欢的属性中设置max.poll.interval.ms minimum 5000000

consumer.set("max.poll.interval.ms","5000000");
consumer.set("max.poll.records","2");
consumer.set("session.time.out.ms","30000"); 
consumer.set("heartbeat.interval.ms","25000");

希望:-它能帮助你。。。。

fhg3lkii

fhg3lkii2#

除了yoav的建议之外,如果不能减少批量大小,您还可以尝试增加 max.poll.interval.ms . Kafka文献:
使用使用者群组管理时,调用poll()之间的最大延迟。这为消费者在获取更多记录之前可以空闲的时间量设置了一个上限。如果在此超时过期之前未调用poll(),则认为使用者失败,组将重新平衡,以便将分区重新分配给另一个成员。

fiei3ece

fiei3ece3#

其中一个原因可能是 poll() 需要大量的信息,这就是为什么处理所有这些信息需要花费大量时间的原因。
max.poll.records定义在对的单个调用中返回的最大记录数 poll() .
根据Kafka文档,它的默认值是500。
你可以试着把它设置成更小的值,看看这是否能解决你的问题。

相关问题