nodejs和kafka键控分区器

kdfy810k  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(358)

我正在尝试与kafka highlevelproducer一起使用键控分区,关于4分区主题,我有以下代码:

var kafka = require('kafka-node'),
      HighLevelProducer = kafka.HighLevelProducer,
      client = new kafka.Client(Host+":"+Port,client_id),
      producer = new HighLevelProducer(client,{partitionerType: 3});
.
.
   theKey = theKey+1;
   if ( theKey > Nb_key ) {
      theKey = 0;
   }
   var payloads = [
        { topic: Topic, key: theKey, messages: JSON.stringify({"hello": "world", "Timestamp": +timestamp}) }
      ];
      producer.send(payloads, function (err, data) {
      });

我检查了消息是否具有正确的键值,但仍然将所有消息发送到分区1。你觉得我做的有什么不对吗?
谢谢

3bygqnnd

3bygqnnd1#

看看代码,我想我理解了问题所在。baseproducer将密钥作为字符串进行哈希运算,结果总是相同的,所以总是相同的分区。我编写了自己的自定义分区器,用分区的nb对键进行简单的模运算,效果非常好。参见下面的代码:

var MyPartitioner = function (partitions, key) {
   key = key || '0';
   var index = parseInt(key) % partitions.length;
   return partitions[index];
   };
   var kafka = require('kafka-node'),
      HighLevelProducer = kafka.HighLevelProducer,
      client = new kafka.Client(Host+":"+Port,client_id),
      producer = new HighLevelProducer(client,{requireAcks: 0,partitionerType: 4},MyPartitioner );
      producer.on('ready', function () {
      timer.setInterval(send_data, '',SendIntervalInUsecs+'u');
   });

相关问题