一直在尝试使用vertx用java编写kafka消费者。
我需要将自动提交设置为false(特定用例)。
下面是执行显式轮询的代码
consumer.subscribe("test", ar -> {
if (ar.succeeded()) {
System.out.println("Consumer subscribed");
vertx.setPeriodic(1000, timerId -> {
consumer.poll(100, ar1 -> {
if (ar1.succeeded()) {
KafkaConsumerRecords<String, String> records = ar1.result();
for (int i = 0; i < records.size(); i++) {
KafkaConsumerRecord<String, String> record = records.recordAt(i);
System.out.println("key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
}
}
});
});
} });
手动提交:
consumer.commit(ar -> {
if (ar.succeeded()) {
System.out.println("Last read message offset committed");
}
});
我的问题是,如果轮询频率设置为1000ms并且提交是手动的,那么如果消息没有在1000ms内处理,会发生什么?
下一次轮询将在处理第一组消息之前完成吗?如果是,它会再次获取同一组消息(尚未提交)还是一组较新的消息?
1条答案
按热度按时间bakd9h0s1#
查看
KafkaConsumer#poll
:在每次轮询中,使用者将尝试使用上次使用的偏移量作为起始偏移量,并按顺序获取。最后消耗的偏移量可以通过seek(topicpartition,long)手动设置,也可以自动设置为订阅的分区列表的最后提交偏移量
最后消耗的偏移量是指
KafkaConsumer
,而不是它所承诺的那个。这意味着它不会再次获取相同的消息,但它将获取下一个100条消息。