使用kafka处理器api(不是dsl)从源主题读取并写入目标主题,对于单个kafka群集设置(即,如果源主题和目标主题都位于同一个群集上)来说效果很好,但是当源主题和目标主题位于不同的kafka群集上时,我得到的是目标处理器上下文的nullpointerexception
Topology
topology.addSource("mySource", "SourceTopic");
topology.addProcessor("SourceStreamProcessor",()->new SourceStreamProcessor(), "mySource");
topology.addProcessor("TargetProcessor",()->new TargetProcessor(), "Target");
topology.addSink("sink1","OUTPUT_TOPIC1","TargetProcessor");
topology.addSink("sink2","OUTPUT_TOPIC2","TargetProcessor");
Properties sourceProcessorProps = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SourceStreamProcessor"); // Kafka Cluster 1
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dev_Cluser_xx.org:9092");
Properties targetProcessorProps = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "targetStreamProcessor"); // Kafka Cluster 2
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test_Cluser_xx.org:9092");
如何使用kafka streams处理器api将一个集群中的一个主题写入另一个集群中的另一个主题?
1条答案
按热度按时间2ledvvac1#
kafka流不支持从一个kafka集群读取数据并向另一个kafka集群写入数据。
您可以在一个集群中处理消息,然后使用mirror maker将其复制到另一个集群。