kafka streams处理器api允许多集群拓扑?

vawmfj5a  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(310)

使用kafka处理器api(不是dsl)从源主题读取并写入目标主题,对于单个kafka群集设置(即,如果源主题和目标主题都位于同一个群集上)来说效果很好,但是当源主题和目标主题位于不同的kafka群集上时,我得到的是目标处理器上下文的nullpointerexception

Topology
topology.addSource("mySource", "SourceTopic");
topology.addProcessor("SourceStreamProcessor",()->new SourceStreamProcessor(), "mySource");         

topology.addProcessor("TargetProcessor",()->new TargetProcessor(), "Target");
topology.addSink("sink1","OUTPUT_TOPIC1","TargetProcessor");
topology.addSink("sink2","OUTPUT_TOPIC2","TargetProcessor");

Properties sourceProcessorProps = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SourceStreamProcessor"); // Kafka Cluster 1
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dev_Cluser_xx.org:9092");

Properties targetProcessorProps = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "targetStreamProcessor"); // Kafka Cluster 2
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test_Cluser_xx.org:9092");

如何使用kafka streams处理器api将一个集群中的一个主题写入另一个集群中的另一个主题?

2ledvvac

2ledvvac1#

kafka流不支持从一个kafka集群读取数据并向另一个kafka集群写入数据。
您可以在一个集群中处理消息,然后使用mirror maker将其复制到另一个集群。

相关问题