如何在初始化失败时抛出KafkaJS客户端自定义错误?

uinbv5nw  于 2022-11-21  发布在  Apache
关注(0)|答案(1)|浏览(142)

我使用以下代码初始化Kafka客户端:

this.kafka = new Kafka({
  clientId: <my_client_ID>,
  brokers: [
    `${process.env.KAFKA_BROKER_1}`,
    `${process.env.KAFKA_BROKER_2}`,
    `${process.env.KAFKA_BROKER_3}`,
  ],
  retry: {
    initialRetryTime: 3000,
    retries: 3,
  },

});

现在,如果连接到代理时出现问题,它将抛出如下错误:

{"level":"ERROR","timestamp":"2022-10-19T04:21:08.143Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"<broker_1>","clientId":"<my_client_id"}
{"level":"ERROR","timestamp":"2022-10-19T04:21:08.144Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":299}
{"level":"ERROR","timestamp":"2022-10-19T04:21:08.143Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"<broker_2>","clientId":"<my_client_id"}
{"level":"ERROR","timestamp":"2022-10-19T04:21:09.447Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":1,"retryTime":564}
{"level":"ERROR","timestamp":"2022-10-19T04:21:08.143Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"<broker_3>","clientId":"<my_client_id"}
{"level":"ERROR","timestamp":"2022-10-19T04:21:11.014Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":2,"retryTime":1008}

现在,我想在这里修改日志消息,或者在发生这种情况后添加一个自定义消息。我想把它 Package 在一个try-catch块中,但由于某种原因,它没有抛出异常。因此:
1.是否有办法将此开箱即用日志更改为自定义日志?
1.如果Kafka客户端无法初始化,则无法执行(1),如何添加自定义long?

  • 谢谢-谢谢
lmvvr0a8

lmvvr0a81#

据我所知,您希望添加自定义日志来处理某些错误情况。
Kafkajs提供了一种创建我们自己的自定义日志并使用它们而不是默认日志的方法。
here是相同的参考。您可以检查您的kafkajs版本以避免兼容性问题。已在下面添加了一个示例。

{
        level: 4,
        label: 'INFO', // NOTHING, ERROR, WARN, INFO, or DEBUG
        timestamp: '2017-12-29T13:39:54.575Z',
        logger: 'kafkajs',
        message: 'Started',
        // ... any other extra key provided to the log function
    }
const { logLevel } = require('kafkajs')
const winston = require('winston')
const toWinstonLogLevel = level => switch(level) {
    case logLevel.ERROR:
    case logLevel.NOTHING:
        return 'error'
    case logLevel.WARN:
        return 'warn'
    case logLevel.INFO:
        return 'info'
    case logLevel.DEBUG:
        return 'debug'
}

const WinstonLogCreator = logLevel => {
    const logger = winston.createLogger({
        level: toWinstonLogLevel(logLevel),
        transports: [
            new winston.transports.Console(),
            new winston.transports.File({ filename: 'myapp.log' })
        ]
    })

    return ({ namespace, level, label, log }) => {
        const { message, ...extra } = log
        logger.log({
            level: toWinstonLogLevel(level),
            message,
            extra,
        })
    }
}

const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['kafka1:9092', 'kafka2:9092'],
    logLevel: logLevel.ERROR,
    logCreator: WinstonLogCreator
})
  • 谢谢-谢谢

相关问题