我无法使以下代码正常工作:
"use strict";
let kafka = require('kafka-node');
var conf = require('./providers/Config');
let client = new kafka.Client(conf.kafka.connectionString, conf.kafka.clientName);
let consumer = new kafka.HighLevelConsumer(client, [ { topic: conf.kafka.readTopic } ], { groupId: conf.kafka.clientName, paused: true });
let threads = 0;
consumer.on('message', function(message) {
threads++;
if (threads > 10) consumer.pause();
if (threads > 50) process.exit(1);
console.log(threads + " >>> " + message.value);
});
consumer.resume();
我在控制台中看到50条消息,并且进程通过终止语句退出。
我想知道的是,是我的代码坏了还是包坏了?或者我只是做错了什么?有没有人能让Kafka消费者使用暂停/恢复功能?我尝试了几个版本的Kafka节点,但它们的行为方式都是一样的。谢谢!
1条答案
按热度按时间gmxoilav1#
您已经在使用
pause
以及resume
在你的代码中,所以很明显它们是有效的是因为
pause
不会暂停消息的消费。它暂停获取消息。我猜在你收到第一条信息和第一个电话之前,你已经一次性拿到了前50个了pause
.我刚测试了一下
pause()
以及resume()
在节点repl中,它们按预期工作:然后我走进另一个窗口跑:
输入一些东西,它就会出现在第一个窗口。然后在第一个窗口中,我键入:
consumer.pause();
在第二个窗口再输入一些。第一个窗口中不显示任何内容。然后我就跑consumer.resume()
在第一个窗口中,将显示延迟消息。顺便说一句,你应该可以玩Kafka配置属性
fetch.message.max.bytes
并控制一次可以获取多少条消息。例如,如果您有500字节的固定宽度消息,请设置fetch.message.max.bytes
小于1000(但大于500!)每次获取仅接收一条消息。但请注意,这可能无法完全解决问题——我对node还比较陌生,但它是异步的,我怀疑在您完全(或根本)处理第一次提取之前,第二次提取可能会启动。