//to get the partition from consumer record
val partition: Int = consumerRecord.partition()
//to get offset of current record
val recordOffset = consumerRecord.offset()
if(data processing fail condition)
consumer.seek(new TopicPartition(topic, partition), recordOffset )
//will return records from recordOffset now, if data processing fail condition was true
consumer.poll(100)
1条答案
按热度按时间prdp8dxp1#
如果您知道您的kafka消费者当前正在访问哪个分区,您可以使用
kafkaconsumer.seek(partition, offset)
方法来告诉您的消费者从特定偏移量读取消息。例子: