Spark (Scala): how to apply a UDF transformation to array columns in nested data

Posted by scyqe7ek on 2021-05-27 in Spark

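I need to transform a set of columns in a DataFrame that has a nested structure. The transformation relies on a function that already exists.
Suppose the data looks like this: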

case class A(A: B)
case class B(B: String, C: String, D: Seq[C])
case class C(E: String, F: String)

val df = sc.parallelize(Seq(A(B("b", "c", Seq(C("e1","f1"), C("e2", "f2")))) )).toDF
df.printSchema

root
|-- A: struct (nullable = true)
|    |-- B: string (nullable = true)
|    |-- C: string (nullable = true)
|    |-- D: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- E: string (nullable = true)
|    |    |    |-- F: string (nullable = true)

Suppose the transformation converts a string to upper case:

import org.apache.spark.sql.functions.udf

val upper: String => String = _.toUpperCase
val upperUDF = udf(upper)

I found an approach here that partially solves my problem. Applying the code given there:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.StructType

def mutate(df: DataFrame, fn: Column => Column): DataFrame = {
  // Get a projection with fields mutated by `fn` and select it
  // out of the original frame with the schema reassigned to the original
  // frame (explained later)
  df.sqlContext.createDataFrame(df.select(traverse(df.schema, fn): _*).rdd, df.schema)
}

def traverse(schema: StructType, fn: Column => Column, path: String = ""): Array[Column] = {
  schema.fields.map { f =>
    f.dataType match {
      // Recurse into nested structs, extending the dotted path
      case s: StructType => struct(traverse(s, fn, path + f.name + "."): _*)
      // Every other type, arrays included, is passed to `fn` as a single column
      case _ => fn(col(path + f.name))
    }
  }
}
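
To see which column paths this recursion hands to `fn` for the example schema, here is a minimal sketch in plain Scala that needs no Spark. The types `DType`, `StringT`, `ArrayT`, `StructT` and the function `leafPaths` are hypothetical stand-ins invented for illustration, not Spark APIs. They only mimic the shape of `traverse`: descend into structs and treat everything else as a leaf.

```scala
// Toy stand-ins for Spark's data types (hypothetical, for illustration only).
sealed trait DType
case object StringT extends DType
final case class ArrayT(element: DType) extends DType
final case class StructT(fields: Seq[(String, DType)]) extends DType

// Same recursion shape as `traverse`: recurse into structs,
// emit the dotted path for every other field (arrays included).
def leafPaths(s: StructT, path: String = ""): Seq[String] =
  s.fields.flatMap {
    case (name, inner: StructT) => leafPaths(inner, path + name + ".")
    case (name, _)              => Seq(path + name)
  }

// Mirrors the schema printed by df.printSchema.
val schema = StructT(Seq(
  "A" -> StructT(Seq(
    "B" -> StringT,
    "C" -> StringT,
    "D" -> ArrayT(StructT(Seq("E" -> StringT, "F" -> StringT)))
  ))
))

val paths = leafPaths(schema)
println(paths.mkString(", "))
```

Under this model the array field is reached as the single path `A.D`, and no path named `A.D.F` is ever produced, because the recursion does not look inside array elements.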

The following works fine for me:

val df2 = mutate(df, c => if (c.toString == "A.B" || c.toString == "A.C") upperUDF(c) else c)

However, when it comes to transforming a column inside the nested array D, it fails silently, without raising any error:

val df3 = mutate(df, c => if (c.toString == "A.D.F") upperUDF(c) else c)

What is going wrong here? How can I transform a column inside a nested array, as described above?
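
One possible direction, offered as a sketch rather than a confirmed fix: `traverse` only recurses into `StructType`, so the array is most likely passed to `fn` whole, as the column `A.D`. A comparison against `"A.D.F"` would then never match, and `mutate` would return the data unchanged without any error. Assuming Spark 3.0 or later, the `transform` function in `org.apache.spark.sql.functions` can rewrite each array element instead. The helper name `upperF` is hypothetical, and the built-in `upper` stands in here for `upperUDF`.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, struct, transform, upper}

// Assumption: Spark 3.0+ (needed for `transform`) and a DataFrame with the
// schema shown in the question. `upperF` is a hypothetical helper name.
def upperF(df: DataFrame): DataFrame = {
  // Rebuild every element of A.D, keeping E and upper-casing F.
  val newD: Column = transform(
    col("A.D"),
    (x: Column) => struct(
      x.getField("E").as("E"),
      upper(x.getField("F")).as("F")
    )
  )
  // Rebuild the outer struct A with B and C untouched and D replaced.
  df.withColumn("A", struct(col("A.B").as("B"), col("A.C").as("C"), newD.as("D")))
}
```

Calling `upperF(df)` returns a new DataFrame and leaves `df` untouched. The rebuilt structs may differ from the original schema in nullability flags, so reassigning `df.schema` as `mutate` does may still be needed if the schema has to match exactly.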
