我用的是 kafka-node
consumergroup阅读来自kafka的消息。我想阅读主题中的新消息,即不是所有以前未使用的消息,而是仅在创建consumergroup之后到达的消息。
选择 fromOffset: 'latest'
仅当给定groupid没有存储偏移量时才生效。我可以通过每次创建consumergroup时创建一个新的groupid来使用它,但是我希望避免这种方法,因为这样会在kafka服务器上创建大量groupid和存储的偏移量。
我还尝试手动将偏移设置为 -1
这样地:
offset = new kafka.Offset(consumer.client);
offset.commit(groupId, [{ topic: topic, partition: 0, offset: -1 }],
function (err, data) {
logger.debug("tried to set offset: ", data);
if (err) logger.error("error", err);
})
);
这将返回一个错误代码0,该代码应该是成功的,但consumergroup随后会收到旧消息。
Kafka版本:0.10.0Kafka节点:1.6.2
3条答案
按热度按时间jhiyze9q1#
commit的第一个参数应该是groupid,而不是clientid。
您应该将消费者的组名传递给它。
kmynzznz2#
对于那些注意到即使将“autocommit”设置为“false”以上提到的解决方案也不起作用,并且偏移量仍然存储在group:中的人,请确保在option object中设置附加标志:commitofsetsonfirstjoin:false。如果未设置标志,则偏移量将按组存储,并且一旦第二次启动,它将获取从该偏移量开始的所有消息
我希望这有助于节省(很多)时间;)
pdtvr36n3#
使用以下选项启动kafka consumergroup:
创建一个consumergroup,它永远不会向kafka提交偏移量(除非您手动执行,这是不需要的)。因此,它在启动时总是选择“最新”选项。