从一开始就读Kafka的主题

g6ll5ycj  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(413)

我有一个spring-boot应用程序,它不断读取来自特定主题的所有消息。我的问题是,当应用程序重新启动时,它需要再次读取主题中的所有消息。我原以为这两个选项结合起来就行了,但行不通:
重置偏移:真
startoffset:最早
这是我的方法;

@StreamListener(myTopic)
 public void handle(@Payload Input input) {
    /**Do other stuff not related**/
 }

这是我的申请表

spring:
  cloud.stream:
    bindings:
      myTopic:
        destination: TOPIC
        content-type: application/**avro
        group: group-topic
        concurrency: 1
        resetOffsets: true
        startOffset: earliest
tzdcorbm

tzdcorbm1#

你得换衣服 group 在application.yaml中设置为新的唯一组名(请参见下面的示例,其中我将使用者组设置为新的使用者组id:

spring:
  cloud.stream:
    bindings:
      myTopic:
        destination: TOPIC
        content-type: application/**avro
        group: new-consumer-group-id
        concurrency: 1
        resetOffsets: true
        startOffset: earliest

如果继续使用同一个consumergroup,配置 resetOffset 以及 startingOffset 不会有任何影响。
或者,可以使用命令行工具重置每个使用者组的偏移 kafka-consumer-groups.sh . 模板如下所示:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
 --execute --reset-offsets \
 --group groupA\
 --topic topicC \
 --partition 0 \
 --to-offset <insert number>

相关问题