我在apache flink中使用kafka连接器来访问由合流kafka服务的流。
除了架构注册表url ConfluentRegistryAvroDeserializationSchema.forGeneric(...)
应为“reader”架构。我不想提供read模式,而是想使用同一个writer的模式(在注册表中查找)来读取消息,因为使用者不会有最新的模式。
FlinkKafkaConsumer010<GenericRecord> myConsumer =
new FlinkKafkaConsumer010<>("topic-name", ConfluentRegistryAvroDeserializationSchema.forGeneric(<reader schema goes here>, "http://host:port"), properties);
myConsumer.setStartFromLatest();
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html “使用这些反序列化架构记录将与从架构注册表检索并转换为静态提供的架构一起读取”
既然我不想在用户端保留模式定义,那么如何使用writer的模式反序列化来自kafka的avro消息呢?
谢谢你的帮助!
1条答案
按热度按时间slmsl1lt1#
我认为不可能直接使用
ConfluentRegistryAvroDeserializationSchema.forGeneric
. 它打算与读卡器模式一起使用,并且它们具有检查这一点的前提条件。你必须实现你自己的。两件重要的事情:
套
specific.avro.reader
为false(否则会得到特定的记录)这个
KafkaAvroDeserializer
必须延迟初始化(因为它本身不可序列化,因为它持有对schema registry客户机的引用)