Flink Apache Beam KafkaIO -写入多个主题

ftf50wuq  于 2022-12-16  发布在  Apache
关注(0)|答案(3)|浏览(119)

目前,我正在开发Apache Beam Pipeline实现,它使用来自三个不同Kafka主题的数据,经过一些处理后,我创建了三种类型的对象,添加了来自上述Kafka主题的数据。需要将这三个对象发布到三个不同的Kafka主题中。可以使用KafkaIO.read中的withTopics方法从多个主题中读取,但我没有找到KafkaIO功能来写入多个主题。
我想得到一些如何以最理想的方式做到这一点的建议,如果有人能提供一些代码示例,我将不胜感激。

uqdfh47h

uqdfh47h1#

您可以在PCollection上使用3个不同的接收器,例如:

private transient TestPipeline pipeline = TestPipeline.create();

    @Test
    public void kafkaIOSinksTest(){
        PCollection<String> inputCollection = pipeline.apply(Create.of(Arrays.asList("Object 1", "Object 2")));
        
        inputCollection.apply(KafkaIO.<Void, String>write()
                .withBootstrapServers("broker_1:9092,broker_2:9092")
                .withTopic("topic1")
                .withValueSerializer(new StringSerializer())
                .values());

        inputCollection.apply(KafkaIO.<Void, String>write()
                .withBootstrapServers("broker_1:9092,broker_2:9092")
                .withTopic("topic2")
                .withValueSerializer(new StringSerializer())
                .values());

        inputCollection.apply(KafkaIO.<Void, String>write()
                .withBootstrapServers("broker_1:9092,broker_2:9092")
                .withTopic("topic3")
                .withValueSerializer(new StringSerializer())
                .values());
    }

在该示例中,相同的PCollection经由多个接收器被接收到3个不同的主题中。

gopyfrb3

gopyfrb32#

一些Beam接收器(如BigQueryIO)支持“动态目的地”,但KafkaIO不支持。您需要为不同的主题设置3个不同的接收器,并且需要将消息拆分(可能使用Partition转换)以分离集合,然后将其馈送到这些接收器。

mznpcxlj

mznpcxlj3#

你可以使用KafkaIO.<K, V>writeRecords(),它把PCollection<ProducerRecord<K, V>>作为输入,所以你只需要在ProducerRecord中为每个元素指定一个必需的输出主题,或者使用默认的主题。
请看一下这个测试的例子。

相关问题