加里和Spring的Kafka社区你好!:)
我看到here在使用MANUAL_IMMEDIATE ack模式时,它应该提交所有分区中接收到记录的最高偏移量。这不是我遇到的行为。
我使用的是@KafkaListener,并且有一个单独的对象和一个Acknowledgement对象作为我的两个方法参数。我需要在内存中累积消息,在达到某个阈值后将它们发送出去,然后提交所有上述消息。我的计划是在发送它们之前提交/确认收到的最后一条消息。
我确实尝试过保留一个Map〈partitionId,Acknowledgement〉,并且每次都添加它。我确实观察到,当确认Map中的每个值(每个分区的最新确认)时,它也会提交所有之前的消息。
使用带有Spring的Spring Bootstrap 2.7.8用于Apache Kafka v2.8.11。
谢谢!
1条答案
按热度按时间wfauudbj1#
假定提交为其接收记录的所有分区的最高偏移量。
你怎么会这么想?如果文档有误导性,我愿意改正。
您需要确认每条记录(或者每个分区的最后一条记录),除非您的监听器是批处理模式监听器(接收来自
List
或原始ConsumerRecords
中轮询的所有记录),在这种情况下,Acknowledgment
用于整个批处理。