我在试着用Kafka节点
库将数据推送到现有主题。我已经用curl将schema添加到schemaregistry中,并且还添加了主题。我得到以下错误:
Error: Topic name or id is needed to build Schema
at new Shema (/work/node_modules/kafka-node-avro/lib/schema.js:7:41)
at Object.Pool.getByName (/work/node_modules/kafka-node-avro/lib/schemaPool.js:70:10)
at new Promise (<anonymous>)
我的代码片段如下:
const Settings = {
"kafka" : {
"kafkaHost" : config.KafkaHost
},
"schema": {
"registry" : config.KafkaRegistry
}
};
console.log("settings registry: ", config.KafkaRegistry);
console.log("settings kafkaHost: ", config.KafkaHost)
KafkaAvro.init(Settings).then( kafka => {
const producer = kafka.addProducer();
let payloads = [
{
topic: 'classifier-response-test',
messages: JSON.stringify(kafkaData)
}
];
producer.send(payloads).then( success => {
// Message was sent encoded with Avro Schema
console.log("message sent ! Awesome ! :) ")
}, error => {
// Something wrong happen
console.log("There seems that there is a mistake ! try Again ;) ")
console.log(error)
});
} , error => {
// something wrong happen
console.log("There seems that there is a global mistake ! try Again ;) ")
console.log(error)
});
2条答案
按热度按时间vdgimpew1#
问题是我们需要将主题列表放在模式设置中,以便在主题和模式注册表之间建立链接。我们可以将schema id或topic name。
tsm1rwdh2#
图书馆还没有准备好
send
消息列表,当尝试在发送机制中将架构动态添加到架构池中时,它抱怨的原因。最简单的解决方案是一次发送一条消息,在您的代码示例中可能是这样的感谢您使用Kafka节点avro