我正在使用kafka节点库,并测试高级生产者。
我创建了一个包含10个分区的主题“hlptestinput”,并编写了一个函数,每秒生成一个。
生产者写入分区0、2、4、6和8,但不写入奇数分区。
奇怪的是,当我从这个主题消费并生成第二个主题“hlptestinputfromconsumer”时,它有5个分区,所有的分区都会写入消息。
有没有我遗漏的配置?
const kafka = require('kafka-node'),
HighLevelProducer = kafka.HighLevelProducer,
ConsumerGroup = kafka.ConsumerGroup,
client = new kafka.KafkaClient({kafkaHost: 'smc-dev.silverbolt.lab:9092'}),
producer = new HighLevelProducer(client),
consumer = new ConsumerGroup(
{
kafkaHost: 'smc-dev.silverbolt.lab:9092',
groupId: 'testGroup'
},
'HLPTestInput'
);
let index = 0;
setInterval(() => {
producer.send([{
topic: 'HLPTestInput',
messages: [index]
}], (err, data) => {
console.log('produced', data);
});
index++;
}, 1000);
consumer.on('message', (message) => {
console.log('consumed', message);
producer.send([{
topic: 'HLPTestInputFromConsumer',
messages: [message]
}], (err, data) => {
console.log('produced to secondary', data);
});
});
1条答案
按热度按时间kx5bkwkv1#
我不太确定,但可能是因为你用同一个制片人写了两个不同的主题。因为highlevelproducer使用循环写。因此,假设您的生产者写入“hlptestinput”主题,然后您将时间间隔设置为1000,因此在此期间,您的消费者收到消息,现在您的生产者写入“hlptestinputfromconsumer”主题。
因此,您的生产者在其分区0、2、4中编写“hlptestinput”主题。。。
以及“hlptestinputfromconsumer”主题的第1、3、5部分。。。
所以我建议你再找一个制作人。那就可以了。
请尝试以下代码: