Kafkajs日志中的Producer.send()“错误:[Connection]连接超时”,但仍写入流

avkwfej4  于 2022-12-17  发布在  Apache
关注(0)|答案(1)|浏览(99)

我在AWS中使用Kafka和MSK还是个新手。我使用Kafkajs从lambda写入MSK集群。我的记录成功写入Kafka集群,但我的客户端也将连接超时错误记录到CloudWatch中。我很好奇我是否可以在代码中做一些不同的事情来避免错误日志。
这是我的生产商代码:

const client = new Kafka({ 
    clientId: "client-id", 
    brokers: ["broker1:9092", "broker2:9092"],  // example brokers used here
});

const producer = client.producer({
    idempotent: true
});

const record = {
    topic: "topic1",
    messages: [
        { value: JSON.stringify("message") }
    ]
};

await producer
    .connect()
    .then(async () => await producer.send(record))
    .then(async () => await producer.disconnect())
    .catch(err => throw new Error(JSON.stringify(err)));

下面是错误输出的一个示例:

{
    "level": "ERROR",
    "timestamp": "2022-12-05T20:44:06.637Z",
    "logger": "kafkajs",
    "message": "[Connection] Connection timeout",
    "broker": "[some-broker]:9092",
    "clientId": "[some-client-id]"
}

我不确定我是否只需要增加客户端的连接超时,或者我在初始化中遗漏了一些东西。就像我说的,记录仍然会进入集群,但是我想清理日志,这样我就不会经常看到这个错误。有人遇到过这个问题并解决了它吗?或者这是使用MSK和Kafka时看到的正常事情吗?

r6hnlfcb

r6hnlfcb1#

这不是一个令人兴奋的答案,但事实证明,其中一个代理没有正确配置我们的Transit Gateway以允许来自VPC的流量。故事的寓意是,始终检查代理端点的配置。
我通过Transit Gateway使用lambda中的kafkajs从一个帐户向另一个帐户发送数据。代码本身按预期工作,但网关配置不正确,无法允许流量从工作帐户通过其中一个代理进入MSK群集。

相关问题