kafka streams输出主题是否可以在单独的集群上?

wsewodh2  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(306)

我有一个主题,其中所有日志都被推到集中的主题,但如果可能的话,我想过滤掉一些记录到一个单独的主题和集群中。
谢谢

dy2hfwbg

dy2hfwbg1#

kafka streams不允许使用来自不同kafka群集的源和输出主题创建流。所以下面的代码将不适用于您

streamsBuilder.stream(sourceTopicName).filter(..).to(outputTopicName)

在本例中,它期望outputtopicname与topic sourcetopicname来自同一集群。
作为一种解决方法,为了将消息从另一个集群发送到输出主题,您可以使用另外创建的kafkaproducer with property bootstrap.servers 指向外部集群和 KStream.foreach() 方法。

streamsBuilder.stream(sourceTopicName)
    .filter((key, value) -> ..)
    .foreach((key, value) -> 
        sendMessage(kafkaProducerFromAnotherCluster, destinationTopicName, key, value);

public static void sendMessage(KafkaProducer<String, String> kafkaProducer, 
                               String destinationTopicName, String key, String value) {
    try {
        kafkaProducer.send(new ProducerRecord(destinationTopicName, key, value));
    } catch (RuntimeException ex) {
        log.error(errorMessage, ex);
    }
}

另一个选项是在kafka集群中创建输出主题,该主题将过滤消息,并在两个集群之间设置kafka镜像(因此消息将从一个主题复制到另一个集群的第二个主题)。

相关问题