我正在使用kafka consumer group management来处理我的消息。
我的消息处理时间各不相同。因此,我将max poll interval设置为20分钟,最大记录数为20。我使用的是5个分区和5个消费者示例,除了上述两个示例之外,还有默认的配置值。
但我仍然间歇性地得到以下错误:
[Consumer clientId=consumer-3, groupId=amc_dashboard_analytics] Attempt to heartbeat failed since group is rebalancing
我们的理解是,除非在达到consumer config docs中所写的最大轮询间隔之前不调用poll,否则不会发生重新平衡。但对我来说,再平衡只发生在20分钟之前。
同样,在运行几个小时之后,所有分配的消费者只会说“由于组正在重新平衡,尝试心跳失败”,并且不再重新加入(理想情况下应该重新加入)。
我是不是漏了什么?任何线索都会有帮助。
1条答案
按热度按时间vltsax251#
重新平衡的另一个原因是到期
session.timeout.ms
不发送心跳。您可以考虑增加此使用者配置。来自Kafka文件:
heartbeat.interval.ms:当使用kafka的组管理设施时,消费者协调员的预期心跳间隔时间。心跳用于确保消费者的会话保持活跃,并在新消费者加入或离开组时促进重新平衡。该值必须设置为低于session.timeout.ms,但通常不应高于该值的1/3。它可以调整得更低,以控制正常再平衡的预期时间(默认值:3000)
session.timeout.ms:使用kafka的组管理工具时用于检测客户端故障的超时。客户机向代理发送周期性的心跳,以指示其活动性。如果在此会话超时过期之前代理未收到任何心跳信号,则代理将从组中删除此客户端并启动重新平衡。请注意,该值必须在代理配置中按group.min.session.timeout.ms和group.max.session.timeout.ms配置的允许范围内(默认值:10000)
有关详细信息,请查看此链接。
即使heartbeat是通过单独的线程以固定的时间间隔发送的,在某些情况下heartbeat也不能发送到代理中
session.timeout.ms
. 造成这种情况的一些可能原因是:网络问题
停止消费者或经纪人方面的世界垃圾收集