系统实现了kafka流处理,实现了事务处理。解决方案如下:,kafka生产者将事件发布到kafka主题,流处理器处理输入事件并执行聚合操作。在流处理之后,事件将被发布到另一个主题。由于第一个主题中没有实现使用者,因此如何从第一个主题中删除已处理的消息。
oug3syen1#
考虑到流处理链是第一个主题的使用者。如果您出于某种原因需要重新处理原始数据(例如,如果您意识到流处理逻辑中有一个bug),那么您可能希望即使在处理了原始消息之后,也能在第一个主题中获得原始消息。因此,您不需要删除邮件,您必须针对该主题设置一个适合您需要的保留策略。取舍通常是数据可用的时间量与所需的存储量。
ttcibm8c2#
无法手动删除Kafka的消息(磁盘上没有黑客数据,afaik)。您只有3个选项:使用基于时间的保留策略(例如,让kafka自动删除超过1小时的所有邮件)使用基于存储的保留策略(让kafka将主题大小保持为某个预定义的值)使用主题压缩策略-让Kafka保留最新版本的密钥。所有旧版本的密钥将被删除(压缩)。正如luciano afranllie所描述的,您不需要手动删除消息。您可以处理消息,并让Kafka根据您的策略管理主题。
f45qwnt83#
有一个kafka改进建议(kip)为这个用例添加这个功能。https://cwiki.apache.org/confluence/display/kafka/kip-107%3a+add+purgedatabefore%28%29+api+in+adminclient目前,所有用于删除消息的scala代码都在0.11kafka中,并且已经过测试https://github.com/apache/kafka/pull/2476但是,在javaadminclientapi和文档中添加这个功能还不完整。
3条答案
按热度按时间oug3syen1#
考虑到流处理链是第一个主题的使用者。如果您出于某种原因需要重新处理原始数据(例如,如果您意识到流处理逻辑中有一个bug),那么您可能希望即使在处理了原始消息之后,也能在第一个主题中获得原始消息。
因此,您不需要删除邮件,您必须针对该主题设置一个适合您需要的保留策略。取舍通常是数据可用的时间量与所需的存储量。
ttcibm8c2#
无法手动删除Kafka的消息(磁盘上没有黑客数据,afaik)。您只有3个选项:
使用基于时间的保留策略(例如,让kafka自动删除超过1小时的所有邮件)
使用基于存储的保留策略(让kafka将主题大小保持为某个预定义的值)
使用主题压缩策略-让Kafka保留最新版本的密钥。所有旧版本的密钥将被删除(压缩)。
正如luciano afranllie所描述的,您不需要手动删除消息。您可以处理消息,并让Kafka根据您的策略管理主题。
f45qwnt83#
有一个kafka改进建议(kip)为这个用例添加这个功能。
https://cwiki.apache.org/confluence/display/kafka/kip-107%3a+add+purgedatabefore%28%29+api+in+adminclient
目前,所有用于删除消息的scala代码都在0.11kafka中,并且已经过测试
https://github.com/apache/kafka/pull/2476
但是,在javaadminclientapi和文档中添加这个功能还不完整。