嗨,我有一个类似下图的架构。我有两个Kafka生产者将发送消息到Kafka主题与频繁的重复消息。有没有一种方法,我可以处理的情况下,在一个简单的方式像服务巴士的主题。谢谢你的帮助。
643ylb081#
现在,apache kafka只支持一次交付:https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
ejk8hzay2#
假设您实际上有多个不同的生产者编写相同的消息,我可以看到以下两个选项:1) 将所有副本写入单个kafka主题,然后使用kafka streams(或任何其他流处理器,如flink、spark streaming等)对消息进行重复数据消除,并将消除重复的结果写入新主题。下面是一个伟大的Kafka流使用国有商店的例子:https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/test/java/io/confluent/examples/streams/eventdeduplicationlambdaintegrationtest.java2) 确保重复的消息具有相同的消息密钥。之后,您需要启用日志压缩,kafka最终将消除重复项。这种方法不太可靠,但如果您适当调整压缩设置,它可能会为您提供所需的内容。
2条答案
按热度按时间643ylb081#
现在,apache kafka只支持一次交付:https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
ejk8hzay2#
假设您实际上有多个不同的生产者编写相同的消息,我可以看到以下两个选项:
1) 将所有副本写入单个kafka主题,然后使用kafka streams(或任何其他流处理器,如flink、spark streaming等)对消息进行重复数据消除,并将消除重复的结果写入新主题。
下面是一个伟大的Kafka流使用国有商店的例子:https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/test/java/io/confluent/examples/streams/eventdeduplicationlambdaintegrationtest.java
2) 确保重复的消息具有相同的消息密钥。之后,您需要启用日志压缩,kafka最终将消除重复项。这种方法不太可靠,但如果您适当调整压缩设置,它可能会为您提供所需的内容。