How to write a protobuf byte array from Flink to Kafka

agxfikkp · asked 2021-06-04 · Kafka

I'm new to Flink. I just want to put protobuf POJOs into Kafka as byte arrays, so my FlinkKafkaProducer looks like this:

FlinkKafkaProducer<String> flinkKafkaProducer = createStringProducer(outputTopic, address);
stringInputStream
        .map(/* here returns byte[] */)
        .addSink(flinkKafkaProducer);

public static FlinkKafkaProducer<String> createStringProducer(String topic, String kafkaAddress) {
    return new FlinkKafkaProducer<>(kafkaAddress, topic, new SimpleStringSchema());
}

Right now it works fine, but my output is a String. I tried using TypeInformationSerializationSchema() instead of new SimpleStringSchema() to change the output, but I couldn't figure out how to set it up correctly, and I couldn't find any tutorial. Can anyone help?


eoxn13cs · #1

Documentation on this is indeed tricky to find. I assume you're using Flink >= 1.9. In that case, the following should work:

import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

private static class PojoKafkaSerializationSchema implements KafkaSerializationSchema<YourPojo> {
    @Override
    public void open(SerializationSchema.InitializationContext context) throws Exception {}

    @Override
    public ProducerRecord<byte[], byte[]> serialize(YourPojo element, @Nullable Long timestamp) {
        // serialize your POJO here and return a Kafka ProducerRecord
        return null;
    }
}

// Elsewhere: 
PojoKafkaSerializationSchema schema = new PojoKafkaSerializationSchema();
FlinkKafkaProducer<YourPojo> kafkaProducer = new FlinkKafkaProducer<>(
    "test-topic",
    schema,
    properties,
    FlinkKafkaProducer.Semantic.AT_LEAST_ONCE
);

This code is mostly inspired by this test case, but I haven't had time to actually run it.
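
To make the stub concrete: a protobuf-generated class already knows how to turn itself into bytes, and the `properties` object used above isn't shown in the answer. A minimal sketch, assuming YourPojo is a protobuf-generated class and the broker runs on localhost:9092 (both assumptions, not from the original answer):

import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical concrete schema; assumes YourPojo is a protobuf-generated class.
class ProtoKafkaSerializationSchema implements KafkaSerializationSchema<YourPojo> {
    private final String topic;

    ProtoKafkaSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(YourPojo element, @Nullable Long timestamp) {
        // toByteArray() is generated on every protobuf message and yields the wire-format bytes
        return new ProducerRecord<>(topic, element.toByteArray());
    }
}

// The `properties` object from the snippet above; the broker address is an assumption.
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");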


42fyovps · #2

So, I finally figured out how to write protobuf as a byte array to a Kafka producer. The problem was serialization: for POJOs, Flink falls back to the Kryo library for custom serialization, and the best way to handle protobuf is with ProtobufSerializer.class. In this example I read string messages from Kafka and write them out as byte arrays. Gradle dependencies:

compile(group: 'com.twitter', name: 'chill-protobuf', version: '0.7.6') {
    exclude group: 'com.esotericsoftware.kryo', module: 'kryo'
}
implementation 'com.google.protobuf:protobuf-java:3.11.0'
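
The kryo exclusion matters: Flink bundles its own Kryo version, and letting chill-protobuf pull in a different one would create a classpath conflict.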

Registration:

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// Serialize MyProtobuf with chill-protobuf's ProtobufSerializer instead of Flink's generic Kryo fallback
environment.getConfig().registerTypeWithKryoSerializer(MyProtobuf.class, ProtobufSerializer.class);

The Kafka serializer class:

import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

@Data
@RequiredArgsConstructor
public class MyProtoKafkaSerializer implements KafkaSerializationSchema<MyProto> {
    private final String topic;
    private final byte[] key; // may be null; Kafka allows records without a key

    @Override
    public ProducerRecord<byte[], byte[]> serialize(MyProto element, Long timestamp) {
        // toByteArray() produces the protobuf wire-format bytes
        return new ProducerRecord<>(topic, key, element.toByteArray());
    }
}

The job:

public static FlinkKafkaProducer<MyProto> createProtoProducer(String topic, String kafkaAddress) {
    MyProtoKafkaSerializer myProtoKafkaSerializer = new MyProtoKafkaSerializer(topic, null);
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", kafkaAddress);
    // group.id is a consumer setting and isn't needed on the producer side
    return new FlinkKafkaProducer<>(topic, myProtoKafkaSerializer, props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}

public static FlinkKafkaConsumer<String> createProtoConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", kafkaAddress);
    props.setProperty("group.id", kafkaGroup);
    return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
}

DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);
FlinkKafkaProducer<MyProto> flinkKafkaProducer = createProtoProducer(outputTopic, address);
stringInputStream
        .map(hashtagMapFunction) // must map each String to a MyProto (see sketch below)
        .addSink(flinkKafkaProducer);

environment.execute("My test job");
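
hashtagMapFunction is the author's own mapper and isn't shown; it has to turn each input String into a MyProto. A hypothetical sketch, assuming MyProto has a string field named text (an assumption about the schema, not from the original answer):

import org.apache.flink.api.common.functions.MapFunction;

// Hypothetical stand-in for hashtagMapFunction; the `text` field is an
// assumption about MyProto's schema.
MapFunction<String, MyProto> hashtagMapFunction =
        message -> MyProto.newBuilder().setText(message).build();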

Sources:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/custom_serializers.html#register-a-custom-serializer-for-your-flink-program
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#protobuf-via-kryo
