我有一个spring-boot应用程序,它不断读取来自特定主题的所有消息。我的问题是,当应用程序重新启动时,它需要再次读取主题中的所有消息。我原以为这两个选项结合起来就行了,但行不通:
重置偏移:真
startoffset:最早
这是我的方法;
@StreamListener(myTopic)
public void handle(@Payload Input input) {
/**Do other stuff not related**/
}
这是我的申请表
spring:
cloud.stream:
bindings:
myTopic:
destination: TOPIC
content-type: application/**avro
group: group-topic
concurrency: 1
resetOffsets: true
startOffset: earliest
1条答案
按热度按时间tzdcorbm1#
你得换衣服
group
在application.yaml中设置为新的唯一组名(请参见下面的示例,其中我将使用者组设置为新的使用者组id:如果继续使用同一个consumergroup,配置
resetOffset
以及startingOffset
不会有任何影响。或者,可以使用命令行工具重置每个使用者组的偏移
kafka-consumer-groups.sh
. 模板如下所示: