如何在apache flink kafka sink中动态选择Kafka主题?

lyr7nygr  于 2022-12-16  发布在  Apache
关注(0)|答案(2)|浏览(154)

我使用KafkaSink作为我的flink应用程序中的接收器,我需要根据一些键-值对将stringifiedJSON发送到不同的Kafka主题(例如,一些JSON发送到topic 1,一些其他接收器发送到另一个主题topic 2,以此类推)。但是我在文档中没有找到任何方法来配置Kafka主题,使其根据传入的数据流进行选择。有人能帮我吗?
注意:我使用的是flink版本14.3

DataStream<String> data = .....
    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers(parameter.get("bootstrap.servers"))
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(parameter.get("kafka.output.topic"))
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build()
            )
            .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
    data.sinkTo(sink);
qlfbtfca

qlfbtfca1#

我没有尝试过这种方法,但我相信,与其使用setTopic将接收器硬连接到特定主题,不如在自定义KafkaRecordSerializationSchema上实现serialize方法,以便它返回的每个ProducerRecord都指定它应该写入的主题。
另一种选择是为每个主题创建一个单独的接收器对象,然后使用一个ProcessFunction,该ProcessFunction扇出到一组端输出,每个端输出连接到适当的接收器。

zlwx9yxi

zlwx9yxi2#

我可以通过使用@DavidAnderson建议的自定义序列化方法实现KafkaRecordSerializationSchema,将输出接收到多个Kafka主题。

public class CustomSchema implements KafkaRecordSerializationSchema<Tuple2<String,String>> {

private final String encoding = StandardCharsets.UTF_8.name();

@Override
public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> input, KafkaSinkContext kafkaSinkContext, Long aLong) {
    String topic = input.f0;
    String data = input.f1;
    try {
        byte[] value = data==null ? null:data.getBytes(this.encoding);
        return new ProducerRecord<>(topic,value);
    } catch (UnsupportedEncodingException e) {
        throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding);
    }
}

我通过setRecordSerializer方法配置了Kafka接收器来使用它。

相关问题