scala—如何将此数据集转换为以下数据集

6tqwzwtp  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(406)

输入

+------+------+------+------+
|emp_name|emp_area| dept|zip| 
+------+------+------+------+
|ram|USA|"Sales"|805912|     
|sham|USA|"Sales"|805912|   
|ram|Canada|"Marketing"|805912|   
|ram|USA|"Sales"|805912|
|sham|USA|"Marketing"|805912|      
+------+------+------+------

期望输出

feature   |Top1 name |Top 1 value1|Top2 name|top 2 value|

emp_name    ram |3|sham |2
emp_area    Usa |4|canada |1    
dept       sales|3|Marketing|3
zip         805912|5|NA|NA

我一开始是动态地为它们中的每一个生成计数,但无法将它们存储在数据集中

val features=ds.columns.toList
for (e <- features) {
  val ds1=ds.groupBy(e).count().sort(desc("count")).limit(5).withColumnRenamed("count", e+"_count")
}

现在,如何将所有值收集到一个Dataframe中并转换为输出?

hk8txs48

hk8txs481#

这里有一个稍微冗长的方法。你可以 map 每列到一个具有一行的Dataframe,该行对应于所需输出中的行。必要时添加na列。将列名转换为所需的列名,最后执行 unionAll 合并Dataframe(每行一个)。

import org.apache.spark.sql.expressions.Window

val top = 2

val result = ds.columns.map(
    c => ds.groupBy(c).count()
           .withColumn("rn", row_number().over(Window.orderBy(desc("count"))))
           .filter(s"rn <= $top")
           .groupBy().pivot("rn")
           .agg(first(col(c)), first(col("count")))
           .select(lit(c), col("*"))
).map(df => 
    if (df.columns.size != 1 + top*2)
        df.select(List(col("*")) ::: (1 to (top*2+1 - df.columns.size)).toList.map(x => lit("NA")): _*)
    else df
).map(df =>
    df.toDF(List("feature") ::: (1 to top).toList.flatMap(x => Seq(s"top$x name", s"top$x value")): _*)
).reduce(_ unionAll _)

result.show
+--------+---------+----------+---------+----------+
| feature|top1 name|top1 value|top2 name|top2 value|
+--------+---------+----------+---------+----------+
|emp_name|      ram|         3|     sham|         2|
|emp_area|      USA|         4|   Canada|         1|
|    dept|    Sales|         3|Marketing|         2|
|     zip|   805912|         5|       NA|        NA|
+--------+---------+----------+---------+----------+

相关问题