我正在使用springcloudkafka活页夹将数据读取到kstream。在阅读某个主题的数据时,我需要从头开始阅读。
我试图设置Kafka偏移重置和启动偏移属性。但是,找不到任何参考资料。
你能帮我提供任何示例application.yaml来重置偏移量吗,这样我就可以从头开始使用主题中的消息了
添加我使用过的application.yaml:
spring.cloud.stream.bindings.input:
destination: input-topic1
consumer:
useNativeDecoding: true
headerMode: raw
spring.cloud.stream.bindings.output:
destination: output-topic
producer:
useNativeDecoding: true
headerMode: raw
spring.cloud.stream.bindings.beginningInput:
destination: beginning-topic
consumer:
useNativeDecoding: true
headerMode: raw
spring.cloud.stream.kafka.streams.bindings.input:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.output:
producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.beginningInput:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
resetOffsets: true
startOffset: earliest
spring.cloud.stream.kafka.streams.binder:
brokers: 127.0.0.1
zkNodes: 127.0.0.1
configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
commit.interval.ms: 1000
1条答案
按热度按时间bejyjqdl1#
resetOffsets
是坏的。它在2.0.0.0版本中恢复。在这里公关。