我正在使用spark结构化流媒体,如本页所述。我从Kafka主题得到了正确的信息,但值是avro格式的。有没有反序列化avro记录的方法(比如 KafkaAvroDeserializer 接近)?
KafkaAvroDeserializer
wtlkbnrh1#
Spark>=2.4你可以用 from_avro 函数来自 spark-avro 图书馆。
from_avro
spark-avro
import org.apache.spark.sql.avro._ val schema: String = ??? df.withColumn("value", from_avro($"value", schema))
Spark<2.4定义一个函数 Array[Byte] (序列化对象):
Array[Byte]
import scala.reflect.runtime.universe.TypeTag def decode[T : TypeTag](bytes: Array[Byte]): T = ???
它将反序列化avro数据并创建对象,该对象可以存储在 Dataset .创建 udf 基于函数。
Dataset
udf
val decodeUdf = udf(decode _)
呼叫 udf 在 value ```val df = spark.readStream.format("kafka")....load()
value
df.withColumn("value", decodeUdf($"value"))
1条答案
按热度按时间wtlkbnrh1#
Spark>=2.4
你可以用
from_avro
函数来自spark-avro
图书馆。Spark<2.4
定义一个函数
Array[Byte]
(序列化对象):它将反序列化avro数据并创建对象,该对象可以存储在
Dataset
.创建
udf
基于函数。呼叫
udf
在value
```val df = spark
.readStream
.format("kafka")
...
.load()
df.withColumn("value", decodeUdf($"value"))