如何在spark数据集中显示(或操作)kryo编码的对象?

9rygscc1  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(547)

假设你有这个:

// assume we handle custom type
class MyObj(val i: Int, val j: String)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
val ds = spark.createDataset(Seq(new MyObj(1, "a"),new MyObj(2, "b"),new MyObj(3, "c")))

什么时候做一件事 ds.show ,我得到:

+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

我理解这是因为内容被编码成内部sparksql二进制表示。但是我怎样才能像这样显示解码的内容呢?

+---+---+
| _1| _2|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
+---+---+

更新1
显示内容不是最大的问题,更重要的是它可能会导致处理数据集时出现问题,请考虑以下示例:

// continue with the above code
val ds2 = spark.createDataset(Seq(new MyObj(2, "a"),new MyObj(6, "b"),new MyObj(5, "c"))) 

ds.joinWith(ds2, ds("i") === ds2("i"), "inner") 
// this gives a Runtime error: org.apache.spark.sql.AnalysisException: Cannot resolve column name "i" among (value);

这是不是意味着, kryo -编码类型无法执行以下操作 joinWith 方便地?我们如何处理自定义类型 Dataset 然后呢?

cngwdvgl

cngwdvgl1#

下面的代码对我很有用,但是看起来像是使用高级api来完成低级(反序列化)工作。
这并不是说应该这样做,而是表明这是可能的。
我不知道为什么kryodeserializer不将字节反序列化到字节来自的对象。就是这样。
你的类定义和我的类定义之间的一个主要区别是 case 让我用下面的技巧。再说一次,我也不知道为什么这会成为可能。

scala> println(spark.version)
3.0.1

// Note that case keyword
case class MyObj(val i: Int, val j: String)
import org.apache.spark.sql.Encoders
implicit val myObjEncoder = Encoders.kryo[MyObj]
// myObjEncoder: org.apache.spark.sql.Encoder[MyObj] = class[value[0]: binary]

val ds = (Seq(new MyObj(1, "a"),new MyObj(2, "b"),new MyObj(3, "c"))).toDS
// the Kryo deserializer gives bytes
scala> ds.printSchema
root
 |-- value: binary (nullable = true)

scala> :type sc
org.apache.spark.SparkContext

// Let's deserialize the bytes into an object
import org.apache.spark.serializer.KryoSerializer
val ks = new KryoSerializer(sc.getConf)
// that begs for a generic UDF
val deserMyObj = udf { value: Array[Byte] => 
  import java.nio.ByteBuffer
  ks.newInstance.deserialize(ByteBuffer.wrap(value)).asInstanceOf[MyObj] }

val solution = ds.select(deserMyObj('value) as "result").select($"result.*")
scala> solution.show
+---+---+
|  i|  j|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
+---+---+

相关问题