java—永远运行kafka consumer(新的consumer api)

4jb9z9bj  于 2021-06-07  发布在  Kafka
关注(0)|答案(4)|浏览(445)

我在上面建立了一个排队系统 Apache Kafka . 应用程序将生成特定的消息 Kafka topic 在消费端,我必须消费与主题相关的所有记录。
我使用新的javaconsumerapi编写了consumer。代码看起来像

Properties props = new Properties();  
                     props.put("bootstrap.servers", kafkaBrokerIp+":9092");  
                     props.put("group.id",groupId);  
                     props.put("enable.auto.commit", "true");
                     props.put("session.timeout.ms", "30000");
                     props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
                     consumer.subscribe(Arrays.asList("consumertest"));  
                     while (true) {  
                         ConsumerRecords<String, String> records = consumer.poll(100);  
                         for (ConsumerRecord<String, String> record : records){  
                             System.out.println("Data recieved : "+record.value());  
                             }  
                     }

在这里,我需要永远运行consumer,这样制作人推到kafka主题中的任何记录都应该立即被消费和处理。
所以我的困惑是,使用无限while循环(如示例代码中)来使用数据是正确的方法吗?

kfgdxczn

kfgdxczn1#

它对我很有用,但您可能希望将内部循环放在try/catch块中,以防抛出任何异常。如果断开连接,还可以考虑定期重新连接任务。

uurity8g

uurity8g2#

是的,使用无限循环使用数据是正确的方法。
消费者通常是长时间运行的应用程序,它们不断地轮询kafka以获取更多数据。消费者必须继续轮询kafka,否则他们将被视为死亡,他们正在消费的分区将被交给组中的另一个消费者继续消费。
poll()返回记录列表。每个记录都包含记录来自的主题和分区、分区内记录的偏移量以及记录的键和值。记录的处理是特定于应用程序的。
如果退出循环,请始终在退出前关闭()使用者。这将关闭网络连接和套接字,并立即触发重新平衡。

tkclm6bt

tkclm6bt3#

是的,你可以使用无限循环。实际上,这不是一个繁忙的循环。在每次轮询期间,如果数据不可用,则调用将等待给定的时间段。

long millisToWait = 100;
consumer.poll(millisToWait);

新用户自动处理网络通信问题。确保关机时耗电元件正常关闭。

sbdsn5lh

sbdsn5lh4#

虽然有一个无限循环是可以的,但是在kafka消费者文档中可以找到一个稍微优雅一点的方法,如下所示:

public class KafkaConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer consumer;

    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords records = consumer.poll(10000);
                // Handle new records
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
           consumer.close();
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}

这允许您选择使用钩子正常关闭。

相关问题