我在Kafka中有一个主题,其中我有一些记录,我想在消费者中建立一个更改历史。
要做到这一点,每当一条消息被附加到主题上,并为现有记录添加一个新值时(即,一条消息的键已经存在于主题中),我的消费者将检索相同键的所有消息,并在最后两条消息的值之间构建一个差异。
我不能使用无限保留,因为主题会变得太大,所以我希望使用压缩,但我找不到一种方法来确保旧消息不会在消费者阅读之前从日志中删除。
有没有一种方法可以配置清理来实现这一点?或者我应该为这个用例采取不同的方法?
我在Kafka中有一个主题,其中我有一些记录,我想在消费者中建立一个更改历史。
要做到这一点,每当一条消息被附加到主题上,并为现有记录添加一个新值时(即,一条消息的键已经存在于主题中),我的消费者将检索相同键的所有消息,并在最后两条消息的值之间构建一个差异。
我不能使用无限保留,因为主题会变得太大,所以我希望使用压缩,但我找不到一种方法来确保旧消息不会在消费者阅读之前从日志中删除。
有没有一种方法可以配置清理来实现这一点?或者我应该为这个用例采取不同的方法?
1条答案
按热度按时间fcg9iug31#
我找不到一种方法来确保旧消息不会在使用者读取之前从日志中删除
LogCleaner按照自己的时间表运行,并且仅在封闭段上运行,或者对于压缩主题,dirty+封闭段,其中“dirty”被定义为具有新键的旧值。
消费者将检索同一密钥的所有消息
压缩会主动删除这些数据。你需要手动将这些数据聚合到一个列表中,而不是让Kafka通过压缩删除这些值。
因此,一种方法是使用两个主题,首先使用常规数据作为压缩主题
然后某个进程使用它并构建自己的流
这也是紧凑的,所以第一个
k1
最终会被删除,但是您有所有以前值的历史记录,当在其上构建KTable时,您可以通过键查找所有值其他解决方案-如果Kafka中没有足够的存储空间来保存所有这些值,则将数据转储到外部数据库表。假设您的数据库将有更多的存储空间