我正在尝试连接到Kafka服务器。身份验证基于gssapi。
/opt/app-root/src/server/node_modules/node-rdkafka/lib/error.js:411
return new LibrdKafkaError(e);
^
Error: broker transport failure
at Function.createLibrdkafkaError (/opt/app-root/src/server/node_modules/node-rdkafka/lib/error.js:411:10)
at /opt/app-root/src/server/node_modules/node-rdkafka/lib/client.js:350:28
这是我的测试\u kafka.js:
const Kafka = require('node-rdkafka');
const kafkaConf = {
'group.id': 'espdev2',
'enable.auto.commit': true,
'metadata.broker.list': 'br01',
'security.protocol': 'SASL_SSL',
'sasl.kerberos.service.name': 'kafka',
'sasl.kerberos.keytab': 'svc_esp_kafka_nonprod.keytab',
'sasl.kerberos.principal': 'svc_esp_kafka_nonprod@INT.LOCAL',
'debug': 'all',
'enable.ssl.certificate.verification': true,
//'ssl.certificate.location': 'some-root-ca.cer',
'ssl.ca.location': 'some-root-ca.cer',
//'ssl.key.location': 'svc_esp_kafka_nonprod.keytab',
};
const topics = 'hello1';
console.log(Kafka.features);
let readStream = new Kafka.KafkaConsumer.createReadStream(kafkaConf, { "auto.offset.reset": "earliest" }, { topics })
readStream.on('data', function (message) {
const messageString = message.value.toString();
console.log(`Consumed message on Stream: ${messageString}`);
});
1条答案
按热度按时间holgip5t1#
您可以查看此问题以了解此错误的解释:https://github.com/edenhill/librdkafka/issues/1987
取自@edenhill:
基于librdkafka的客户机的一般规则是:如果集群和客户机配置正确,则可以忽略所有错误,因为它们很可能是临时的,librdkafka将尝试自动恢复。
在这种特殊情况下;如果组协调器请求失败,将在500毫秒内重试(使用状态为up的任何代理)。如果在丢失的心跳超时成员资格(session.timeout.ms)之前找到新的协调器,则当前分配和组成员资格将不受影响。自动偏移提交将暂停,直到找到新的协调器。
在未来的版本中,我们将扩展错误类型以包含严重性,允许应用程序愉快地忽略非终端错误。此时,应用程序应该考虑所有的错误信息,而不是终端错误。