KafkaJSProtocolError:给定当前SASL状态,请求无效

yyyllmsg  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(139)

我有一个AWS MKS集群,我可以在其中创建一个主题,生成一条消息,并使用我的ec2服务器中安装的Kafka消费消息。但是当我尝试在我的nodejs应用程序中使用Kafka生产者时,我在npm启动时收到错误

{"level":"ERROR","timestamp":"2023-07-29T06:18:34.532Z","logger":"kafkajs","message":"[Connection] Response SaslHandshake(key: 17, version: 1)","broker":"b-1.ashixxxxxx4.c3.kafka.ap-southeast-1.amazonaws.com:9094","clientId":"cab-allocation","error":"Request is not valid given the current SASL state","correlationId":1,"size":10}
{"level":"ERROR","timestamp":"2023-07-29T06:18:34.533Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Request is not valid given the current SASL state","retryCount":0,"retryTime":273}
KafkaJSProtocolError: Request is not valid given the current SASL state

我的kafkaproducer.js代码是:

const { Kafka, AwsSasl } = require('kafkajs');// Define the Kafka client configuration
const BROKER_1 = process.env.KAFKA_BROKER_1 as string
const BROKER_2 = process.env.KAFKA_BROKER_2 as string
const AWS_REGION = process.env.AWS_DEFAULT_REGION as string
const ACCESS_KEY_ID = process.env.AWS_ACCESS_KEY_ID as string
const SECRET_ACCESS_KEY = process.env.AWS_SECRET_ACCESS_KEY as string

const kafka = new Kafka({
  clientId: 'cab-allocation',
  brokers: ["b-1.axxxxxxx3.kafka.ap-southeast-1.amazonaws.com:9094,b-2.asxxxxx74.c3.kafka.ap-southeast-1.amazonaws.com:9094"], // Replace with your AWS MSK broker endpoints
  ssl: true,
  sasl: {
    mechanism: 'aws',
    authenticationProvider: AwsSasl,
   aws: {
     region: "ap-southeast-1",
     //authorizationIdentity:"Geolah",       
     secretAccessKey: "q7SI3JyeOxxxxxyYpyMhY0ciAou6TDXWdyR6h",
     accessKeyId: "AKIAxxxxS25GD37NEJ",
   },
 },
});

const producer = kafka.producer();
producer.connect();

export async function sendKafkaMessage(topic: string, message: any) {
  const result = await producer.send({
    topic,
    messages: [
      { value: message }
    ]
  });
  console.log(result);
  return result;
}

producer.on('producer.connect', () => {
  console.log('Kafka producer connected');
});

(我使用了正确的AWS凭证和MSK经纪人)

相关问题