动态生成具有过滤器、列重命名和合并条件scala spark的代码

tjvv9vkg  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(340)

我有一段代码,我想动态生成。我想采取以下列的形式,一个列表或序列和执行 filter 操作 coalesce 在里面, drop 以及 withColumnRenamed 声明。
这里是我想要动态接受的列的列表(这里是一个字符串)。

val cols = "a|tmp_a,b|tmp_b"

代码如下所示:

val df1 = df2.filter(!(coalesce(col("a"), lit(0)) === coalesce(col("tmp_a"), lit(0))) || !(upper(col("b")) === upper(col("tmp_b"))))
  .drop("a")
  .drop("b")
  .withColumnRenamed("tmp_a", "a")
  .withColumnRenamed("tmp_b", "b")

如果将更多列添加到 cols ,如何动态调整代码?新的列对应该使用与上面的“b | tmp | u b”相同的过滤条件。

quhf5bfb

quhf5bfb1#

给定一个具有列名对的输入,您可以创建两种类型的过滤条件(在第一个列对下面使用第一个过滤模式,其余的使用第二个过滤模式)。过滤完Dataframe后 drop 以及 withColumnRenamed 可以使用 foldLeft .

val cols = "a|tmp_a,b|tmp_b,c|tmp_c".split(",").map(_.split("\\|"))

val filterCondHead = !(coalesce(col(cols.head(0)), lit(0)) === coalesce(col(cols.head(1)), lit(0)))
val filterCondTail = cols.tail.map(c => !(upper(col(c(0))) === upper(col(c(1))))).reduce(_ || _)

val df2 = df.filter(filterCondHead || filterCondTail)

val df3 = cols.foldLeft(df2){ case(df, c) => 
  df.drop(c(0)).withColumnRenamed(c(1), c(0))
}

相关问题