我正在运行kafka(版本0.10.2)和spring数据(版本1.5.1.release),spring kafka(版本1.1.1.release)。
我有一个主题,一个消费者团体正在投票。我注意到,有时,当一个消费者重新启动时,主题的延迟会立即变成一个更大的数字。经过一些研究,我得出结论,Kafka重新启动抵消,但我不明白为什么。
enable.auto.commit = true
auto.commit.interval.ms = 5000
auto.offset.reset = smallest
log.retention.hours=168
滞后通常很低(低于500)并且在几毫秒内消耗掉,所以它不可能是一个超出范围的索引(或者可以吗?)
有人有主意吗?
1条答案
按热度按时间7gcisfzg1#
我不认为它真的像你期望的那样频繁地提交补偿,因此,当消费者重新启动时,组会重新平衡,然后在最近的自动提交补偿处进行恢复。
提交只会周期性地发生(5秒,根据您的配置),而不是基于每条消息。因此,预计最多可以看到5秒的重复数据,但不能看到主题的开头,除非根本没有提交偏移量(您应该在客户机中设置简单的log4j日志以确定这一点)
如果您想要更精细的控制,请禁用自动偏移提交,并调用consumer对象的commitsync或commitsync方法(这些是核心javaapi的方法,不确定spring)
一种选择可能是升级您的spring客户机,就像gary在下面所说的那样。既然您运行的是kafka 0.10.2+,这应该不是问题。