提供给我的凭证如下(另外我还有 kafka.keystore.jks 和 kafka.truststore.jks 两个文件):
host: xxxxx-xxxxx-x.cloudclusters.net
port: xxxxx
ip: xxx.xxx.xxx.xx
truststore pw: xxxxxxxx
keystore pw: xxxxxxxx
我想我并没有用上提供给我的全部凭证。以下是我的代码:
import * as dotenv from 'dotenv'
import express from 'express'
import { Kafka } from 'kafkajs';
import { Partitioners } from 'kafkajs';
import jks from 'jks-js';
import fs from 'fs';
// Convert both Java keystores to PEM. jks.toPem receives the raw file bytes
// and the store password; its result is read below as an object keyed by
// entry name (presumably the JKS aliases -- confirm against jks-js docs).
// NOTE(review): the store password is hard-coded here.
const keystore = jks.toPem(fs.readFileSync('./kafka.keystore.jks'), 'mypassword');
const trustore = jks.toPem(fs.readFileSync('./kafka.truststore.jks'), 'mypassword');

// TLS material taken from the keystore: the CA from the "caroot" entry and
// the client key/certificate pair from the "localhost" entry.
const ca = keystore.caroot.ca;
const key = keystore.localhost.key;
const cert = keystore.localhost.cert;
// Alternative left disabled by the author: read the CA from the truststore.
// const { caroot: {ca} } = trustore;

// Debug output of the extracted PEM values.
// NOTE(review): this prints the private key to stdout.
console.log("**************** kafka.keystore.jks ****************");
console.log("ca ===>", ca);
console.log("key ===>", key);
console.log("cert ===>", cert);
// Only the banner is printed for the truststore; `trustore` is not used further.
console.log("**************** kafka.truststore.jks ****************");
// setting up kafka
// KafkaJS client for the TLS-secured broker, authenticating with the CA and
// the client key/certificate extracted from the keystore.
const kafka = new Kafka({
  clientId: 'qa-topic',
  brokers: ['xxxxxxx.cloudclusters.net:xxxxx'], // HOST:PORT
  // KafkaJS aborts a connection attempt after its default connectionTimeout
  // (1000 ms). A TLS handshake with a remote broker can take longer, which
  // surfaces as "Client network socket disconnected before secure TLS
  // connection was established". Allow up to 25 s instead.
  connectionTimeout: 25000,
  ssl: {
    // NOTE(review): false disables verification of the broker certificate,
    // so the connection is encrypted but the broker is not authenticated.
    // Left as-is because enabling it may fail on a hostname mismatch.
    rejectUnauthorized: false,
    ca,
    key,
    cert,
  },
});
// Producer created from the shared client, using KafkaJS's DefaultPartitioner.
const producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner });

// Instrumentation listeners: log producer lifecycle and network events.
producer.on('producer.connect', () => {
  console.log(`KafkaProvider: connected`);
});

// This event fires when the producer disconnects, not when a connection
// attempt fails, so the message says exactly that.
producer.on('producer.disconnect', () => {
  console.log(`KafkaProvider: disconnected`);
});

// KafkaJS passes listeners an event object ({ id, type, timestamp, payload });
// the request details, including clientId, live under `payload`.
producer.on('producer.network.request_timeout', ({ payload }) => {
  console.log(`KafkaProvider: request timeout ${payload.clientId}`);
});
// Connects the producer, sends one supplier-rating event to the
// 'supplier-ratings' topic, then connects a consumer, subscribes it to
// 'test-topic' from the beginning and logs every message it receives.
// NOTE(review): nothing in the visible source calls run() -- confirm it is invoked.
const run = async () => {
// Producing
await producer.connect()
await producer.send({
topic: 'supplier-ratings',
messages: [
{
// The event is JSON-serialised and wrapped in a Buffer as the message value.
value: Buffer.from(JSON.stringify(
{
"event_name": "QA",
// NOTE(review): `user_uuiD` is not declared anywhere in the visible source;
// evaluating it would throw a ReferenceError -- confirm where it comes from.
"external_id": user_uuiD,
"payload": {
// NOTE(review): `i` is not declared in the visible source either
// (presumably a loop variable from omitted code -- verify).
"supplier_id": i.supplier_id,
"assessment": {
"performance": 7,
"quality": 7,
"communication": 7,
"flexibility": 7,
"cost": 7,
"delivery": 6
}
},
"metadata": {
"user_uuid": "5a12cba8-f4b5-495b-80ea-d0dd5d4ee17e"
}
}
))
},
],
})
//Consuming
// NOTE(review): `consumer` is never created in the visible source (no
// kafka.consumer(...) call) -- it must be defined before this can run.
// NOTE(review): the consumer reads 'test-topic' while the producer writes to
// 'supplier-ratings' -- confirm this is intentional.
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.run({
// Logs partition, offset and the stringified value of each message.
eachMessage: async ({ topic, partition, message }) => {
console.log({
partition,
offset: message.offset,
value: message.value.toString(),
})
},
})
}
// Load variables from a .env file into process.env before PORT is read;
// importing dotenv alone does not do this.
dotenv.config();

// Express application instance; `express` is imported at the top of the file
// but the app was never created before being used below.
const app = express();

// Listen on the configured port, falling back to 5000 when PORT is unset or empty.
const port = process.env.PORT || 5000;
app.listen(port, () => {
  console.log(`I am listening at ${port}`);
});
我已经从 kafka.keystore.jks 中提取出了 ca、key 和 cert,并按照文档把它们传入了 ssl 配置对象,但仍然遇到 "Client network socket disconnected before secure TLS connection was established" 错误。
我无法与 Kafka 集群建立连接。我怀疑自己漏掉了某些密钥。我参照的是 KafkaJS 的官方文档。
1条答案
按热度按时间nfs0ujit1#
把 connectionTimeout 从默认值增加到 25000(毫秒)之后,问题就解决了。