我使用KafkaSink作为我的flink应用程序中的接收器,我需要根据一些键-值对将stringifiedJSON发送到不同的Kafka主题(例如,一些JSON发送到topic 1,一些其他接收器发送到另一个主题topic 2,以此类推)。但是我在文档中没有找到任何方法来配置Kafka主题,使其根据传入的数据流进行选择。有人能帮我吗?
注意:我使用的是flink版本14.3
DataStream<String> data = .....
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(parameter.get("bootstrap.servers"))
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(parameter.get("kafka.output.topic"))
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
data.sinkTo(sink);
2条答案
按热度按时间qlfbtfca1#
我没有尝试过这种方法,但我相信,与其使用
setTopic
将接收器硬连接到特定主题,不如在自定义KafkaRecordSerializationSchema
上实现serialize
方法,以便它返回的每个ProducerRecord
都指定它应该写入的主题。另一种选择是为每个主题创建一个单独的接收器对象,然后使用一个
ProcessFunction
,该ProcessFunction
扇出到一组端输出,每个端输出连接到适当的接收器。zlwx9yxi2#
我可以通过使用@DavidAnderson建议的自定义序列化方法实现KafkaRecordSerializationSchema,将输出接收到多个Kafka主题。
我通过setRecordSerializer方法配置了Kafka接收器来使用它。