这个问题在这里已经有答案了:
如何在单个kafka streams应用程序中连接到多个集群(2个答案)
去年关门了。
我们有一个场景,我们想在集群1上使用kafka主题的数据,但在集群2上创建ktable主题(重分区和changelog)。
通道绑定-
spring.cloud.stream.bindings.member.destination: member
spring.cloud.stream.bindings.member.consumer.useNativeDecoding: true
spring.cloud.stream.bindings.member.consumer.headerMode: raw
spring.cloud.stream.kafka.streams.bindings.member.consumer.keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.member.consumer.valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
创建ktable-
protected KTable<String, GenericRecord> createKTable(String field, KStream<String, GenericRecord> stream, String stateStore) {
return stream
.map((s, genericRecord) -> KeyValue.pair(field, genericRecord))
.groupByKey()
.reduce((oldVal, newVal) -> newVal, Materialized.as(stateStore));
}
所以成员主题在集群#1上,但是我们想在不同的集群上创建下面的ktable主题,不确定在这种情况下如何使用两个不同的kafka绑定-
application-member-store-repartition
application-member-store-changelog
1条答案
按热度按时间c9x0cxw01#
单个kafka streams应用程序只能连接到一个群集。根据下面链接的答案,您可以创建两个不同的示例,但它们将是不同的应用程序。
更多细节可以找到Kafka流-连接到多个集群