目前,我正在开发Apache Beam Pipeline实现,它使用来自三个不同Kafka主题的数据,经过一些处理后,我创建了三种类型的对象,添加了来自上述Kafka主题的数据。需要将这三个对象发布到三个不同的Kafka主题中。可以使用KafkaIO.read
中的withTopics
方法从多个主题中读取,但我没有找到KafkaIO功能来写入多个主题。
我想得到一些如何以最理想的方式做到这一点的建议,如果有人能提供一些代码示例,我将不胜感激。
目前,我正在开发Apache Beam Pipeline实现,它使用来自三个不同Kafka主题的数据,经过一些处理后,我创建了三种类型的对象,添加了来自上述Kafka主题的数据。需要将这三个对象发布到三个不同的Kafka主题中。可以使用KafkaIO.read
中的withTopics
方法从多个主题中读取,但我没有找到KafkaIO功能来写入多个主题。
我想得到一些如何以最理想的方式做到这一点的建议,如果有人能提供一些代码示例,我将不胜感激。
3条答案
按热度按时间uqdfh47h1#
您可以在
PCollection
上使用3个不同的接收器,例如:在该示例中,相同的
PCollection
经由多个接收器被接收到3个不同的主题中。gopyfrb32#
一些Beam接收器(如BigQueryIO)支持“动态目的地”,但KafkaIO不支持。您需要为不同的主题设置3个不同的接收器,并且需要将消息拆分(可能使用
Partition
转换)以分离集合,然后将其馈送到这些接收器。mznpcxlj3#
你可以使用
KafkaIO.<K, V>writeRecords()
,它把PCollection<ProducerRecord<K, V>>
作为输入,所以你只需要在ProducerRecord
中为每个元素指定一个必需的输出主题,或者使用默认的主题。请看一下这个测试的例子。