我的问题是,关于Kafka:
我有两个程序与Kafka交换数据(一个是生产,另一个是阅读)。假设顾客撞车了。生产者将继续发送消息,一段时间后,我们将重新启动客户。
根据我们当前的代理设置,以下是应该发生的情况:
-如果崩溃发生在不到1天前(offset.retention.minutes为1440),则检索偏移量,并处理等待的消息。
-如果崩溃发生在1天前,客户的新偏移量将重置为最早(因为auto.offset.reset是最早的)。问题是:如果一些消息已经被处理(在1天到7天之间),它们将再次被处理,因为kafka保存了7天的消息(log.retention.hours是168)。
解决方案是否简单到将offset.retention.minutes和log.retention.hours都设置为相同的值(当然,执行转换minutes<=>hours)?还是会有我错过的副作用?一个更简单的解决方案是删除已经处理过的消息,但Kafka似乎做不到。
谢谢你的阅读。
1条答案
按热度按时间cyvaqqii1#
我认为你的思路是正确的。
我将引用一个有趣的公开问题(你可以在这里找到)中的一些句子来说明这一点:
关于删除策略:
保留是按主题/分区进行的。如果给定主题/分区的上一次提交的偏移量的更新时间超过offsets.retention.minutes,则该偏移量将被垃圾收集。
偏移保留的默认值如此低的原因:
对于给定的使用者组、主题、分区,压缩后,最终只会有一条消息存储在偏移主题中。我们想要保护的是许多短命的消费群体。
你可以看看这个问题,你不是第一个提出这个问题的人。