我使用的是kafka 0.10.1.1,并与以下3个属性混淆。
heartbeat.interval.ms
session.timeout.ms
max.poll.interval.ms
heartbeat.interval.ms-这是在0.10.1中添加的,它将在轮询之间发送心跳。session.timeout.ms-如果没有对kafka的请求,并且每次轮询都会重置它,则开始重新平衡。max.poll.interval.ms-这是整个轮询。
但是,Kafka什么时候开始重新平衡?为什么我们需要这3个?它们的默认值是什么?
谢谢
3条答案
按热度按时间n3schb8v1#
session.timeout.ms与heartbeat.interval.ms密切相关。
heartbeat.interval.ms控制kafkanconsumer poll()方法将心跳发送到组协调器的频率,而session.timeout.ms控制消费者不发送心跳的时间。
因此,这两个属性通常一起修改。heatbeat.interval.ms必须低于session.timeout.ms,并且通常设置为超时值的三分之一。因此,如果session.timeout.ms是3秒,heartbeat.interval.ms应该是1秒。
max.poll.interval.ms—使用使用者组管理时调用poll()之间的最大延迟。这为消费者在获取更多记录之前可以空闲的时间量设置了一个上限。如果在此超时过期之前未调用poll(),则认为使用者失败,组将重新平衡,以便将分区重新分配给另一个成员
vbkedwbf2#
更清楚的是,心跳线程(以及调用
Poll
同一进程中的函数)将每“heartbeat.interval.ms”一次向协调器发送一次heartbeat,如果超过“session.timeout.ms”或“max.poll.interval.ms”,协调器会将用户线程中的使用者标记为dead。ntjbwcob3#
假设我们讨论的是kafka 0.10.1.0或更高版本,其中每个使用者示例使用两个线程来运行。一个是用户线程
poll
被称为;另一个是心跳线程,专门处理心跳的事情。session.timeout.ms
是心跳线程。如果协调器未能在此时间间隔结束之前从使用者获取任何心跳信号,它会将使用者标记为失败,并触发新一轮的重新平衡。max.poll.interval.ms
用于用户线程。如果消息处理逻辑过于繁重,以至于成本不会超过这个时间间隔,那么协调器会显式地让使用者离开组,并触发新一轮的重新平衡。heartbeat.interval.ms
习惯于让其他健康的消费者更快地意识到重新平衡。如果协调器触发了一个重新平衡,那么其他消费者只有通过接收带有REBALANCE_IN_PROGRESS
封装了异常。心跳请求发送得越快,使用者就越快知道需要重新加入组。建议值:
session.timeout.ms
:相对较低的值,例如10秒。max.poll.interval.ms
:根据您的加工要求heartbeat.interval.ms
:相对较低的值,比session.timeout.ms