我第一次使用节点和Kafka,使用Kafka节点。使用消息需要调用外部api,甚至可能需要一秒钟的时间来响应。我希望克服我的消费者的突然失败,在某种程度上,如果一个消费者失败,另一个消费者将消费,将取代它将收到相同的信息,其工作没有完成。
我正在使用kafka 0.10并尝试使用consumergroup。
我想到了设置 autoCommit: false
在options中,只在消息的工作完成后提交消息(正如我不久前对一些java代码所做的那样)。
但是,我似乎不能确定只有在消息完成后我应该如何正确地提交它。我该如何承诺?
我的另一个担心是,由于回调,下一条消息似乎在上一条消息完成之前就被读取了。我担心如果消息x+2在消息x+1之前完成,那么偏移量将被设置为x+2,因此在失败的情况下x+1将永远不会被重新执行。
到目前为止,我基本上是这么做的:
var options = {
host: connectionString,
groupId: consumerGroupName,
id: clientId,
autoCommit: false
};
var kafka = require("kafka-node");
var ConsumerGroup = kafka.ConsumerGroup;
var consumerGroup = new ConsumerGroup(options, topic);
consumerGroup.on('connect', function() {
console.log("Consuming Kafka %s, topic=%s", JSON.stringify(options), topic);
});
consumerGroup.on('message', function(message) {
console.log('%s read msg Topic="%s" Partition=%s Offset=%d', this.client.clientId, message.topic, message.partition, message.offset);
console.log(message.value);
doSomeStuff(function() {
// HOW TO COMMIT????
consumerGroup.commit(function(err, data) {
console.log("------ Message done and committed ------");
});
});
});
consumerGroup.on('error', function(err) {
console.log("Error in consumer: " + err);
close();
});
process.once('SIGINT', function () {
close();
});
var close = function() {
// SHOULD SEND 'TRUE' TO CLOSE ???
consumerGroup.close(true, function(error) {
if (error) {
console.log("Consuming closed with error", error);
} else {
console.log("Consuming closed");
}
});
};
1条答案
按热度按时间uklbhaso1#
您可以在这里做的一件事是为您处理的每条消息提供重试机制。
你可以参考我的答案:https://stackoverflow.com/a/44328233/2439404
我使用
kafka-consumer
,使用async/cargo
把它们放进去async/queue
(内存队列中)。队列接受一个worker函数作为参数,我将一个async/retryable
.对于您的问题,您可以使用retryable对您的消息进行处理。https://caolan.github.io/async/docs.html#retryable
这也许能解决你的问题。