我想设置一个小的 retention_ms
例如1200000,有时用于移除初始脏负载。
json是完全正确的,因为它不工作,另外我还尝试设置其他参数。但是,我的消费者仍然可以获取两周前存在的数据。
{"config": {"segment.ms": "1200000", "retention.ms": "1200000", "cleanup.policy": "delete", "segment.jitter.ms": "1200000", "delete.retention.ms": "1200000", "min.cleanable.dirty.ratio": "0.01"}, "version": 1}
总而言之,我想知道默认的retention.ms是7天,但是我的消费者最早可以用autooffer.reset在7天内到达存在的数据,
那么,Kafka为什么会忽视我的留任呢?
1条答案
按热度按时间ss2ws0br1#
在阅读源代码之后https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/adminutils.scala#l492 ,
kafka通过在zookeeper中存储更改通知来更新topic的配置。所以问题是将配置json存储在
/config/topics
还不够,您需要将json存储在/config/changes
在Zookeeper也一样。