我们的主题保留时间设置为7天(168小时)。当生产者发送消息时,消息被实时消耗。一切正常。不过,最近在一台生产服务器上,devops意外地将时区从pst更改为est,这是os补丁的一部分。
在kafka服务器重启之后,我们看到很少(不是全部,而是随机的)旧消息被消费者使用。我们要求devops将其更改回pst并重新启动。这个周末,这些旧信息再次出现。
我们在较低的环境(dev、qa、stage等)中没有看到这个问题。
Kafka版本:Kafka2.12-0.11.0.2
非常感谢您的帮助。
正在添加更多信息。。。最近我们的centos有一个补丁更新,不知何故,管理员从pst时区改为est,并启动了kafka服务器。。。之后,我们的消费者开始看到来自偏移量0的消息。调试之后,我发现时区发生了变化,管理员在4天后从est改回pst。我们的消息生产者定期在时区变化前后发送消息。时区从东部标准时间改回太平洋标准时间后,Kafka服务器重新启动,我看到下面的警告。
此日志发生在我们从est切换回pst时:(server.log)[2018-06-13 18:36:34430]警告由于要求失败而发现损坏的索引文件:发现损坏的索引,索引文件(/app/kafka\u 2.12-0.11.0.2/data/\u consumer\u offsets-21/00000000000000002076.index)大小非零,但最后一个偏移量是2076,它不大于基偏移量2076。}。正在删除/app/kafka\u 2.12-0.11.0.2/data/\u consumer\u offsets-21/00000000000000002076.timeindex、/app/kafka\u 2.12-0.11.0.2/data/\u consumer\u offsets-21/00000000000000002076.index和/app/kafka\u 2.12-0.11.0.2/data/\u consumer\u offsets-21/00000000000000002076.txn索引和重建索引(Kafka.log.log)
在时区从est更改回pst的3天后,我们重新启动了消费者,并开始再次看到偏移量为0的消费者消息。
2条答案
按热度按时间8cdiaqws1#
在kafka v2.3.0上,您可以设置
这意味着,在每1秒之后,使用者将向kafka提交其偏移量,或者每次从指定主题获取数据时,它将提交最新的偏移量。
因此,您的kafka消费者一旦启动并经过1秒,它就永远不会读取消费者收到并提交的消息。此设置不要求重新启动kafka服务器。
esyap4oy2#
我想这是因为你会在你之前重新启动程序
Commit
新的偏移量。管理补偿
对于每个使用者组,kafka为每个正在使用的分区维护提交的偏移量。当使用者处理消息时,它不会将其从分区中删除。相反,它只是使用一个称为提交偏移量的过程来更新其当前偏移量。
如果使用者在处理消息之后但在提交其偏移量之前失败,则提交的偏移量信息将不会反映消息的处理。这意味着该组中要分配分区的下一个使用者将再次处理该消息。
自动提交偏移量
提交偏移量的最简单方法是让kafka消费者自动完成。这很简单,但它提供的控制比手动提交要少。默认情况下,使用者每5秒自动提交一次偏移量。此默认提交每5秒发生一次,与使用者处理消息的进度无关。另外,当消费者打电话时
poll()
,这也会导致上一次调用返回的最新偏移量poll()
提交(因为它可能已被处理)。如果提交的偏移量超过了对消息的处理,并且存在使用者故障,则可能有些消息可能无法处理。这是因为处理在提交的偏移量处重新启动,该偏移量比失败前要处理的最后一条消息要晚。因此,如果可靠性比简单性更重要,通常最好手动提交偏移量。
手动提交偏移量
如果
enable.auto.commit
如果设置为false,则使用者手动提交其偏移量。它可以同步或异步地执行此操作。一种常见的模式是基于周期计时器提交最新处理的消息的偏移量。此模式意味着每个消息至少处理一次,但提交的偏移量永远不会超过正在处理的消息的进度。周期计时器的频率控制在使用者发生故障后可以重新处理的消息数。当应用程序重新启动或组重新平衡时,将再次从上次保存的提交偏移量中检索消息。committed offset是恢复处理的消息的偏移量。这通常是最近处理的消息的偏移量加上1。
从这篇文章,我认为是非常有帮助的。