Generating dynamic headers for a Spark table using a Scala case class

cigdeys3 · posted 2021-05-27 in Spark
I have an existing case class with many fields:
case class Output(
  userId: String,
  timeStamp: String
  // ... many more fields
)
And I am using it to generate the header for a Spark job, like this:

---------------------
userId | timeStamp
---------------------
1      | 2324444444
2      | 2334445556

Now I want to add more columns, and these columns will come from a Map(attributeName -> attributeValue). So my question is: how can I add a Map to the case class, and how can I then use the map keys as column names to generate dynamic columns? After this, my final output should look like:

-----------------------------------------------------
userId | timeStamp  | attributeName1 | attributeName2
-----------------------------------------------------
1      | 2324444444 |                |
2      | 2334445556 |                |

rks48beu 1#

You can do it like this:

case class Output(
  userId: String,
  timeStamp: String,
  keyvalues: Map[String, String]
  // ... other fields
)

import spark.implicits._
import org.apache.spark.sql.functions._

// spark.read.textFile only yields a single "value" column, so use a reader that
// matches your input format (JSON lines assumed here)
val df = spark.read.json(inputlocation).as[Output]

// collect the distinct map keys, then turn each key into a column
val keysDF = df.select(explode(map_keys($"keyvalues"))).distinct()
val keyCols = keysDF.collect()
  .map(_.getString(0))
  .map(k => col("keyvalues").getItem(k).as(k))

df.select(Seq(col("userId"), col("timeStamp")) ++ keyCols: _*)
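
For reference, here is a self-contained sketch of the same approach with made-up sample data (the object name, the sample rows, and the local master are illustrative, not from the original post):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object DynamicColumnsExample {
  // same shape as the case class above
  case class Output(userId: String, timeStamp: String, keyvalues: Map[String, String])

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("dynamic-columns").getOrCreate()
    import spark.implicits._

    // hypothetical sample rows standing in for the real input
    val ds = Seq(
      Output("1", "2324444444", Map("attributeName1" -> "a", "attributeName2" -> "b")),
      Output("2", "2334445556", Map("attributeName1" -> "c"))
    ).toDS()

    // distinct map keys become the dynamic column names (sorted for a stable column order)
    val keyCols = ds.select(explode(map_keys($"keyvalues"))).distinct()
      .collect()
      .map(_.getString(0))
      .sorted
      .map(k => col("keyvalues").getItem(k).as(k))

    // prints a table with headers: userId | timeStamp | attributeName1 | attributeName2
    ds.select(Seq(col("userId"), col("timeStamp")) ++ keyCols: _*).show()

    spark.stop()
  }
}

Note that collecting the keys triggers a Spark job, and the resulting column set depends on the data, so different inputs can produce different output schemas.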

Or you can check this thread for other approaches.
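
One common alternative (a sketch of a general Spark pattern, not necessarily what the linked thread suggests) is to explode the map and pivot on the keys, which builds the dynamic columns without collecting them by hand:

// explode turns each map entry into (key, value) rows; pivot turns the keys back into columns
df.select($"userId", $"timeStamp", explode($"keyvalues"))
  .groupBy("userId", "timeStamp")
  .pivot("key")
  .agg(first("value"))

// note: explode drops rows whose map is empty; use explode_outer to keep them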
