我正在把Kafka的信息读到flink shell(scala)中,如下所示:
scala> val stream = senv.addSource(new FlinkKafkaConsumer011[String]("topic", new SimpleStringSchema(), properties)).print()
warning: there was one deprecation warning; re-run with -deprecation for details
stream: org.apache.flink.streaming.api.datastream.DataStreamSink[String] = org.apache.flink.streaming.api.datastream.DataStreamSink@71de1091
这里,我使用simplestringschema()作为反序列化程序,但实际上消息有另一个avro模式(比如msg.avsc)。如何基于这个不同的avro模式(msg.avsc)创建一个反序列化程序来反序列化传入的kafka消息?
我还没有找到任何在scala中实现这一点的代码示例或教程,所以任何输入都会有所帮助。看来,我可能需要扩展和实施
org.apache.flink.streaming.util.serialization.deserializationschema
但我不知道该怎么做。任何教程或说明都会很有帮助。因为,我不想做任何自定义处理,而只是根据avro模式(msg.avsc)解析消息,任何快速的方法都会非常有用。
1条答案
按热度按时间hpcdzsge1#
我在java中找到了avrodeserializationschema类的示例
https://github.com/okkam-it/flink-examples/blob/master/src/main/java/org/okkam/flink/avro/avrodeserializationschema.java
代码段:
如果您想反序列化到特定的case类中,那么使用
new FlinkKafkaConsumer011[case_class_name]
,new AvroDeserializationSchema[case_class_name](classOf[case_class_name]
```val stream = env .addSource(new FlinkKafkaConsumer011[DeviceData]
("test", new AvroDeserializationSchemacase_class_name, properties))
import io.confluent.kafka.serializers.KafkaAvroDeserializer
...
val valueDeserializer = new KafkaAvroDeserializer()
valueDeserializer.configure(
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
false)
...
override def deserialize(messageKey: Array[Byte], message: Array[Byte],
topic: String, partition: Int, offset: Long): KafkaKV = {
...