Dynamically replacing Spark DataFrame columns on a per-row basis

new9mtju · posted 2022-12-13 · in Apache

I have two JSON strings per record.
The first JSON string holds the changes, indicating which columns in the second JSON string should be replaced.
Example input:

import spark.implicits._
// Leaving timestamp aspect out and multiple occurrence of changes in same run, for simplicity.
val df = Seq((1, """{"b": "new", "c": "new"}""",  """{"k": 1, "b": "old", "c": "old", "d": "old"}""" ),
             (2, """{"b": "new", "d": "new"}""",  """{"k": 2, "b": "old", "c": "old", "d": "old"}""" )).toDF("id", "chg", "before")

Which looks like this:

+---+------------------------+--------------------------------------------+
|id |chg                     |before                                      |
+---+------------------------+--------------------------------------------+
|1  |{"b": "new", "c": "new"}|{"k": 1, "b": "old", "c": "old", "d": "old"}|
|2  |{"b": "new", "d": "new"}|{"k": 2, "b": "old", "c": "old", "d": "old"}|
+---+------------------------+--------------------------------------------+

What I would like (though I do not know how to do it, or even whether it is possible) is the following output, with the replacement applied on a per-row basis:

+---+--------------------------------------------+
|id |after                                       |
+---+--------------------------------------------+
|1  |{"k": 1, "b": "new", "c": "new", "d": "old"}|
|2  |{"k": 2, "b": "new", "c": "old", "d": "new"}|
+---+--------------------------------------------+

If this can be done, it gets interesting. Being able to have an array inside the JSON would be nice as well.


ozxc1zmp1#

You can go the strongly-typed route, which gives you a lot of control over exactly which transformations you want to happen.
I have made the following assumptions:

  • the before column contains all of the fields in each row;
  • the chg column never contains the k field.

import spark.implicits._
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.{col, from_json, to_json}

// Creating case classes with the schema of your json objects. We're making
// these to make use of strongly typed Datasets. Notice that the MyChgClass has
// each field as an Option: this will enable us to choose between "chg" and
// "before"
case class MyChgClass(b: Option[String], c: Option[String], d: Option[String])
case class MyFullClass(k: Int, b: String, c: String, d: String)
case class MyEndClass(id: Int, after: MyFullClass)

// Creating schemas for the from_json function
val chgSchema = Encoders.product[MyChgClass].schema
val beforeSchema = Encoders.product[MyFullClass].schema

// Your dataframe from the example
val df = Seq(
  (1, """{"b": "new", "c": "new"}""",  """{"k": 1, "b": "old", "c": "old", "d": "old"}""" ),
  (2, """{"b": "new", "d": "new"}""",  """{"k": 2, "b": "old", "c": "old", "d": "old"}""" )
).toDF("id", "chg", "before")

// Parsing the json string into our case classes and finishing by creating a
// strongly typed dataset with the .as[] method
val parsedDf = df
  .withColumn("parsedChg",from_json(col("chg"), chgSchema))
  .withColumn("parsedBefore",from_json(col("before"), beforeSchema))
  .drop("chg")
  .drop("before")
  .as[(Int, MyChgClass, MyFullClass)]

// Mapping over our dataset with a lot of control of exactly what we want. Since
// the "chg" fields are options, we can use the getOrElse method to choose
// between either the "chg" field or the "before" field
val output = parsedDf.map{
  case (id, chg, before) => {
    MyEndClass(id, MyFullClass(
      before.k,
      chg.b.getOrElse(before.b),
      chg.c.getOrElse(before.c),
      chg.d.getOrElse(before.d)
    ))
  }
}

output.show(false)
+---+------------------+
|id |after             |
+---+------------------+
|1  |[1, new, new, old]|
|2  |[2, new, old, new]|
+---+------------------+
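
The heart of the map above is just Option#getOrElse: every field present in chg wins, otherwise the before value is kept. Stripped of the Spark machinery, the same per-field fallback can be exercised in plain Scala (a standalone sketch with simplified case-class names, no SparkSession required):

```scala
// Standalone sketch of the per-field fallback used in the map above.
// Chg mirrors MyChgClass and Full mirrors MyFullClass (names simplified).
case class Chg(b: Option[String], c: Option[String], d: Option[String])
case class Full(k: Int, b: String, c: String, d: String)

// Each optional chg field falls back to the corresponding before field.
def merge(chg: Chg, before: Full): Full =
  Full(
    before.k,
    chg.b.getOrElse(before.b),
    chg.c.getOrElse(before.c),
    chg.d.getOrElse(before.d)
  )

val row1 = merge(Chg(Some("new"), Some("new"), None), Full(1, "old", "old", "old"))
val row2 = merge(Chg(Some("new"), None, Some("new")), Full(2, "old", "old", "old"))
// row1 == Full(1, "new", "new", "old"); row2 == Full(2, "new", "old", "new")
```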

With this, the output dataset contains the values you want. Since you asked for it, we can turn it back into a JSON string (personal opinion: this is not very useful within Spark IMO, so unless you absolutely need to output this JSON somewhere I would not do it) like so:

output.select($"id", to_json($"after")).show(false)
+---+-------------------------------------+
|id |structstojson(after)                 |
+---+-------------------------------------+
|1  |{"k":1,"b":"new","c":"new","d":"old"}|
|2  |{"k":2,"b":"new","c":"old","d":"new"}|
+---+-------------------------------------+

Hope this helps!
