使用变量模式从dataframe列读取json

myzjeezk  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(429)

我有一个Dataframe,它有一个列,其中包含一些嵌套的json,也有一个变量schema。每行中的json都有不同的模式。
例如

Key     Value
1       {"foo":"bar"}
2       {"key1":"val1","key2":"val2"}

我需要对此进行解析,并创建一个最终的Dataframe,其中包含根据json模式组合而成的所有列及其各自的值,如下所示。

Key     foo     key1        key2
1       bar     null        null
2       null    val1        val2
raogr8fs

raogr8fs1#

val data = Seq((1, """{"foo":"bar"}"""), 
             (2, """{"key1":"val1","key2":"val2"}"""),
             (3, """{"key1":"val1","key3":"val3", "key4": "val4"}"""))

val df = spark.createDataFrame(
  data
).toDF("num", "keyvalue")

df.show()

输出:

+---+---------------------------------------------+
|num|keyvalue                                     |
+---+---------------------------------------------+
|1  |{"foo":"bar"}                                |
|2  |{"key1":"val1","key2":"val2"}                |
|3  |{"key1":"val1","key3":"val3", "key4": "val4"}|
+---+---------------------------------------------+

将keyvalue json对象中的值转换为scalaMap对象。我们叫它mapped\u df

import scala.util.parsing.json._
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

import spark.implicits._

val stringToMap = udf((str: String) => JSON.parseFull(str).get.asInstanceOf[Map[String, String]])

val mapped_df =df.withColumn("mapped", stringToMap(col("keyvalue")))

mapped_df.show(false)

输出(df):-

+---+---------------------------------------------+------------------------------------------+
|num|keyvalue                                     |mapped                                    |
+---+---------------------------------------------+------------------------------------------+
|1  |{"foo":"bar"}                                |[foo -> bar]                              |
|2  |{"key1":"val1","key2":"val2"}                |[key1 -> val1, key2 -> val2]              |
|3  |{"key1":"val1","key3":"val3", "key4": "val4"}|[key1 -> val1, key3 -> val3, key4 -> val4]|
+---+---------------------------------------------+------------------------------------------+

通过从上面的Map列收集所有唯一键来创建新的Dataframe架构

var schema = List(StructField("number", IntegerType))
val col_rdd = mapped_df.select(col("mapped")).rdd.map(x => {
    val maps: Map[String, String] = x.getAs[Map[String, String]]("mapped")
    val m = maps.map(x =>  x._1)
    m
})

val schem = col_rdd.flatMap(x => x).collect().sorted.toSet
val new_schema = schem.toList.map(x => StructField(x, StringType, true))
schema = schema ++ new_schema

输出-

schema: List[org.apache.spark.sql.types.StructField] = List(
StructField(number,IntegerType,true), 
StructField(key4,StringType,true), 
StructField(key1,StringType,true), 
StructField(key2,StringType,true), 
StructField(key3,StringType,true), 
StructField(foo,StringType,true))

现在我们创建了模式。将Map的RDU df转换为rdd,并对其执行以下操作,以便与我们的新架构保持一致:

val df_rdd = mapped_df.rdd.map(row => {
    val num = List(row.getAs[Int]("num"))
    val map_val: Map[String, String] = row.getAs[Map[String, String]]("mapped")
    val new_cols = schem.toList.map(x => map_val.getOrElse(x, null))
    Row.fromSeq(num ++ new_cols)
})

val new_dataframe = spark.createDataFrame(df_rdd, StructType(schema))

new_dataframe.show(false)

新Dataframe与给定Dataframe

New Dataframe

+------+----+----+----+----+----+
|number|key4|key1|key2|key3| foo|
+------+----+----+----+----+----+
|     1|null|null|null|null| bar|
|     2|null|val1|val2|null|null|
|     3|val4|val1|null|val3|null|
+------+----+----+----+----+----+

Given Dataframe:

+---+---------------------------------------------+
|num|keyvalue                                     |
+---+---------------------------------------------+
|1  |{"foo":"bar"}                                |
|2  |{"key1":"val1","key2":"val2"}                |
|3  |{"key1":"val1","key3":"val3", "key4": "val4"}|
+---+---------------------------------------------+

谢谢!

相关问题