kafka使用者:如果未提交以前的消息偏移量和自动提交被禁用,则希望再次读取相同的消息

lokaqttq  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(534)

我已经关闭了自动提交和不提交抵消也从消费者阅读后。
检查的使用者延迟也保持不变,它确保偏移量不会被提交。但问题是,它正在消耗下一个msg,而不是相同的消息。
我怎么能一次又一次地读同样的信息呢。我应该能够阅读下一条消息,只有在以前的偏移已提交。请帮我做这个。

prdp8dxp

prdp8dxp1#

如果您知道您的kafka消费者当前正在访问哪个分区,您可以使用 kafkaconsumer.seek(partition, offset) 方法来告诉您的消费者从特定偏移量读取消息。例子:

//to get the partition from consumer record
val partition: Int = consumerRecord.partition() 

//to get offset of current record
val recordOffset = consumerRecord.offset() 

if(data processing fail condition)
  consumer.seek(new TopicPartition(topic, partition), recordOffset )

//will return records from recordOffset now, if data processing fail condition was true
consumer.poll(100)

相关问题