我在任何地方都找不到它的文档,但是Kafka如何处理那些无法访问的记录呢?
我发现我可以为v1.1kafka集群上的压缩主题生成记录,并为 cleanup.policy=delete
,并使用v1.1 AdminClient.deleteRecords()
方法从主题分区中“删除”记录,最后还原 cleanup.policy=compact
配置。
实际上在幕后发生的不是从日志段中删除记录,而是kafka将每个分区的初始偏移量增加到deleterecords方法提供的值:
[2019-02-07 01:46:55,282] INFO [Log partition=delete-records-compact-topic-2, dir=/data/kafka] Incrementing log start offset to 505317 (kafka.log.Log)
[2019-02-07 01:46:55,295] INFO [Log partition=delete-records-compact-topic-1, dir=/data/kafka] Incrementing log start offset to 485663 (kafka.log.Log)
[2019-02-07 01:46:55,298] INFO [Log partition=delete-records-compact-topic-3, dir=/data/kafka] Incrementing log start offset to 478872 (kafka.log.Log)
新使用者将开始从这些初始偏移量中读取数据,因此记录将从他们的Angular 有效地删除,但日志文件中的数据仍然存在。因为日志文件上的数据不会被 deleteRecords
api调用,并且压缩主题不受保留的约束,日志文件中的数据是否会一直存在,直到更新现有密钥并进行压缩?或者日志清理器/调度程序最终会删除它,因为初始偏移量之前的记录不再对使用者可用?
我没有完全遵循受支持的模式,因为如果主题仍然配置为,adminclient.deleterecords方法会抛出policyviolationexception cleanup.policy=compact
,但是这个方法对我来说要比基于保留的主题清除或主题删除/重新创建方法简单得多。
暂无答案!
目前还没有任何答案,快来回答吧!