我已经在kafka服务器上创建了主题,现在我正在创建一个消费者来读取服务器上的主题消息,但是当我尝试使用 consumer.on('message')
,知道下面代码中实现的错误是什么吗,我需要设置偏移量吗?
消费者.js
var kafka = require('kafka-node');
var config = require('./config.js');
var zk = require('node-zookeeper-client');
var kafkaConn = config.kafkaCon.dit;
var HighLevelConsumer = kafka.HighLevelConsumer;
var Client = kafka.Client;
function start() {
topics = [{
topic: 'test-1'
}];
var groupId = 'push';
var clientId = "consumer-" + Math.floor(Math.random() * 10000);
var options = {
autoCommit: true,
fetchMaxWaitMs: 100,
fetchMaxBytes: 10 * 1024 * 1024,
groupId: groupId
};
console.log("Started consumer: ", clientId);
var consumer_client = new kafka.Client(kafkaConn, clientId);
var client = new Client(consumer_client.connectionString, clientId);
var consumer = new HighLevelConsumer(client, topics, options);
consumer.on('message', function(message) {
var topic = message.topic;
console.log('Message', topic);
});
};
start();
2条答案
按热度按时间c2e8gylq1#
Kafka消费者可以用
kafka-node
npm模块。对于我的用例,我的消费者是一个独立的express服务器,它监听事件并将它们存储在数据库中。更多信息https://nodewebapps.com/2017/11/04/getting-started-with-nodejs-and-kafka/
6g8kf2rb2#