kafka-avro格式在spark结构化流中的反序列化

wn9m85ua  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(604)

我正在使用spark结构化流媒体,如本页所述。
我从Kafka主题得到了正确的信息,但值是avro格式的。有没有反序列化avro记录的方法(比如 KafkaAvroDeserializer 接近)?

wtlkbnrh

wtlkbnrh1#

Spark>=2.4
你可以用 from_avro 函数来自 spark-avro 图书馆。

import org.apache.spark.sql.avro._

val schema: String = ???
df.withColumn("value", from_avro($"value", schema))

Spark<2.4
定义一个函数 Array[Byte] (序列化对象):

import scala.reflect.runtime.universe.TypeTag

def decode[T : TypeTag](bytes: Array[Byte]): T = ???

它将反序列化avro数据并创建对象,该对象可以存储在 Dataset .
创建 udf 基于函数。

val decodeUdf  = udf(decode _)

呼叫 udfvalue ```
val df = spark
.readStream
.format("kafka")
...
.load()

df.withColumn("value", decodeUdf($"value"))

相关问题