Explode a column containing a JSON array

mqkwyuun posted on 2021-05-27 in Spark

I'm struggling with a problem: exploding a JSON array column with Spark.
I have a DataFrame that looks like this:

+------+------------------------------------------------------------------------+
|id    |struct                                                                  |
+------+------------------------------------------------------------------------+
|  1   |  [{_name: BankAccount, _value: 123456}, {_name: Balance, _value: 500$}]|
|  2   |  [{_name: BankAccount, _value: 098765}, {_name: Balance, _value: 100$}]|
|  3   |  [{_name: BankAccount, _value: 135790}, {_name: Balance, _value: 200$}]|
+------+------------------------------------------------------------------------+

I want it to look like this:

+------+------------+--------+
|id    | BankAccount| Balance|
+------+------------+--------+
|  1   |   123456   | 500$   |
|  2   |   098765   | 100$   |
|  3   |   135790   | 200$   |
+------+------------+--------+

Admittedly this isn't a plain explode, but I can't get anywhere near the result I need.
Thanks for your help!

Answer 1 (plupiseo)

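Check the code below.
(In the sample data I named the column `data` instead of `struct` for simplicity, and stored each array as a JSON string.)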

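// spark-shell already provides `spark` and imports spark.implicits._; outside it, create a SparkSession named spark and import spark.implicits._ first
import org.apache.spark.sql.functions._   // explode, from_json, first, ...
import org.apache.spark.sql.types._       // ArrayType, MapType, StringType for the schema

// Sample data: each "data" value is a JSON array encoded as a string
val df = Seq(
  (1, """[{"_name":"BankAccount","_value":"123456"},{"_name":"Balance","_value": "500$"}]"""),
  (2, """[{"_name":"BankAccount","_value":"098765"},{"_name":"Balance","_value": "100$"}]"""),
  (3, """[{"_name":"BankAccount","_value":"135790"},{"_name":"Balance","_value": "200$"}]""")
).toDF("id", "data")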

Print the schema:

scala> df.printSchema
root
 |-- id: integer (nullable = false)
 |-- data: string (nullable = true)

Show the sample data:

scala> df.show(false)
+---+--------------------------------------------------------------------------------+
|id |data                                                                            |
+---+--------------------------------------------------------------------------------+
|1  |[{"_name":"BankAccount","_value":"123456"},{"_name":"Balance","_value": "500$"}]|
|2  |[{"_name":"BankAccount","_value":"098765"},{"_name":"Balance","_value": "100$"}]|
|3  |[{"_name":"BankAccount","_value":"135790"},{"_name":"Balance","_value": "200$"}]|
+---+--------------------------------------------------------------------------------+

Create a schema for the JSON data (an array of string-to-string maps):

scala> val schema = ArrayType(MapType(StringType,StringType))
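Because every element has the same two keys, `_name` and `_value`, the elements could also be parsed as structs instead of maps, which gives named fields (`e._name`) in place of map lookups. The sketch below is an illustrative variant, not the answer's code. It assumes a local `SparkSession` (the `local[*]` master, the object `ExplodeStructSketch` and the method `longRows` are made-up names) and stops right after `explode`, so it shows the long format, one row per `id` and `_name`, that `groupBy`/`pivot` then widens into columns.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, from_json}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

object ExplodeStructSketch {
  // Hypothetical struct schema: each array element has two string fields, _name and _value
  val structSchema: ArrayType = ArrayType(StructType(Seq(
    StructField("_name", StringType),
    StructField("_value", StringType)
  )))

  // Returns the exploded "long" rows (id, key, value), one per array element
  def longRows(): Seq[(Int, String, String)] = {
    // Local session for illustration only; spark-shell already provides `spark`
    val spark = SparkSession.builder().master("local[*]").appName("explode-struct-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(
      (1, """[{"_name":"BankAccount","_value":"123456"},{"_name":"Balance","_value":"500$"}]"""),
      (2, """[{"_name":"BankAccount","_value":"098765"},{"_name":"Balance","_value":"100$"}]""")
    ).toDF("id", "data")

    val rows = df
      .select($"id", explode(from_json($"data", structSchema)).as("e")) // one output row per array element
      .select($"id", $"e._name".as("key"), $"e._value".as("value"))    // read struct fields by name
      .collect()
      .map(r => (r.getInt(0), r.getString(1), r.getString(2)))
      .toSeq

    spark.stop()
    rows
  }

  def main(args: Array[String]): Unit =
    longRows().foreach(println)
}
```

With the two sample rows, `longRows()` returns four (id, key, value) tuples; passing the same `key`/`value` columns to `.groupBy($"id").pivot($"key").agg(first($"value"))` would widen them into the `BankAccount`/`Balance` table.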

Use explode, groupBy and pivot to get the expected result.
Note: you may need to adjust the code below slightly to fit your requirements.

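// In spark-shell, enter this with :paste (finish with Ctrl-D) so the leading-dot lines are read as one expression
df
  .withColumn("data", explode(from_json($"data", schema)))                 // parse the JSON string; one row per array element (a map)
  .select($"id", $"data"("_name").as("key"), $"data"("_value").as("value")) // map lookups give key/value columns
  .groupBy($"id")
  .pivot($"key")                                                            // one output column per distinct _name
  .agg(first($"value"))                                                     // the value for that (id, _name) pair
  .select("id", "BankAccount", "Balance")
  .orderBy($"id".asc)
  .show(false)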

Final result:

+---+-----------+-------+
|id |BankAccount|Balance|
+---+-----------+-------+
|1  |123456     |500$   |
|2  |098765     |100$   |
|3  |135790     |200$   |
+---+-----------+-------+
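The question's own `struct` column may already be an array of structs rather than a JSON string. If so (an assumption), `from_json` is unnecessary, and on Spark 2.4+ `map_from_entries` can turn each row's array of two-field structs into a map, after which the wanted keys are plain lookups: no `explode`, and no shuffle from `groupBy`/`pivot`. This is a swapped-in alternative, not the answer's method. The case class `Entry`, the object `MapFromEntriesSketch` and the local session are hypothetical, and, like the answer's final `select`, it needs the output column names to be known in advance.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.map_from_entries

// Hypothetical element type mirroring the question's {_name, _value} entries
case class Entry(_name: String, _value: String)

object MapFromEntriesSketch {
  // Returns (id, BankAccount, Balance) rows built without explode/groupBy/pivot
  def wideRows(): Seq[(Int, String, String)] = {
    // Local session for illustration only; spark-shell already provides `spark`
    val spark = SparkSession.builder().master("local[*]").appName("map-from-entries-sketch").getOrCreate()
    import spark.implicits._

    // Assumed input: `struct` is array<struct<_name:string,_value:string>>
    val df = Seq(
      (1, Seq(Entry("BankAccount", "123456"), Entry("Balance", "500$"))),
      (2, Seq(Entry("BankAccount", "098765"), Entry("Balance", "100$"))),
      (3, Seq(Entry("BankAccount", "135790"), Entry("Balance", "200$")))
    ).toDF("id", "struct")

    val rows = df
      // Turn each array of two-field structs into a map: first field -> second field
      .select($"id", map_from_entries($"struct").as("m"))
      // Look up the two known keys as ordinary columns
      .select($"id", $"m"("BankAccount").as("BankAccount"), $"m"("Balance").as("Balance"))
      .collect()
      .map(r => (r.getInt(0), r.getString(1), r.getString(2)))
      .toSeq

    spark.stop()
    rows
  }

  def main(args: Array[String]): Unit =
    wideRows().foreach(println)
}
```

One behavioral difference to keep in mind: on Spark 3.x, `map_from_entries` by default raises an error if a key repeats within one array (see `spark.sql.mapKeyDedupPolicy`), whereas the pivot version with `first` silently keeps one of the values.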
