如何从apachenifi中最后提交的偏移量读取consumer中的kafka消息?

68bkxrlz  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(406)

我已经开始我的生产者发送数据到Kafka,也开始我的消费者拉相同的数据。当我在apache nifi中使用consumekafka处理器(Kafka版本1.0)时,我脑子里很少有与Kafka消费者相关的查询。
q、 1)当我第一次启动consumekafka处理器时,如何从开始和当前消息读取消息?
q、 2)以及在Kafka消费者关机的情况下,如何读取最后一条消费信息之后的信息?
在使用apachenifi时,我们如何实现以上两个呢?

6ojccjat

6ojccjat1#

consumekafka处理器有一个名为“offset reset”的属性,当消费组id没有以前的偏移量或偏移量不再存在时使用该属性。此属性的选项为“偏移最新”或“偏移最早”,默认为“最新”。
因此,如果您使用以前从未使用过的使用者组id启动consumekafka处理器,那么它将从最新消息开始消费。之后,如果启动和停止处理器,它将从上次消耗的偏移量开始。
如果您想再次使用“偏移重置”来强制将其设置为最早或最晚,则需要更改使用者组id,否则现有使用者组将始终使用现有偏移开始。
您不能同时从开始和当前读取消息,您可以从头开始,一直读到当前,也可以从当前开始。这是Kafka的工作方式,不是nifi特有的。

相关问题