我已经为kafka客户机和生产者创建了singleton类,以便只创建一个对象。我需要多次发布相同的主题,而不创建新的客户端和生产者示例。我发现producer.on('ready',fn(){})不是使用同一个客户机和producer示例触发的,只是在我有新的客户机和producer对象时第一次触发。
下面是示例代码:
单例类:
const kafka = require('kafka-node');
const logger = require('./../../../../applogger');
const kafkaConfig = require('./../../../../config/config');
function ConnectionProvider() {
let kafkaConnection = undefined;
let client = undefined;
this.getConnection = () => {
if (!this.kafkaConnection) {
logger.info("Creating new kafka connection ------------------------------------- ");
this.client = new kafka.Client(kafkaConfig.ZOOKPER_HOST);
this.kafkaConnection = new kafka.Producer(this.client);
}
return this.kafkaConnection;
};
this.getClient = () => {
if (!this.client) {
logger.info("Creating new kafka Client ------------------------------------- ");
this.client = new kafka.Client(kafkaConfig.ZOOKPER_HOST);
}
return this.client;
}
process.on('SIGINT', function() {
logger.info("Going to terminate kafka connection...!");
process.exit(0);
});
}
module.exports = exports = new ConnectionProvider;
主题发布方法:
const kafkaClient = require('./../core/kafkaConnection');
const publishToKafka = function(dataPayload, callback) {
logger.debug('Publishing to topic ', topicName, ' with data: ', dataPayload);
let producer = kafkaClient.getConnection();
producer.on('ready', function() {
let payloads = dataPayload;
producer.send(payloads, function(err, data) {
if (err) {
logger.error(
'Error in publishing message to messaging pipeline ', err
);
callback(err, null);
return;
}
logger.debug('Published message to messaging pipeline topic ', topicName, ' with result: ', data);
callback(null, data);
return;
});
});
producer.on('error', function(err) {
logger.error(
'Error in publishing message to messaging pipeline ', err
);
producer.close();
});
};
datapayload为:let datapayload=[{topic:sometopic,message:somemessage}]
我需要多次调用publishtokafka方法,但只想创建一个kafka客户机和生产者示例。但是producer没有发布主题,因为producer.on('ready',function(){})在使用客户机和producer的同一对象时没有触发。
提前谢谢。
1条答案
按热度按时间ao218c7q1#
我通过在每次调用后关闭kafka producer和client示例来解决这个问题,因为我需要多次发布到kafka producer,但默认情况下kafka zookeeper只允许最多60个连接(如果需要,我们可以增加连接的值)。所以这就是为什么要为单个kafka示例创建singleton类。
但是在创建单个kafka示例之后,它的producer.on('ready')事件不会触发,因为第二次使用kafka producer的同一个对象时,它已经处于就绪状态。所以我们每次都需要新的producer示例来发布。
不需要为单个对象创建单例类。