kafka消费群:仅阅读新邮件

7vux5j2d  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(276)

我用的是 kafka-node consumergroup阅读来自kafka的消息。我想阅读主题中的新消息,即不是所有以前未使用的消息,而是仅在创建consumergroup之后到达的消息。
选择 fromOffset: 'latest' 仅当给定groupid没有存储偏移量时才生效。我可以通过每次创建consumergroup时创建一个新的groupid来使用它,但是我希望避免这种方法,因为这样会在kafka服务器上创建大量groupid和存储的偏移量。
我还尝试手动将偏移设置为 -1 这样地:

offset = new kafka.Offset(consumer.client);
offset.commit(groupId, [{ topic: topic, partition: 0, offset: -1 }],
  function (err, data) {
    logger.debug("tried to set offset: ", data);

    if (err) logger.error("error", err);

  })
);

这将返回一个错误代码0,该代码应该是成功的,但consumergroup随后会收到旧消息。
Kafka版本:0.10.0Kafka节点:1.6.2

jhiyze9q

jhiyze9q1#

commit(groupId, payloads, cb)

commit的第一个参数应该是groupid,而不是clientid。
您应该将消费者的组名传递给它。

kmynzznz

kmynzznz2#

对于那些注意到即使将“autocommit”设置为“false”以上提到的解决方案也不起作用,并且偏移量仍然存储在group:中的人,请确保在option object中设置附加标志:commitofsetsonfirstjoin:false。如果未设置标志,则偏移量将按组存储,并且一旦第二次启动,它将获取从该偏移量开始的所有消息
我希望这有助于节省(很多)时间;)

pdtvr36n

pdtvr36n3#

使用以下选项启动kafka consumergroup:

autoCommit: false,   
 fromOffset: 'latest'

创建一个consumergroup,它永远不会向kafka提交偏移量(除非您手动执行,这是不需要的)。因此,它在启动时总是选择“最新”选项。

相关问题