spark scala—有条件地从其他列添加json列

oxiaedzo  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(502)

使用Dataframe,例如:

+-----+-----+-----+-----+-----+-----+
|old_a|new_a|    a|old_b|new_b|    b|
+-----+-----+-----+-----+-----+-----+
|    6|    7| true|    6|    6|false|
|    1|    1|false|   12|    8| true|
|    1|    2| true|    2|    8| true|
|    1| null| true|    2|    8| true|
+-----+-----+-----+-----+-----+-----+

注意:当“new\u a”与“old\u a”不同时,“a”为“true”,与“b”相同
我想添加一个json列,其中包含其他列的一些值,遵循规则“如果'a'为真,则必须将'new\u a'col的值添加到新的json中,对于'b'也是这样,
将产生以下Dataframe

+-----+-----+--------+-----+-----+--------+------------------------+
|old_a|new_a|a       |old_b|new_b|       b| json                   |
+-----+-----+--------+-----+-----+--------+------------------------+
|    6|    7|    true|    6|    6|   false| { "a" : 7 }            |
|    1|    1|   false|   12|    8|    true| { "b" : 8 }            |    
|    1|    2|    true|    2|    8|    true| { "a" : 2, "b" : 8}    |
|    1| null|    true|    2|    8|    true| { "a" : null, "b" : 8} |
+-----+-----+--------+-----+-----+--------+------------------------+

有没有一种方法可以在没有自定义项的情况下实现这一点?
如果不是的话,写udf的最好方法是什么,这样就不会太贵了?
谢谢

guz6ccqo

guz6ccqo1#

当我们不知道新旧列对的数目时,推广srinivas解的一种方法
(请注意,我没有提到的是,列“a”和“b”在这里说明值是否在旧a和新a(分别是b)之间变化)

val df = Seq(
      (null, "a", "b", "b"),
      ("a", null, "b", "b"),
      ("a", "a2", "b", "b"),
      ("a", "a2", "b", "b2"),
      (null, null, "b", "b2"),
    ).toDF("old_a", "new_a","old_b", "new_b")

    // replace null by empty string to not mess with the voluntary null value we set later
    val df2 = df.na.fill("",df.columns)

    df2.show()

    val colNames = df2.columns.map(name => name.stripPrefix("old_").stripPrefix("new_")).distinct
    val res = colNames.foldLeft(df2){(tempDF, colName) =>
      tempDF.withColumn(colName,
        when(col(s"old_$colName").equalTo(col(s"new_$colName")), null)
        .otherwise(col(s"new_$colName"))
      )
    }
    val cols: Array[Column] = colNames.map(col(_))
    val resWithJson = res.withColumn("json", to_json(struct(cols:_*)))

输出:

+-----+-----+-----+-----+
|old_a|new_a|old_b|new_b|
+-----+-----+-----+-----+
|     |    a|    b|    b|
|    a|     |    b|    b|
|    a|   a2|    b|    b|
|    a|   a2|    b|   b2|
|     |     |    b|   b2|
+-----+-----+-----+-----+

+-----+-----+-----+-----+----+----+-------------------+
|old_a|new_a|old_b|new_b|a   |b   |json               |
+-----+-----+-----+-----+----+----+-------------------+
|     |a    |b    |b    |a   |null|{"a":"a"}          |
|a    |     |b    |b    |    |null|{"a":""}           |
|a    |a2   |b    |b    |a2  |null|{"a":"a2"}         |
|a    |a2   |b    |b2   |a2  |b2  |{"a":"a2","b":"b2"}|
|     |     |b    |b2   |null|b2  |{"b":"b2"}         |
+-----+-----+-----+-----+----+----+-------------------+
ao218c7q

ao218c7q2#

使用 to_json & struct 功能。
默认情况下 to_json 函数删除所有 null 值列,由于这个原因我转换了 new_a 列数据类型到
string new_a 数据类型 integer ```
scala> df.show(false)
+-----+-----+-----+-----+-----+-----+
|old_a|new_a|a |old_b|new_b|b |
+-----+-----+-----+-----+-----+-----+
|6 |7 |true |6 |6 |false|
|1 |1 |false|12 |8 |true |
|1 |2 |true |2 |8 |true |
|1 |null |true |2 |8 |true |
+-----+-----+-----+-----+-----+-----+

scala> df.printSchema
root
|-- old_a: integer (nullable = false)
|-- new_a: integer (nullable = true)
|-- a: boolean (nullable = false)
|-- old_b: integer (nullable = false)
|-- new_b: integer (nullable = false)
|-- b: boolean (nullable = false)

scala> df.withColumn("json",when($"a" && $"b",to_json(struct($"new_a",$"new_b"))).when($"a",to_json(struct($"new_a"))).otherwise(to_json(struct($"new_b")))).show(false)
+-----+-----+-----+-----+-----+-----+---------------------+
|old_a|new_a|a |old_b|new_b|b |json |
+-----+-----+-----+-----+-----+-----+---------------------+
|6 |7 |true |6 |6 |false|{"new_a":7} |
|1 |1 |false|12 |8 |true |{"new_b":8} |
|1 |2 |true |2 |8 |true |{"new_a":2,"new_b":8}|
|1 |null |true |2 |8 |true |{"new_b":8} |
+-----+-----+-----+-----+-----+-----+---------------------+
`new_a` 数据类型 `string`
scala> df.show(false)
+-----+-----+-----+-----+-----+-----+
|old_a|new_a|a |old_b|new_b|b |
+-----+-----+-----+-----+-----+-----+
|6 |7 |true |6 |6 |false|
|1 |1 |false|12 |8 |true |
|1 |2 |true |2 |8 |true |
|1 |null |true |2 |8 |true |
+-----+-----+-----+-----+-----+-----+

scala> df.printSchema
root
|-- old_a: integer (nullable = false)
|-- new_a: string (nullable = true)
|-- a: boolean (nullable = false)
|-- old_b: integer (nullable = false)
|-- new_b: integer (nullable = false)
|-- b: boolean (nullable = false)

scala> df.withColumn("json",when($"a" && $"b",to_json(struct($"new_a",$"new_b"))).when($"a",to_json(struct($"new_a"))).otherwise(to_json(struct($"new_b")))).show(false)
+-----+-----+-----+-----+-----+-----+--------------------------+
|old_a|new_a|a |old_b|new_b|b |json |
+-----+-----+-----+-----+-----+-----+--------------------------+
|6 |7 |true |6 |6 |false|{"new_a":"7"} |
|1 |1 |false|12 |8 |true |{"new_b":8} |
|1 |2 |true |2 |8 |true |{"new_a":"2","new_b":8} |
|1 |null |true |2 |8 |true |{"new_a":"null","new_b":8}|
+-----+-----+-----+-----+-----+-----+--------------------------+

相关问题