"Client network socket disconnected before secure TLS connection was established": how do I connect to a Kafka cluster from Node.js using KafkaJS?

ezykj2lf  posted 12 months ago  in  Apache

These are the credentials I was given (I also have kafka.keystore.jks and kafka.truststore.jks):

host: xxxxx-xxxxx-x.cloudclusters.net
port: xxxxx
ip: xxx.xxx.xxx.xx
truststore pw: xxxxxxxx
keystore pw: xxxxxxxx

I don't think I am using all of the credentials I was given.

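import * as dotenv from 'dotenv'
import express from 'express'
import { Kafka, Partitioners } from 'kafkajs';
import jks from 'jks-js';
import fs from 'fs';

// Load variables from .env into process.env (PORT is read further down)
dotenv.config();

// Express app used by app.listen() at the bottom of the file
const app = express();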

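// Convert the JKS stores to PEM; the second argument is the store password
const keystore = jks.toPem(
    fs.readFileSync('./kafka.keystore.jks'),
    'mypassword'
);

const truststore = jks.toPem(
    fs.readFileSync('./kafka.truststore.jks'),
    'mypassword'
);

// 'caroot' and 'localhost' are the aliases stored in this particular keystore
const {
  caroot: { ca },
  localhost: { key, cert },
} = keystore;
// const { caroot: { ca } } = truststore;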

console.log("**************** kafka.keystore.jks ****************");
// console.log(keystore)

console.log("ca ===>", ca);
console.log("key ===>", key);
console.log("cert ===>", cert);

console.log("**************** kafka.truststore.jks ****************");

// setting up kafka
const kafka = new Kafka({
  clientId: 'qa-topic',
  brokers: ['xxxxxxx.cloudclusters.net:xxxxx'], //HOST:PORT
  ssl: {
    rejectUnauthorized: false,
    ca: ca,
    key: key,
    cert: cert
  },
 
})

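const producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner })

// run() below uses a consumer; 'qa-group' is a placeholder groupId
const consumer = kafka.consumer({ groupId: 'qa-group' })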

producer.on('producer.connect', () => {
  console.log(`KafkaProvider: connected`);
});
producer.on('producer.disconnect', () => {
  console.log(`KafkaProvider: disconnected`);
});
producer.on('producer.network.request_timeout', (payload) => {
  console.log(`KafkaProvider: request timeout ${payload.clientId}`);
});

const run = async () => {
  // Producing
  await producer.connect()
  await producer.send({
    topic: 'supplier-ratings',
    messages: [
      {
        value: Buffer.from(JSON.stringify(
          {
            "event_name": "QA",
            "external_id": user_uuiD, // user_uuiD must be defined by the surrounding code
            "payload": {
              "supplier_id": i.supplier_id, // i must be defined by the surrounding code
              "assessment": {
                "performance": 7,
                "quality": 7,
                "communication": 7,
                "flexibility": 7,
                "cost": 7,
                "delivery": 6
              }
            },
            "metadata": {
              "user_uuid": "5a12cba8-f4b5-495b-80ea-d0dd5d4ee17e"
            }
          }
        ))
      },
    ],
  })

  //Consuming
  await consumer.connect()
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
    },
  })
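}

// Start the producer and consumer; without this call run() never executes
run().catch(console.error)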

const port = process.env.PORT || 5000;
app.listen(port, () => {
  console.log(`I am listening at ${port}`);
});


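I extracted ca, key and cert from my kafka.keystore.jks and passed them in the ssl object as the documentation describes, but I still get the error "Client network socket disconnected before secure TLS connection was established".
I cannot establish a connection to the Kafka cluster. I believe I am missing some keys. I am following the KafkaJS documentation.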
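
Since the question mentions that not all of the supplied credentials are being used, here is a minimal sketch of one way to combine both stores into the ssl object. It assumes that jks.toPem() returns an object keyed by alias, with { ca } for trusted-certificate entries and { key, cert } for private-key entries. buildSslOptions is a hypothetical helper name, not part of KafkaJS or jks-js.

```javascript
// Sketch: build the `ssl` object for KafkaJS from the PEM maps returned by
// jks.toPem(). Assumed input shape (not verified against your stores):
//   { alias: { ca } }         for trusted-certificate entries
//   { alias: { key, cert } }  for private-key entries
function buildSslOptions(keystorePem, truststorePem) {
  const entries = [
    ...Object.values(truststorePem),
    ...Object.values(keystorePem),
  ];

  // Collect every CA certificate found in either store, without duplicates.
  const ca = [...new Set(entries.filter((e) => e.ca).map((e) => e.ca))];

  // Use the first keystore entry that carries both a private key and a certificate.
  const identity = Object.values(keystorePem).find((e) => e.key && e.cert);
  if (!identity) {
    throw new Error('keystore contains no private-key entry');
  }

  return { ca, key: identity.key, cert: identity.cert };
}
```

With the variables from the question this could be passed as `ssl: buildSslOptions(keystore, truststore)`, which avoids hard-coding the caroot and localhost aliases and also picks up CA certificates that exist only in the truststore.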

nfs0ujit1#

Increasing connectionTimeout from its default value to 25000 (milliseconds) helped me.

const kafka = new Kafka({
  // ...your existing configuration (clientId, brokers, ssl, etc.)
  connectionTimeout: 25000, // milliseconds
});
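
If the right value differs between environments, the timeout can be made configurable rather than hard-coded. The following is only a sketch: withConnectionTimeout is a hypothetical helper and KAFKA_CONNECTION_TIMEOUT_MS is an assumed environment variable name; 25000 is the value from this answer, used as the fallback.

```javascript
// Sketch: merge a connection timeout into an existing KafkaJS config object.
// KAFKA_CONNECTION_TIMEOUT_MS is a hypothetical variable name.
function withConnectionTimeout(baseConfig, env = process.env) {
  const parsed = Number.parseInt(env.KAFKA_CONNECTION_TIMEOUT_MS ?? '', 10);

  // Fall back to 25000 ms when the variable is missing or not a positive number.
  const connectionTimeout =
    Number.isFinite(parsed) && parsed > 0 ? parsed : 25000;

  return { ...baseConfig, connectionTimeout };
}
```

It would be used as `new Kafka(withConnectionTimeout({ clientId, brokers, ssl }))`.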

