Scala: applying the Spark Bucketizer to a DataFrame after partitioning by a column value

2vuwiymt  posted on 2021-05-17  in  Spark

I need to apply the Spark Bucketizer to the DataFrame df below. This is mock data; the original DataFrame has about 10k records.

instance   name                 value    percentage
 A37        Histogram.ratio      1            0.20
 A37        Histogram.ratio      20           0.34           
 A37        Histogram.ratio      50           0.04           
 A37        Histogram.ratio      500          0.13           
 A37        Histogram.ratio      2000         0.05           
 A37        Histogram.ratio      9000         0.32           
 A49        Histogram.ratio      1            0.50
 A49        Histogram.ratio      20           0.24           
 A49        Histogram.ratio      25           0.09           
 A49        Histogram.ratio      55           0.12           
 A49        Histogram.ratio      120          0.06           
 A49        Histogram.ratio      300          0.08

I need to apply the bucketizer after partitioning the DataFrame by the instance column. Each value of instance has its own splits array, defined as follows:

val splits_map =  Map("A37" -> Array(0,30,1000,5000,9000), "A49" -> Array(0,10,30,80,998))

I would use the code below to bucket a single column, but I need help partitioning the DataFrame by the instance column and then applying bucketizer.transform to each partition.

val bucketizer = new Bucketizer().setInputCol("value").setOutputCol("value_range").setSplits(splits)
val df2 = bucketizer.transform(df)

df2.groupBy("value_range").sum("percentage").show()
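
A note on the snippet above: `splits` is not defined there, and `Bucketizer.setSplits` takes an `Array[Double]`, while the arrays in `splits_map` are `Array[Int]`. The sketch below is pure Scala (no Spark needed). It converts the map and illustrates the documented Bucketizer rule that n+1 splits give n buckets, each covering [x, y), with the last bucket also including its upper bound. The names `BucketIndexSketch` and `bucketIndex` are hypothetical helpers introduced for illustration, not part of Spark.

```scala
object BucketIndexSketch {
  // Splits from the question, keyed by instance.
  val splitsMap: Map[String, Array[Int]] =
    Map("A37" -> Array(0, 30, 1000, 5000, 9000), "A49" -> Array(0, 10, 30, 80, 998))

  // Bucketizer.setSplits expects Array[Double], so convert each array.
  val doubleSplits: Map[String, Array[Double]] =
    splitsMap.map { case (instance, splits) => instance -> splits.map(_.toDouble) }

  // Hypothetical helper: returns the 0-based bucket i with splits(i) <= x < splits(i + 1).
  // The last bucket also includes its upper bound; values outside the splits give None.
  def bucketIndex(splits: Array[Double], x: Double): Option[Int] =
    if (x == splits.last) Some(splits.length - 2)
    else {
      val i = splits.lastIndexWhere(_ <= x)
      if (i < 0 || i >= splits.length - 1) None else Some(i)
    }
}
```

For example, `BucketIndexSketch.bucketIndex(BucketIndexSketch.doubleSplits("A37"), 20.0)` returns `Some(0)`, and 9000.0 lands in the last bucket, `Some(3)`.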

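Is it possible to split the DataFrame into multiple DataFrames by the value of the instance column, bucketize the value column in each one, and then use groupBy().sum() to compute the sum of percentage per bucket?
Expected output: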

instance   name                 bucket    percentage
A37        Histogram.ratio      0            0.54                
A37        Histogram.ratio      1            0.17           
A37        Histogram.ratio      3            0.05           
A37        Histogram.ratio      4            0.32           
A49        Histogram.ratio      0            0.50
A49        Histogram.ratio      1            0.33                     
A49        Histogram.ratio      2            0.12           
A49        Histogram.ratio      3            0.14

2cmtqfgy  #1

Another way to bucketize the data within each partition is:

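import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._  // assumes a SparkSession named `spark` is in scope, as in spark-shell

def bucketizeWithinPartition(df: DataFrame, splits: Map[String, Array[Int]], partitionCol: String, featureCol: String): DataFrame = {
  // Order the split points within each partition so lead() and row_number() follow the split order.
  val window = Window.partitionBy(partitionCol).orderBy($"bucket_start")

  // One row per (partition value, split point); each bucket covers [bucket_start, bucket_end).
  // Note: buckets are numbered from 1 here, and the last split point opens a bucket that
  // runs up to Int.MaxValue, so the numbering differs from Bucketizer's 0-based indices.
  val splitsDf = splits.toList.toDF(partitionCol, "splits")
    .withColumn("bucket_start", explode($"splits"))
    .withColumn("bucket_end", coalesce(lead($"bucket_start", 1).over(window), lit(Int.MaxValue)))
    .withColumn("bucket", row_number().over(window))

  // Range join: attach to each data row the bucket whose interval contains the feature value.
  val joinCond = "d.%s = s.%s AND d.%s >= s.bucket_start AND d.%s < s.bucket_end".format(partitionCol, partitionCol, featureCol, featureCol)
  df.as("d")
    .join(splitsDf.as("s"), expr(joinCond), "inner")
    .select($"d.*", $"s.bucket")
}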

val data =
  List(
    ("A37", "Histogram.ratio", 1, 0.20),
    ("A37", "Histogram.ratio", 20, 0.34),
    ("A37", "Histogram.ratio", 9000, 0.32),
    ("A49", "Histogram.ratio", 1, 0.50),
    ("A49", "Histogram.ratio", 20, 0.24)
  ).toDF("instance", "name", "value", "percentage")

val splits_map =  Map("A37" -> Array(0,30,1000,5000,9000), "A49" -> Array(0,10,30,80,998))
val bucketedData = bucketizeWithinPartition(data, splits_map, "instance", "value")
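
For comparison, the approach described in the question (one Bucketizer per instance value, followed by a union and a groupBy) could be sketched roughly as below. This is a minimal sketch under assumptions, not the answer author's method: `PerInstanceBucketizer`, `bucketizePerInstance`, the temporary `_as_double` column, and the local master setting are all introduced here for illustration. Rows whose instance has no entry in the map are dropped, and Bucketizer rejects values that fall outside the given splits.

```scala
import org.apache.spark.ml.feature.Bucketizer
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, sum}

object PerInstanceBucketizer {
  // Hypothetical helper: filter the rows of each key, bucketize them with that key's
  // splits, and union the pieces back together. The output column holds 0-based
  // bucket indices as doubles, as produced by Bucketizer.
  def bucketizePerInstance(df: DataFrame, splitsByKey: Map[String, Array[Double]],
                           partitionCol: String, featureCol: String, outputCol: String): DataFrame = {
    require(splitsByKey.nonEmpty, "at least one splits array is needed")
    // Work on a temporary double copy of the feature so the original column keeps its type.
    val tmpCol = s"${featureCol}_as_double"
    val withDouble = df.withColumn(tmpCol, col(featureCol).cast("double"))
    val parts = splitsByKey.toSeq.map { case (key, splits) =>
      new Bucketizer()
        .setInputCol(tmpCol)
        .setOutputCol(outputCol)
        .setSplits(splits)
        .transform(withDouble.filter(col(partitionCol) === key))
    }
    parts.reduce(_ unionByName _).drop(tmpCol)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session for demonstration purposes.
    val spark = SparkSession.builder().master("local[*]").appName("per-instance-bucketizer").getOrCreate()
    import spark.implicits._

    // Sample rows taken from the question.
    val df = List(
      ("A37", "Histogram.ratio", 1, 0.20), ("A37", "Histogram.ratio", 20, 0.34),
      ("A37", "Histogram.ratio", 50, 0.04), ("A37", "Histogram.ratio", 500, 0.13),
      ("A37", "Histogram.ratio", 2000, 0.05), ("A37", "Histogram.ratio", 9000, 0.32),
      ("A49", "Histogram.ratio", 1, 0.50), ("A49", "Histogram.ratio", 20, 0.24),
      ("A49", "Histogram.ratio", 25, 0.09), ("A49", "Histogram.ratio", 55, 0.12),
      ("A49", "Histogram.ratio", 120, 0.06), ("A49", "Histogram.ratio", 300, 0.08)
    ).toDF("instance", "name", "value", "percentage")

    // The question's splits, written as doubles because setSplits takes Array[Double].
    val splitsMap = Map(
      "A37" -> Array(0.0, 30.0, 1000.0, 5000.0, 9000.0),
      "A49" -> Array(0.0, 10.0, 30.0, 80.0, 998.0))

    // Sum the percentages per instance and bucket, as the question asks.
    bucketizePerInstance(df, splitsMap, "instance", "value", "bucket")
      .groupBy("instance", "name", "bucket")
      .agg(sum("percentage").as("percentage"))
      .orderBy("instance", "bucket")
      .show()

    spark.stop()
  }
}
```

The same groupBy and sum can be applied to `bucketedData` from the join-based function above. The join-based version avoids one filter pass per instance, which may matter when there are many distinct instance values.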
