Schema transformation of a Spark DataFrame in Scala

djmepvbi · posted 2021-07-09 · in Spark
Follow (0) | Answers (2) | Views (413)

Input DataFrame:

{ 
  "C_1" : "A",
  "C_2" : "B",
  "C_3" : [ 
            {
              "ID" : "ID1",
              "C3_C2" : "V1",
              "C3_C3" : "V2"
            },
            {
              "ID" : "ID2",
              "C3_C2" : "V3",
              "C3_C3" : "V4"
            },
            {
              "ID" : "ID3",
              "C3_C2" : "V4",
              "C3_C3" : "V5"
            },
            ..
          ]
}

Expected output:

{ 
  "C_1" : "A",
  "C_2" : "B",
  "ID1" : {
              "C3_C2" : "V1",
              "C3_C3" : "V2"
          },
  "ID2" : {
              "C3_C2" : "V3",
              "C3_C3" : "V4"
          },
  "ID3" : {
              "C3_C2" : "V4",
              "C3_C3" : "V5"
          },
  ..
}
`C_3` is an array of `n` structs, and each struct carries a unique `ID`. The new DataFrame should turn the `n` structs inside `C_3` into separate columns, keyed by their `ID`.
I am new to Spark and Scala. Any ideas on how to achieve this transformation would be very helpful.
Thanks!
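The reshape being asked for can be sketched in plain Scala collections (no Spark) to show the target shape: each struct in the array gets keyed by its unique `ID`, with the `ID` dropped from the value. The case class and field names below are hypothetical, chosen only to mirror the JSON above:

```scala
// One element of the C_3 array: an ID plus two payload fields.
// (Hypothetical names mirroring the question's JSON.)
case class C3Item(id: String, c3c2: String, c3c3: String)

val c3 = Seq(
  C3Item("ID1", "V1", "V2"),
  C3Item("ID2", "V3", "V4"),
  C3Item("ID3", "V4", "V5")
)

// Key each struct by its ID, dropping the ID from the value —
// the collection analogue of turning each struct into an ID-named column.
val byId: Map[String, (String, String)] =
  c3.map(item => item.id -> (item.c3c2, item.c3c3)).toMap

println(byId("ID1")) // prints (V1,V2)
```

This only illustrates the shape of the transform; the answers below show how to express it on an actual DataFrame.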

x759pob21#

[Posting my hack solution for reference.]
@mck's answer is probably the concise approach, but it wasn't sufficient for my use case: my DataFrame has many columns, and using all of them in the group-by made it an expensive operation.
In my use case the `ID`s inside `C_3` are unique and known values, so that is the assumption in this solution. I implemented the transformation as follows:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

case class C3_Interm_Struct(C3_C2: String, C3_C3: String)
case class C3_Out_Struct(ID1: C3_Interm_Struct, ID2: C3_Interm_Struct) // Manually add fields as required

val custom_flat_func = udf((a: Seq[Row]) => {
    var _id1: C3_Interm_Struct = null
    var _id2: C3_Interm_Struct = null
    for (item <- a) {
        // item(0) is ID, item(1) is C3_C2, item(2) is C3_C3
        val intermData = C3_Interm_Struct(item(1).toString, item(2).toString)

        if (item(0).equals("ID1")) {
            _id1 = intermData
        } else if (item(0).equals("ID2")) {
            _id2 = intermData
        }
        // else if (...) — manual expansion for the remaining known IDs
    }

    Seq(C3_Out_Struct(_id1, _id2)) // Return type has to be Seq
})

val flatDf = df.withColumn("C_3", custom_flat_func($"C_3")).selectExpr("C_1", "C_2", "inline(C_3)") //Expand the Seq which has only 1 Row
flatDf.first.prettyJson

Output:

{ 
  "C_1" : "A",
  "C_2" : "B",
  "ID1" : {
              "C3_C2" : "V1",
              "C3_C3" : "V2"
          },
  "ID2" : {
              "C3_C2" : "V3",
              "C3_C3" : "V4"
          }
}
`udf`s are generally slow, but in my case this was faster than `pivot` with `group-by`.
[There may be more efficient solutions that I was not aware of at the time of writing.]

tzdcorbm2#

You can explode the structs with `inline`, then pivot by ID:

import org.apache.spark.sql.functions.{first, struct}

val df2 = df.selectExpr("C_1","C_2","inline(C_3)")
            .groupBy("C_1","C_2")
            .pivot("ID")
            .agg(first(struct("C3_C2","C3_C3")))

df2.show
+---+---+--------+--------+--------+
|C_1|C_2|     ID1|     ID2|     ID3|
+---+---+--------+--------+--------+
|  A|  B|[V1, V2]|[V3, V4]|[V4, V5]|
+---+---+--------+--------+--------+

df2.printSchema
root
 |-- C_1: string (nullable = true)
 |-- C_2: string (nullable = true)
 |-- ID1: struct (nullable = true)
 |    |-- C3_C2: string (nullable = true)
 |    |-- C3_C3: string (nullable = true)
 |-- ID2: struct (nullable = true)
 |    |-- C3_C2: string (nullable = true)
 |    |-- C3_C3: string (nullable = true)
 |-- ID3: struct (nullable = true)
 |    |-- C3_C2: string (nullable = true)
 |    |-- C3_C3: string (nullable = true)
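What the `groupBy`/`pivot`/`agg(first(...))` chain above does can be sketched conceptually in plain Scala collections (no Spark): rows sharing the group key `(C_1, C_2)` are collapsed, and each distinct `ID` becomes its own entry holding the first `(C3_C2, C3_C3)` pair seen for that ID. The names below are hypothetical, not Spark API:

```scala
// One exploded row, as produced by inline(C_3): the two group-key
// columns plus the three struct fields. (Hypothetical names.)
case class ExplodedRow(c1: String, c2: String, id: String, c3c2: String, c3c3: String)

val rows = Seq(
  ExplodedRow("A", "B", "ID1", "V1", "V2"),
  ExplodedRow("A", "B", "ID2", "V3", "V4"),
  ExplodedRow("A", "B", "ID3", "V4", "V5")
)

// groupBy (C_1, C_2), then within each group pivot by ID,
// keeping the first (C3_C2, C3_C3) pair per ID — as agg(first(...)) does.
val pivoted: Map[(String, String), Map[String, (String, String)]] =
  rows.groupBy(r => (r.c1, r.c2)).map { case (key, grp) =>
    key -> grp.groupBy(_.id).map { case (id, items) =>
      id -> (items.head.c3c2, items.head.c3c3)
    }
  }

println(pivoted(("A", "B"))("ID3")) // prints (V4,V5)
```

In Spark the inner map's keys become actual columns (`ID1`, `ID2`, `ID3`), which is why the pivoted column set depends on the distinct `ID` values in the data.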
