我正在使用camel-kafka来读取和发送消息到kafka集群。
我有15个生产者(不同的POD)向同一个Kafka主题发送消息。有4个消费者(不同的POD)处理这些消息。
处理消息需要一些时间,因此队列可能有一段时间未读消息。
信息很简单,只有一个很长的数字。
我想防止重复的未读消息不排队,因此只被处理一次。
有没有办法避免未读的重复消息在主题?
我已经尝试过压缩主题,用一个键发送消息-具有相同的有效负载值。但它不起作用。
我正在使用camel-kafka来读取和发送消息到kafka集群。
我有15个生产者(不同的POD)向同一个Kafka主题发送消息。有4个消费者(不同的POD)处理这些消息。
处理消息需要一些时间,因此队列可能有一段时间未读消息。
信息很简单,只有一个很长的数字。
我想防止重复的未读消息不排队,因此只被处理一次。
有没有办法避免未读的重复消息在主题?
我已经尝试过压缩主题,用一个键发送消息-具有相同的有效负载值。但它不起作用。
2条答案
按热度按时间tuwxkamq1#
防止唯一生产者发送 * 等效 * 有效载荷(而不是重复)的唯一方法是在将它们生产到Kafka之前存储和比较它们。
换句话说,每个生产者请求对于Kafka来说都是唯一的。代理永远不会解析和比较整个主题上的数据。这会大大降低速度,因此客户端有责任管理。
压缩主题不会 * 阻止 * 编写等价记录,它只是定期减少主题中唯一键的数量。它在后台线程中执行此操作,而不是每个生产请求,并从主题的未压缩“头部”开始
另一种解决方案是简单地生成你想要的任何东西,然后让消费者检查它是否以前见过相同的消息数据,然后跳过处理。你可以选择使用本地散列表的多层方法来做到这一点,当消费者进程重新启动时,它会被转储并恢复到一个更持久的系统(数据库或文件)中。
b4qexyjb2#
在Kafka中,它被称为 “精确一次处理”。您可以使用transactional producers and consumers来表示。下面是一个example