模仿scala的pyspark asdict(),不使用case类

shstlldc  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(374)

pyspark允许您在使用以下方法从Dataframe返回单行时创建字典。

t=spark.sql("SET").withColumn("rw",expr("row_number() over(order by key)")).collect()[0].asDict()
print(t)
print(t["key"])
print(t["value"])
print(t["rw"])
print("Printing using for comprehension")
[print(t[i]) for i in t ]

Results:

{'key': 'spark.app.id', 'value': 'local-1594577194330', 'rw': 1}
spark.app.id
local-1594577194330
1
Printing using for comprehension
spark.app.id
local-1594577194330
1

我在scala spark也试过。可以使用case类方法。

case class download(key:String, value:String,rw:Long)

val t=spark.sql("SET").withColumn("rw",expr("row_number() over(order by key)")).as[download].first
println(t)
println(t.key)
println(t.value)
println(t.rw)

结果:

download(spark.app.id,local-1594580739413,1)
spark.app.id
local-1594580739413
1

在实际问题中,我有将近200多列,不想使用case类方法。我正在尝试下面这样的方法来避免case类选项。

val df =spark.sql("SET").withColumn("rw",expr("row_number() over(order by key)"))

(df.columns).zip(df.take(1)(0))

但是有个错误。

<console>:28: error: type mismatch;
 found   : (String, String, Long)
 required: Iterator[?]
       (df.columns.toIterator).zip(df.take(1)(0))

有没有办法解决这个问题。

xggvc2p6

xggvc2p61#

问题是 zip 是一个集合上的方法,该方法只能接受另一个实现iterableonce的集合对象,并且 df.take(1)(0) 是Sparksql Row ,不属于这一类。
尝试将行转换为 Seq 使用its toSeq 方法。

df.columns.zip(df.take(1)(0).toSeq)

结果:

Array((key,spark.app.id), (value,local-1594577194330), (rw,1))
nukf8bse

nukf8bse2#

在scala中,有一种方法 getValuesMap 转换 row 进入 Map[columnName: String, columnValue: T] . 尝试使用以下相同的方法-

val df =spark.sql("SET").withColumn("rw",expr("row_number() over(order by key)"))
    df.show(false)
    df.printSchema()

    /**
      * +----------------------------+-------------------+---+
      * |key                         |value              |rw |
      * +----------------------------+-------------------+---+
      * |spark.app.id                |local-1594644271573|1  |
      * |spark.app.name              |TestSuite          |2  |
      * |spark.driver.host           |192.168.1.3        |3  |
      * |spark.driver.port           |58420              |4  |
      * |spark.executor.id           |driver             |5  |
      * |spark.master                |local[2]           |6  |
      * |spark.sql.shuffle.partitions|2                  |7  |
      * +----------------------------+-------------------+---+
      *
      * root
      * |-- key: string (nullable = false)
      * |-- value: string (nullable = false)
      * |-- rw: integer (nullable = true)
      */

    val map = df.head().getValuesMap(df.columns)
    println(map)
    println(map("key"))
    println(map("value"))
    println(map("rw"))
    println("Printing using for comprehension")
    map.foreach(println)

    /**
      * Map(key -> spark.app.id, value -> local-1594644271573, rw -> 1)
      * spark.app.id
      * local-1594644271573
      * 1
      * Printing using for comprehension
      * (key,spark.app.id)
      * (value,local-1594644271573)
      * (rw,1)
      */

相关问题