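I have an AWS MSK cluster in which I can create a topic, produce a message, and consume it using the Kafka installation on my EC2 server. But when I try to use a Kafka producer in my Node.js application, I get the following errors on npm start: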
{"level":"ERROR","timestamp":"2023-07-29T06:18:34.532Z","logger":"kafkajs","message":"[Connection] Response SaslHandshake(key: 17, version: 1)","broker":"b-1.ashixxxxxx4.c3.kafka.ap-southeast-1.amazonaws.com:9094","clientId":"cab-allocation","error":"Request is not valid given the current SASL state","correlationId":1,"size":10}
{"level":"ERROR","timestamp":"2023-07-29T06:18:34.533Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Request is not valid given the current SASL state","retryCount":0,"retryTime":273}
KafkaJSProtocolError: Request is not valid given the current SASL state
My kafkaproducer.js code is:
const { Kafka, AwsSasl } = require('kafkajs');
// Define the Kafka client configuration
const BROKER_1 = process.env.KAFKA_BROKER_1 as string
const BROKER_2 = process.env.KAFKA_BROKER_2 as string
const AWS_REGION = process.env.AWS_DEFAULT_REGION as string
const ACCESS_KEY_ID = process.env.AWS_ACCESS_KEY_ID as string
const SECRET_ACCESS_KEY = process.env.AWS_SECRET_ACCESS_KEY as string
const kafka = new Kafka({
  clientId: 'cab-allocation',
  brokers: ["b-1.axxxxxxx3.kafka.ap-southeast-1.amazonaws.com:9094,b-2.asxxxxx74.c3.kafka.ap-southeast-1.amazonaws.com:9094"], // Replace with your AWS MSK broker endpoints
  ssl: true,
  sasl: {
    mechanism: 'aws',
    authenticationProvider: AwsSasl,
    aws: {
      region: "ap-southeast-1",
      //authorizationIdentity:"Geolah",
      secretAccessKey: "q7SI3JyeOxxxxxyYpyMhY0ciAou6TDXWdyR6h",
      accessKeyId: "AKIAxxxxS25GD37NEJ",
    },
  },
});
const producer = kafka.producer();
producer.connect();
export async function sendKafkaMessage(topic: string, message: any) {
  const result = await producer.send({
    topic,
    messages: [
      { value: message }
    ]
  });
  console.log(result);
  return result;
}
producer.on('producer.connect', () => {
  console.log('Kafka producer connected');
});
(I have used the correct AWS credentials and MSK brokers.)
1 Answer
The request "is not valid" because IAM authentication requires port 9098 (or 9198 for public access), not 9094, which is the TLS port.
https://docs.aws.amazon.com/msk/latest/developerguide/port-info.html
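
As a rough illustration of the port change, here is a minimal sketch of a helper that rewrites a broker list to use the IAM port. The function name `toIamBrokers`, the constant names, and the `example` hostnames are hypothetical and not part of kafkajs or any AWS library. It also assumes the broker endpoints arrive as one comma-separated string, which has to be split into separate array entries before being passed to the client.

```typescript
// Hypothetical helper for illustration only; not part of kafkajs or the AWS SDK.
// Port numbers follow the MSK port information page linked above.
const IAM_PORT_PRIVATE = 9098; // IAM access control from within AWS
const IAM_PORT_PUBLIC = 9198;  // IAM access control over public access

// Split a comma-separated broker string into one entry per broker
// and replace whatever port each entry carries with the given IAM port.
function toIamBrokers(brokerList: string, port: number = IAM_PORT_PRIVATE): string[] {
  return brokerList
    .split(',')
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0)
    .map((entry) => {
      const host = entry.split(':')[0]; // drop the old port, if any
      return `${host}:${port}`;
    });
}

// Placeholder hostnames; substitute the real MSK bootstrap brokers.
const brokers = toIamBrokers(
  'b-1.example.kafka.ap-southeast-1.amazonaws.com:9094,b-2.example.kafka.ap-southeast-1.amazonaws.com:9094'
);
console.log(brokers);
```

The resulting array could then be passed as the `brokers` option of the `Kafka` constructor in place of the hard-coded 9094 endpoints.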