kafka消费者自动提交是如何工作的?

xt0899hw  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(485)

我正在读这个:
自动提交提交偏移量最简单的方法是允许使用者为您执行。如果您配置enable.auto.commit=true,那么每五秒钟使用者将提交您的客户端从poll()收到的最大偏移量。默认设置为5秒间隔,并通过设置auto.commit.interval.ms进行控制。与使用者中的其他内容一样,自动提交是由轮询循环驱动的。每当您轮询时,使用者都会检查是否到了提交的时间,如果到了,它将提交上次轮询中返回的偏移量。
也许是因为我的英语不好,但我不完全理解这个描述。
假设我使用自动提交,默认间隔为5秒,轮询每7秒进行一次。在这种情况下,提交是每5秒发生一次还是每7秒发生一次?
如果投票每3秒进行一次,你能澄清一下行为吗?提交是每5秒一次还是每6秒一次?
我读过这个:
自动提交:可以将auto.commit设置为true,并使用毫秒值设置auto.commit.interval.ms属性。启用此选项后,kafka使用者将提交为响应其poll()调用而接收的最后一条消息的偏移量。poll()调用在后台以set auto.commit.interval.ms发出。
这与答案相矛盾。
你能详细解释一下吗。
假设我有这样的图表:
0秒-轮询
4秒-轮询
8秒-轮询
什么时候提交补偿?什么时候提交?

ss2ws0br

ss2ws0br1#

自动提交检查在每次轮询中都被调用,它检查经过的时间是否大于配置的时间。如果是,则提交偏移量。
如果提交间隔为5秒,轮询在7秒内发生,则提交将仅在7秒后发生。

smdncfj3

smdncfj32#

它将尝试在投票完成后尽快自动提交。您可以查看consumer coordinator的源代码,它在类级别上定义了一组本地字段,以了解是否启用了autocommit、间隔时间以及执行autocommit的下一个截止日期。
https://github.com/apache/kafka/blob/10cd98cc894b88c5d1e24fc54c66361ad9914df2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/consumercoordinator.java#l625
以及poll中执行调用do存储的位置之一https://github.com/apache/kafka/blob/10cd98cc894b88c5d1e24fc54c66361ad9914df2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/consumercoordinator.java#l279
例如,poll每7秒执行一次,autocommit设置为5:
0-轮询,+将截止时间设置为第5秒
7-由于截止日期,轮询+提交,将截止日期更新为7+5=12
14-投票+提交截止日期,将截止日期更新为12+5=17
但是,如果轮询设置为每3秒一次,并且自动提交设置为5:
0-轮询,+将截止时间设置为第5秒
3-轮询,不提交
6-根据截止日期轮询并提交,将截止日期更新为6+5=11

相关问题