我在AWS中使用Kafka和MSK还是个新手。我使用Kafkajs从lambda写入MSK集群。我的记录成功写入Kafka集群,但我的客户端也将连接超时错误记录到CloudWatch中。我很好奇我是否可以在代码中做一些不同的事情来避免错误日志。
这是我的生产商代码:
const client = new Kafka({
clientId: "client-id",
brokers: ["broker1:9092", "broker2:9092"], // example brokers used here
});
const producer = client.producer({
idempotent: true
});
const record = {
topic: "topic1",
messages: [
{ value: JSON.stringify("message") }
]
};
await producer
.connect()
.then(async () => await producer.send(record))
.then(async () => await producer.disconnect())
.catch(err => throw new Error(JSON.stringify(err)));
下面是错误输出的一个示例:
{
"level": "ERROR",
"timestamp": "2022-12-05T20:44:06.637Z",
"logger": "kafkajs",
"message": "[Connection] Connection timeout",
"broker": "[some-broker]:9092",
"clientId": "[some-client-id]"
}
我不确定我是否只需要增加客户端的连接超时,或者我在初始化中遗漏了一些东西。就像我说的,记录仍然会进入集群,但是我想清理日志,这样我就不会经常看到这个错误。有人遇到过这个问题并解决了它吗?或者这是使用MSK和Kafka时看到的正常事情吗?
1条答案
按热度按时间r6hnlfcb1#
这不是一个令人兴奋的答案,但事实证明,其中一个代理没有正确配置我们的Transit Gateway以允许来自VPC的流量。故事的寓意是,始终检查代理端点的配置。
我通过Transit Gateway使用lambda中的kafkajs从一个帐户向另一个帐户发送数据。代码本身按预期工作,但网关配置不正确,无法允许流量从工作帐户通过其中一个代理进入MSK群集。