我使用kafka队列来保存消费者应用程序要检索的一些对象,并对其执行一些操作。
问题:如果消费者的处理时间超过2小时,Kafka似乎会一次又一次地回馈同一个物体
代码:
private static Queue queue = new LinkedList();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
System.out.println("\n\n\n Kafka has :[" + record.offset());
queue.add(record.value());
}
System.out.println("\n\n\n Kafka has :[" + records.count());
if (queue != null) {
maintainQueue();
}
}
1条答案
按热度按时间6ojccjat1#
我使用的是Kafka版本0.10.1.0
我们已通过更新
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "false"
和添加consumer.commitSync();
之后consumer.poll(Long.MAX_VALUE);
参考文献:kafka-如何使用高级使用者提交每条消息之后的偏移量?
http://www.slideshare.net/jjkoshy/offset-management-in-kafka