How to write a dynamic explode function in Scala (exploding multiple columns)

jdg4fx2g  posted on 2021-05-27 in Spark

I need to write a dynamic Scala class. It takes three parameters as input: the input DataFrame, the list of columns to explode, and the delimiter. Say I have the DataFrame below.

DataBase     TableName       Value
dbdev        table1_name     Value1#Value2#Value3

After exploding, my expected result is as follows:

DataBase     TableName       Value                   Value_Exploded
dbdev        table1_name     Value1#Value2#Value3    Value1
dbdev        table1_name     Value1#Value2#Value3    Value2
dbdev        table1_name     Value1#Value2#Value3    Value3

So my question is how to write a Scala class that achieves the above. The constraint is that it has to be generic: it may receive different DataFrames, and the (multiple) columns that need exploding have to be passed in.
I am able to do this when I only need to explode one column. Please find it below:

val explodeColumnName = "Value" //column which i need to explode
val explodeColumnBy = "#" //delimiter

val explodeDF = df.select(df.col("*"), explode(split(col(explodeColumnName), s"$explodeColumnBy")).as (explodeColumnName+"_Exploded"))

But I fail when I need to dynamically explode multiple columns. Say I need to explode 4 columns of the DataFrame df.
Any help/suggestion/advice is much appreciated.
Thank you!


8yparm6h 1#

Check the code below.

scala> val df = Seq(
         ("dbdev", "table1_name", "Value1#Value2#Value3", "Sample1#Sample2#Sample3")
       ).toDF("database", "tablename", "value", "sample")
scala> df.show(false)
+--------+-----------+--------------------+-----------------------+
|database|tablename  |value               |sample                 |
+--------+-----------+--------------------+-----------------------+
|dbdev   |table1_name|Value1#Value2#Value3|Sample1#Sample2#Sample3|
+--------+-----------+--------------------+-----------------------+

Import the required libraries.

scala> import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.{Column, DataFrame}

scala> import org.apache.spark.sql.functions.{explode, split}
import org.apache.spark.sql.functions.{explode, split}

Define the DFHelper class.
Note - do not use explode as the method name inside the DFHelper class; explode is already available as a built-in function, so I used explodeM as the method name.

scala> implicit class DFHelper(inDF: DataFrame) {
           import inDF.sparkSession.implicits._
           // For every given column: split it on the delimiter, then explode
           // the resulting array into one row per element (in place).
           def explodeM(delimiter: String, columns: Column*): DataFrame = {
             columns.foldLeft(inDF)((indf, column) => indf
               .withColumn(column.toString, split(column, delimiter))
               .withColumn(column.toString, explode(column))
             )
           }
         }
scala> df.explodeM("#",$"value").show(false) // one column exploding
+--------+-----------+------+-----------------------+
|database|tablename  |value |sample                 |
+--------+-----------+------+-----------------------+
|dbdev   |table1_name|Value1|Sample1#Sample2#Sample3|
|dbdev   |table1_name|Value2|Sample1#Sample2#Sample3|
|dbdev   |table1_name|Value3|Sample1#Sample2#Sample3|
+--------+-----------+------+-----------------------+
scala> df.explodeM("#",$"value",$"sample").show(false) // two columns exploding
+--------+-----------+------+-------+
|database|tablename  |value |sample |
+--------+-----------+------+-------+
|dbdev   |table1_name|Value1|Sample1|
|dbdev   |table1_name|Value1|Sample2|
|dbdev   |table1_name|Value1|Sample3|
|dbdev   |table1_name|Value2|Sample1|
|dbdev   |table1_name|Value2|Sample2|
|dbdev   |table1_name|Value2|Sample3|
|dbdev   |table1_name|Value3|Sample1|
|dbdev   |table1_name|Value3|Sample2|
|dbdev   |table1_name|Value3|Sample3|
+--------+-----------+------+-------+
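
The explodeM above replaces the original column values in place. If you want to keep the original columns and add new <column>_Exploded columns, as in the expected output of the question, and pass the columns to explode as a list of names, a minimal sketch could look like the following (not part of the original answer; explodeColumns is a hypothetical helper name):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode, split}

// Hypothetical helper: for each listed column name, append a new
// "<name>_Exploded" column with one split value per row, leaving the
// original column unchanged.
def explodeColumns(inDF: DataFrame, columnNames: Seq[String], delimiter: String): DataFrame =
  columnNames.foldLeft(inDF) { (df, name) =>
    df.withColumn(name + "_Exploded", explode(split(col(name), delimiter)))
  }

// e.g. explodeColumns(df, Seq("value", "sample"), "#").show(false)

Note that split treats the delimiter as a Java regular expression, so a delimiter such as "|" would need to be escaped (for example with Pattern.quote) before being passed in.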
