流处理后从kafka主题中删除消息

vaqhlq81  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(379)

系统实现了kafka流处理,实现了事务处理。解决方案如下:,
kafka生产者将事件发布到kafka主题,流处理器处理输入事件并执行聚合操作。在流处理之后,事件将被发布到另一个主题。由于第一个主题中没有实现使用者,因此如何从第一个主题中删除已处理的消息。

oug3syen

oug3syen1#

考虑到流处理链是第一个主题的使用者。如果您出于某种原因需要重新处理原始数据(例如,如果您意识到流处理逻辑中有一个bug),那么您可能希望即使在处理了原始消息之后,也能在第一个主题中获得原始消息。
因此,您不需要删除邮件,您必须针对该主题设置一个适合您需要的保留策略。取舍通常是数据可用的时间量与所需的存储量。

ttcibm8c

ttcibm8c2#

无法手动删除Kafka的消息(磁盘上没有黑客数据,afaik)。您只有3个选项:
使用基于时间的保留策略(例如,让kafka自动删除超过1小时的所有邮件)
使用基于存储的保留策略(让kafka将主题大小保持为某个预定义的值)
使用主题压缩策略-让Kafka保留最新版本的密钥。所有旧版本的密钥将被删除(压缩)。
正如luciano afranllie所描述的,您不需要手动删除消息。您可以处理消息,并让Kafka根据您的策略管理主题。

f45qwnt8

f45qwnt83#

有一个kafka改进建议(kip)为这个用例添加这个功能。
https://cwiki.apache.org/confluence/display/kafka/kip-107%3a+add+purgedatabefore%28%29+api+in+adminclient
目前,所有用于删除消息的scala代码都在0.11kafka中,并且已经过测试
https://github.com/apache/kafka/pull/2476
但是,在javaadminclientapi和文档中添加这个功能还不完整。

相关问题