如何在flink(scala)中反序列化来自kafka的avro消息?

lvmkulzt  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(741)

我正在把Kafka的信息读到flink shell(scala)中,如下所示:

scala> val stream = senv.addSource(new FlinkKafkaConsumer011[String]("topic", new SimpleStringSchema(), properties)).print()
warning: there was one deprecation warning; re-run with -deprecation for details
stream: org.apache.flink.streaming.api.datastream.DataStreamSink[String] = org.apache.flink.streaming.api.datastream.DataStreamSink@71de1091

这里,我使用simplestringschema()作为反序列化程序,但实际上消息有另一个avro模式(比如msg.avsc)。如何基于这个不同的avro模式(msg.avsc)创建一个反序列化程序来反序列化传入的kafka消息?
我还没有找到任何在scala中实现这一点的代码示例或教程,所以任何输入都会有所帮助。看来,我可能需要扩展和实施
org.apache.flink.streaming.util.serialization.deserializationschema
但我不知道该怎么做。任何教程或说明都会很有帮助。因为,我不想做任何自定义处理,而只是根据avro模式(msg.avsc)解析消息,任何快速的方法都会非常有用。

hpcdzsge

hpcdzsge1#

我在java中找到了avrodeserializationschema类的示例
https://github.com/okkam-it/flink-examples/blob/master/src/main/java/org/okkam/flink/avro/avrodeserializationschema.java
代码段:
如果您想反序列化到特定的case类中,那么使用 new FlinkKafkaConsumer011[case_class_name] , new AvroDeserializationSchema[case_class_name](classOf[case_class_name] ```
val stream = env .addSource(new FlinkKafkaConsumer011[DeviceData]
("test", new AvroDeserializationSchemacase_class_name, properties))

如果您使用confluent的模式注册表,那么首选的解决方案是使用confluent提供的avro serde。我们只需调用deserialize(),最新版本的avro模式的解析将在后台自动完成,不需要进行字节操作。
类似于下面的scala。

import io.confluent.kafka.serializers.KafkaAvroDeserializer

...

val valueDeserializer = new KafkaAvroDeserializer()
valueDeserializer.configure(
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
false)

...

override def deserialize(messageKey: Array[Byte], message: Array[Byte],
topic: String, partition: Int, offset: Long): KafkaKV = {

val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[GenericRecord]
val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]

KafkaKV(key, value)
}

...

详细说明如下:http://svend.kelesia.com/how-to-integrate-flink-with-confluents-schema-registry.html#how-将flink与合流模式注册表集成
希望有帮助!

相关问题