Spark Scala: problem converting a DataFrame to a Dataset

azpvetkf · published 2021-05-27 in Spark

I have a DataFrame with the following schema:

db.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- id: string (nullable = true)
 |-- sparse_rep: struct (nullable = true)
 |    |-- 1: double (nullable = true)
 |    |-- 10: double (nullable = true)
 |    |-- 11: double (nullable = true)
 |    |-- 12: double (nullable = true)
 |    |-- 13: double (nullable = true)
 |    |-- 14: double (nullable = true)
 |    |-- 15: double (nullable = true)
 |    |-- 17: double (nullable = true)
 |    |-- 18: double (nullable = true)
 |    |-- 2: double (nullable = true)
 |    |-- 20: double (nullable = true)
 |    |-- 21: double (nullable = true)
 |    |-- 22: double (nullable = true)
 |    |-- 23: double (nullable = true)
 |    |-- 24: double (nullable = true)
 |    |-- 25: double (nullable = true)
 |    |-- 26: double (nullable = true)
 |    |-- 27: double (nullable = true)
 |    |-- 3: double (nullable = true)
 |    |-- 4: double (nullable = true)
 |    |-- 7: double (nullable = true)
 |    |-- 9: double (nullable = true)
 |-- title: string (nullable = true)

All the fields here look straightforward except the sparse representation. The sparse_rep object was originally created in Spark as a Map[Int, Double] and then written to MongoDB.
However, when I try to coerce it back to Map[Int, Double] via a Dataset:

case class blogRow(_id:String, id:Int, sparse_rep:Map[Int,Double],title:String)

    val blogRowEncoder = Encoders.product[blogRow]
    db.as[blogRow](blogRowEncoder)

I get the following error:

Caused by: org.apache.spark.sql.AnalysisException:     need a map field but got struct<1:double,10:double,11:double,12:double,13:double,14:double,15:double,17:double,18:double,2:double,20:double,21:double,22:double,23:double,24:double,25:double,26:double,27:double,3:double,4:double,7:double,9:double>;
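The mismatch comes from how the map was stored: MongoDB has no distinct map type, so the Map[Int, Double] was written as a subdocument whose field names are the stringified keys, and schema inference on read-back sees that subdocument as a struct with one field per key. A rough pure-Scala sketch of that key flattening (illustrative only, not the connector's actual code):

```scala
// Illustrative only: a Map[Int, Double] written to MongoDB becomes a
// subdocument whose field names are the stringified keys.
val original: Map[Int, Double] = Map(1 -> 0.5, 10 -> 1.25)
val asFieldNames: Map[String, Double] =
  original.map { case (k, v) => (k.toString, v) }
// The stored document is {"1": 0.5, "10": 1.25} -- a fixed set of named
// fields, which schema inference reads back as struct<1:double,10:double>
// rather than map<int,double>.
assert(asFieldNames == Map("1" -> 0.5, "10" -> 1.25))
```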

cwtwac6a1#

Convert the struct column to a map type, then apply the case class. The schema of the DataFrame and the fields of the case class should match.
Check the code below:

scala> case class blogRow(_id:String, id:Int, sparse_rep:Map[Int,Double],title:String)
defined class blogRow
scala> val blogRowDF =  df
.withColumn("sparse_rep",map(
    df
    .select("sparse_rep.*")
    .columns
    .flatMap(c => List(lit(c).cast("int"),col(s"sparse_rep.${c}"))):_*)
)
.withColumn("_id",$"_id.oid")
.withColumn("id",$"id".cast("int"))
.as[blogRow]
scala> blogRowDF.show(false)
+---------+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|_id      |id  |sparse_rep                                                                                                                                                                                                                                                     |title      |
+---------+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|oid_value|null|Map(10 -> 10.0, 24 -> 24.0, 25 -> 25.0, 14 -> 14.0, 20 -> 20.0, 1 -> 1.0, 21 -> 21.0, 9 -> 9.0, 13 -> 13.0, 2 -> 2.0, 17 -> 17.0, 22 -> 22.0, 27 -> 27.0, 12 -> 12.0, 7 -> 7.0, 3 -> 3.0, 18 -> 18.0, 11 -> 11.0, 26 -> 26.0, 23 -> 23.0, 4 -> 4.0, 15 -> 15.0)|title_value|
+---------+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
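For reference, `org.apache.spark.sql.functions.map` takes alternating key and value columns as varargs, and the `flatMap` above builds that alternating sequence from the struct's field names. A small pure-Scala sketch of the interleaving, with strings standing in for the hypothetical Column expressions:

```scala
// A minimal sketch of the interleaving that feeds functions.map:
// for each struct field name c, emit the pair (key expression, value
// expression). Plain strings stand in for Spark Column objects here.
val columns = Seq("1", "10", "11")
val args = columns.flatMap(c => List(s"lit($c).cast(int)", s"col(sparse_rep.$c)"))
// args alternates key-expr, value-expr: exactly the vararg shape
// that functions.map(cols: Column*) expects.
assert(args == Seq(
  "lit(1).cast(int)", "col(sparse_rep.1)",
  "lit(10).cast(int)", "col(sparse_rep.10)",
  "lit(11).cast(int)", "col(sparse_rep.11)"))
```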

nc1teljy2#

Another option:

Input DataFrame schema:

df.printSchema()
    /**
      * root
      * |-- _id: struct (nullable = true)
      * |    |-- oid: string (nullable = true)
      * |-- id: string (nullable = true)
      * |-- sparse_rep: struct (nullable = true)
      * |    |-- 1: double (nullable = true)
      * |    |-- 10: double (nullable = true)
      * |    |-- 11: double (nullable = true)
      * |    |-- 12: double (nullable = true)
      * |    |-- 13: double (nullable = true)
      * |    |-- 14: double (nullable = true)
      * |    |-- 15: double (nullable = true)
      * |    |-- 17: double (nullable = true)
      * |    |-- 18: double (nullable = true)
      * |    |-- 2: double (nullable = true)
      * |    |-- 20: double (nullable = true)
      * |    |-- 21: double (nullable = true)
      * |    |-- 22: double (nullable = true)
      * |    |-- 23: double (nullable = true)
      * |    |-- 24: double (nullable = true)
      * |    |-- 25: double (nullable = true)
      * |    |-- 26: double (nullable = true)
      * |    |-- 27: double (nullable = true)
      * |    |-- 3: double (nullable = true)
      * |    |-- 4: double (nullable = true)
      * |    |-- 7: double (nullable = true)
      * |    |-- 9: double (nullable = true)
      * |-- title: string (nullable = true)
      */

Convert the schema of the input DataFrame to match the case class, then go from Dataset[Row] -> Dataset[BlogRow]:

val ds =
      df.withColumn("sparse_rep", expr("from_json(to_json(sparse_rep), 'map<int, double>')"))
        .withColumn("_id",$"_id.oid")
        .withColumn("id",$"id".cast("int"))
      .as[BlogRow]
    ds.printSchema()

    /**
      * root
      * |-- _id: string (nullable = true)
      * |-- id: integer (nullable = true)
      * |-- sparse_rep: map (nullable = true)
      * |    |-- key: integer
      * |    |-- value: double (valueContainsNull = true)
      * |-- title: string (nullable = true)
      */

where the case class is as follows:

case class BlogRow(_id:String, id:Int, sparse_rep:Map[Int,Double],title:String)
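The `from_json(to_json(...))` round trip works because a struct and a map with the same entries render to the same JSON object: the struct's field names become the object's keys, which `from_json` then parses back under the `map<int, double>` schema. A rough pure-Scala illustration of why the two renderings coincide (hand-rolled JSON, not Spark's serializer):

```scala
// JSON rendering of a struct like sparse_rep with two fields:
val structJson = """{"1":0.5,"10":1.25}"""
// Hand-rolled JSON rendering of the equivalent Map[Int, Double]
// (small immutable Maps preserve insertion order in Scala):
val mapJson = Map(1 -> 0.5, 10 -> 1.25)
  .map { case (k, v) => s""""$k":$v""" }
  .mkString("{", ",", "}")
// The two texts are identical, so from_json can reparse the struct's
// JSON output as map<int, double>.
assert(structJson == mapJson)
```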
