我正在读《Kafka:权威指南》,希望能更好地理解《重新平衡的听众》。书中的例子使用了一个简单的 HashMap
保持已处理的当前偏移量,并在撤销分区时提交当前状态。我担心的是:
关于代码示例,我有两个问题:
使用的语言使我假设这些回调是在不同的线程上进行的。所以,应用当前偏移量时不应该考虑线程安全吗?另外,提交后是否应该取消当前批处理?
它说要使用commitsync来确保在再平衡开始之前补偿已经被提交。但是,这只是在该使用者内同步的。是否存在某种机制,协调器在收到所有订阅用户的回复之前不会继续?
1条答案
按热度按时间tsm1rwdh1#
我重新阅读了书中的部分,我同意我也有点困惑!
javadoc声明:
每当分区分配发生变化时,这个回调只会作为poll(long)调用的一部分在用户线程中执行。
我查看了代码,并确认rebalance listener方法确实是在拥有使用者的同一线程中调用的。
是的,你应该用
commitSync()
在重新平衡侦听器中提交时。为了解释原因,让我们看一下黄金路径示例。我们从一个快乐消费的消费者开始,并定期向协调员报告。在某个时刻,协调器返回一个
REBALANCE_IN_PROGRESS
心跳信号请求出错。这可能是由想要加入组的新成员、离开或未能检测到心跳信号的成员、或从订阅中添加/删除新分区引起的。此时,所有的消费者都需要重新加入这个群体。在尝试重新加入组之前,使用者将同步执行
ConsumerRebalanceListener.onPartitionsRevoked()
. 侦听器返回后,使用者将向协调器发送joinrequest以重新加入组。也就是说,我想这就是你所想的,如果你的回电时间太长(>
session.timeout.ms
)若要提交,该组可能已经处于另一个生成中,并且带有offset的分区试图提交给另一个成员。在这种情况下,提交将失败,即使它是同步的。但是通过使用commitSync()
在侦听器中,可以保证使用者在完成提交之前不会重新加入组。