消费者上的node kafka pause()方法有有效的版本吗?

v6ylcynt  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(651)

我无法使以下代码正常工作:

"use strict";

let kafka = require('kafka-node');
var conf = require('./providers/Config');

let client = new kafka.Client(conf.kafka.connectionString, conf.kafka.clientName);
let consumer = new kafka.HighLevelConsumer(client, [ { topic: conf.kafka.readTopic } ], { groupId: conf.kafka.clientName, paused: true });

let threads = 0;

consumer.on('message', function(message) {
    threads++;

    if (threads > 10) consumer.pause();

    if (threads > 50) process.exit(1);

    console.log(threads + " >>> " + message.value);
});

consumer.resume();

我在控制台中看到50条消息,并且进程通过终止语句退出。
我想知道的是,是我的代码坏了还是包坏了?或者我只是做错了什么?有没有人能让Kafka消费者使用暂停/恢复功能?我尝试了几个版本的Kafka节点,但它们的行为方式都是一样的。谢谢!

gmxoilav

gmxoilav1#

您已经在使用 pause 以及 resume 在你的代码中,所以很明显它们是有效的
是因为 pause 不会暂停消息的消费。它暂停获取消息。我猜在你收到第一条信息和第一个电话之前,你已经一次性拿到了前50个了 pause .
我刚测试了一下 pause() 以及 resume() 在节点repl中,它们按预期工作:

var kafka = require('kafka-node');
var client = new kafka.Client('localhost:2181');
var consumer = new kafka.HighLevelConsumer(client, [{topic: 'MyTest'}]);
consumer.on('message', (msg) => { console.log(JSON.stringify(msg)) });

然后我走进另一个窗口跑:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic MyTest

输入一些东西,它就会出现在第一个窗口。然后在第一个窗口中,我键入: consumer.pause(); 在第二个窗口再输入一些。第一个窗口中不显示任何内容。然后我就跑 consumer.resume() 在第一个窗口中,将显示延迟消息。
顺便说一句,你应该可以玩Kafka配置属性 fetch.message.max.bytes 并控制一次可以获取多少条消息。例如,如果您有500字节的固定宽度消息,请设置 fetch.message.max.bytes 小于1000(但大于500!)每次获取仅接收一条消息。但请注意,这可能无法完全解决问题——我对node还比较陌生,但它是异步的,我怀疑在您完全(或根本)处理第一次提取之前,第二次提取可能会启动。

相关问题