我试图创建一个Kafkaavro系列主题的flink消费者。我有Kafka主题流avro序列化数据。我可以通过avroconsoleconsumer看到。
flink1.6.0添加了一个avrodeserializationschema,但我找不到完整的用法示例。是的,有一些生成了avrodeserialization类,似乎是在1.6.0之前添加的类。
我有一个通过avro工具生成的avro类。
现在,我一直在努力遵循现有的例子,但他们是不同的,我不能让事情继续下去(我不经常用java编程)
大多数使用以下形式
Myclass mc = new MyClass();
AvroDeserializationSchema<Myclass> ads = new AvroDeserializationSchema<> (Myclass.class);
FlinkKafkaConsumer010<Myclass> kc = new FlinkKafkaConsumer010<>(topic,ads,properties);
其中myclass是通过avro工具jar生成的avro类。走这条路对吗?在执行此操作和利用内部flink 1.6.0 avrodeserializationschema类时,我遇到了一些私有/公共访问问题。我必须创建一个新类并扩展avrodeserializationschema吗?
1条答案
按热度按时间66bbxpm51#
好的,我深入研究了kafka消费者javadocs,并找到了一个示例来使用avro流。我仍然需要将Kafka消费转换为flinkkafkaconsumer,但是下面的代码可以工作。
为了使io.confluent引用工作,我必须向pom文件添加一个存储库和一个依赖项。
}