我是Flink的新手。我只想把protobuf pojo作为字节数组放到kafka中。所以我的 FlinkKafkaProducer
看起来像这样:
FlinkKafkaProducer<String> flinkKafkaProducer = createStringProducer(outputTopic, address);
stringInputStream
.map(//here returns byte[])
.addSink(flinkKafkaProducer);
public static FlinkKafkaProducer<String> createStringProducer(String topic, String kafkaAddress) {
return new FlinkKafkaProducer<>(kafkaAddress, topic, new SimpleStringSchema());
}
现在它工作正常,但我的输出是字符串。我试着补充 TypeInformationSerializationSchema()
而不是 new SimpleStringSchema()
改变输出,但我不知道如何调整它正确。找不到任何教程。有人能帮忙吗?
2条答案
按热度按时间eoxn13cs1#
在这件事上找到文件确实很棘手。我假设你使用flink>=1.9。在这种情况下,应采取以下措施:
这段代码主要是受这个测试用例的启发,但我没有时间实际运行它。
42fyovps2#
所以,我终于想出了如何将protobuf作为字节数组写入kafka producer。问题在于序列化。如果是波乔·Flink用的是自由
Kryo
用于自定义反序列化。编写protobuf的最好方法是使用ProtobufSerializer.class
. 在本例中,我将读取kafka字符串消息,并将其作为字节数组写入。梯度依赖性:注册:
Kafka塞里扎尔卡斯
工作
资料来源:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/custom_serializers.html#register-为您的flink程序定制序列化程序
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#protobuf-经kryo