如何使用kafka节点控制已使用kafka消息的提交

gc0ot86w  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(327)

我第一次使用节点和Kafka,使用Kafka节点。使用消息需要调用外部api,甚至可能需要一秒钟的时间来响应。我希望克服我的消费者的突然失败,在某种程度上,如果一个消费者失败,另一个消费者将消费,将取代它将收到相同的信息,其工作没有完成。
我正在使用kafka 0.10并尝试使用consumergroup。
我想到了设置 autoCommit: false 在options中,只在消息的工作完成后提交消息(正如我不久前对一些java代码所做的那样)。
但是,我似乎不能确定只有在消息完成后我应该如何正确地提交它。我该如何承诺?
我的另一个担心是,由于回调,下一条消息似乎在上一条消息完成之前就被读取了。我担心如果消息x+2在消息x+1之前完成,那么偏移量将被设置为x+2,因此在失败的情况下x+1将永远不会被重新执行。
到目前为止,我基本上是这么做的:

var options = {
    host: connectionString,
    groupId: consumerGroupName,
    id: clientId,
    autoCommit: false
};

var kafka = require("kafka-node");
var ConsumerGroup = kafka.ConsumerGroup;

var consumerGroup = new ConsumerGroup(options, topic);

consumerGroup.on('connect', function() {
    console.log("Consuming Kafka %s, topic=%s", JSON.stringify(options), topic);
});

consumerGroup.on('message', function(message) {
    console.log('%s read msg Topic="%s" Partition=%s Offset=%d', this.client.clientId, message.topic, message.partition, message.offset);
    console.log(message.value);
    doSomeStuff(function() {
        // HOW TO COMMIT????
        consumerGroup.commit(function(err, data) {
            console.log("------ Message done and committed ------");
        });
    });
});

consumerGroup.on('error', function(err) {
    console.log("Error in consumer: " + err);
    close();
});

process.once('SIGINT', function () {
    close();
});

var close = function() {
    // SHOULD SEND 'TRUE' TO CLOSE ???
    consumerGroup.close(true, function(error) {
        if (error) {
            console.log("Consuming closed with error", error);
        } else {
            console.log("Consuming closed");
        }
    });
};
uklbhaso

uklbhaso1#

您可以在这里做的一件事是为您处理的每条消息提供重试机制。
你可以参考我的答案:https://stackoverflow.com/a/44328233/2439404
我使用 kafka-consumer ,使用 async/cargo 把它们放进去 async/queue (内存队列中)。队列接受一个worker函数作为参数,我将一个 async/retryable .
对于您的问题,您可以使用retryable对您的消息进行处理。https://caolan.github.io/async/docs.html#retryable
这也许能解决你的问题。

相关问题