我在和Kafka玩,试着抓住它。我们需要做的一件事是运行负载平衡的服务器集(为了冗余/高可用性等),然后彼此独立地重新启动。应该很简单。
但我发现有点奇怪。如果我运行一个kafka消费者来处理一组消息,然后在处理消息的同时将第二个消费者添加到同一消费者组,那么我会多次获得整个消息集,而不是一次。
例如,以下是我的日志文件:https://gist.github.com/sazzer/5604d0652ff14533654c8b543942c10e
这是使用2个主题-Kafka现场和Kafka批量。每个主题有2个分区-每个使用者一个。然后,测试将向批量队列添加20条消息,然后向实时队列添加10条消息(实际上是在测试其他东西,但我只是重新使用了设置)
从日志中,您将看到每条消息总共被处理了3次,而不是像我预期的那样只有一次。
其代码如下:https://gist.github.com/sazzer/c67e4db9a04aac8c0d46bbc21188775d
这是使用Spring Boot和SpringKafka,除了这一个案件,它只是工作。
当一个新的消费者出现时,我是否错过了阻止它重播所有信息的方法?或者这只是我必须处理的事情?
干杯
1条答案
按热度按时间mtb9vblg1#
尝试设置
ConsumerConfig.ENABLE_AUTO_COMMIT
至false
.监听器容器不依赖客户机进行提交,而是在处理完每批记录之后提交偏移量;当再平衡发生时,它还将提交任何挂起的补偿;您还可以设置
AckMode
至RECORD
它将提交处理的每条记录的偏移量。您还可以手动将分区分配给示例,而不使用组管理进行分配。