我有一个关于Kafka消费者的问题:
enable.auto.commit=false
max.poll.records=1
auto.offset.reset=latest
group.id="processor-1"
KafkaConsumer<String,String> consumer= new KafkaConsumer<String,String>(properties);
consumer.subscribe(Arrays.asList(topic));
while(true) {
ConsumerRecords<String,String> records=consumer.poll(Duration.ofSeconds(1));
for(ConsumerRecord<String,String> record: records){
logger.info("Value:" +record.value());
logger.info("Partition:" + record.partition()+",Offset:"+record.offset());
//consumer.commitSync();
}
}
正如你所看到的,自动提交是关闭的,我们没有acking记录,你能告诉我什么时候之后,我们可以有相同的消息。如果同一组中的多个消费者(如消息“A”)被消费者-1读取但没有确认,那么当相同的消息可以传递到消费者-2时,它将如何表现
1条答案
按热度按时间3htmauhk1#
当这个JVM进程重新启动时,您将得到相同的消息,因为您没有提交任何偏移量。