kafka节点

wj8zmpe1  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(335)

有一个基本的例子,就像一个魅力为1消费者。它接收消息。但添加一个额外的消费者将被忽略。

let kafka = require('kafka-node');
let client = new kafka.Client();

let producer = new kafka.Producer(client);

let consumer1 =  new kafka.Consumer(client,[ {topic: 'topic1', partition: 0}]);
let consumer2 =  new kafka.Consumer(client,[ {topic: 'topic2', partition: 0}]);

producer.on('ready', function () {

    producer.send([{topic:'topic1', messages: 'topic 1 msg' ], (err,data)=>{
        console.log(err,'1 sent');
    });
    producer.send([{topic:'topic2', messages: 'topic 1 msg'}], (err,data)=>{
        console.log(err, '2 sent');
    });

});
producer.on('error', function (err) {
    console.log('err', err);
})

consumer1.on('message',(message) =>{
    console.log(11, message);
});
consumer2.on('message',(message) =>{
    console.log(22, message);
})

consumer2的事件“22”从未触发的问题。如果我用命令行工具检查该主题,那么该主题的数据就存在了

k97glaaz

k97glaaz1#

您忘记在consumer参数中添加使用者组。在您的案例中,两个消费者都属于一个消费者组(默认情况下称为 kafka-node-group ). 拥有一个用户组意味着每个消息将在每个用户组中传递一次。由于主题有0个分区,因此只有第一个使用者将处理消息。
如果要向两个消费者发送相同的消息,则必须有两个消费者组(每组一个消费者)。您可以按以下方式进行:

let consumer1 =  new kafka.Consumer(client,[ {topic: 'topic1', partition: 0}], {groupId: 'group1'});
let consumer2 =  new kafka.Consumer(client,[ {topic: 'topic2', partition: 0}], {groupId: 'group2'});

如果您希望您的使用者(按分区)一起处理消息,则需要增加主题中的分区数。

相关问题