使用kafka连接器在kafka主题之间复制数据

b1zrtrql  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(594)

我是Kafka的新手,现在我需要将数据从一个Kafka主题复制到另一个。我想知道有什么可能的办法?我能想到的方法如下:
Kafka消费者+Kafka制作人
Kafka河
KafkaFlume连接器+生产商
Kafka消费者+源连接器
我的问题是:有可能在两者之间使用两个Kafka连接器吗?e、 g.接收器连接器+电源连接器。是的,你能给我举几个好例子吗?或者一些如何做到这一点的提示?
提前谢谢!

balp4ylt

balp4ylt1#

您列出的所有方法都是可能的。哪一个是最好的实际上取决于您想要对流程的控制,或者它是一次性操作还是您想要继续运行的东西。
kafka streams提供了一种通过dsl将一个主题转换为另一个主题的简单方法
您可以这样做(演示代码显然不是用于生产的!):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

final Serde<byte[]> bytesSerdes = Serdes.ByteArray();
final StreamsBuilder builder = new StreamsBuilder();
KStream<byte[], byte[]> input = builder.stream(
        "input-topic",
        Consumed.with(bytesSerdes, bytesSerdes)
);
input.to("output-topic", Produced.with(bytesSerdes, bytesSerdes));

final KafkaStreams streams = new KafkaStreams(builder.build(), props);
try {
    streams.start();
    Thread.sleep(60000L);
} catch (Exception e) {
    e.printStackTrace();
} finally {
    streams.close();
}

相关问题