我在上面建立了一个排队系统 Apache Kafka
. 应用程序将生成特定的消息 Kafka topic
在消费端,我必须消费与主题相关的所有记录。
我使用新的javaconsumerapi编写了consumer。代码看起来像
Properties props = new Properties();
props.put("bootstrap.servers", kafkaBrokerIp+":9092");
props.put("group.id",groupId);
props.put("enable.auto.commit", "true");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("consumertest"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.println("Data recieved : "+record.value());
}
}
在这里,我需要永远运行consumer,这样制作人推到kafka主题中的任何记录都应该立即被消费和处理。
所以我的困惑是,使用无限while循环(如示例代码中)来使用数据是正确的方法吗?
4条答案
按热度按时间kfgdxczn1#
它对我很有用,但您可能希望将内部循环放在try/catch块中,以防抛出任何异常。如果断开连接,还可以考虑定期重新连接任务。
uurity8g2#
是的,使用无限循环使用数据是正确的方法。
消费者通常是长时间运行的应用程序,它们不断地轮询kafka以获取更多数据。消费者必须继续轮询kafka,否则他们将被视为死亡,他们正在消费的分区将被交给组中的另一个消费者继续消费。
poll()返回记录列表。每个记录都包含记录来自的主题和分区、分区内记录的偏移量以及记录的键和值。记录的处理是特定于应用程序的。
如果退出循环,请始终在退出前关闭()使用者。这将关闭网络连接和套接字,并立即触发重新平衡。
tkclm6bt3#
是的,你可以使用无限循环。实际上,这不是一个繁忙的循环。在每次轮询期间,如果数据不可用,则调用将等待给定的时间段。
新用户自动处理网络通信问题。确保关机时耗电元件正常关闭。
sbdsn5lh4#
虽然有一个无限循环是可以的,但是在kafka消费者文档中可以找到一个稍微优雅一点的方法,如下所示:
这允许您选择使用钩子正常关闭。