在每一组中寻找星火标度的百分位数

ogsagwnx  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(505)

我正在尝试使用下面的窗口函数对列进行百分位数计算。我参考了这里关于按组使用 ApproxQuantile(近似分位数)的定义。

// Sample input: (ID, Count) rows spread over four groups.
val df1 = Seq(
  1 -> 10.0, 1 -> 20.0, 1 -> 40.6, 1 -> 15.6, 1 -> 17.6, 1 -> 25.6,
  1 -> 39.6, 2 -> 20.5, 2 -> 70.3, 2 -> 69.4, 2 -> 74.4, 2 -> 45.4,
  3 -> 60.6, 3 -> 80.6, 4 -> 30.6, 4 -> 90.6
).toDF("ID", "Count")

// Per-ID bucket count that should drive the percentile computation.
val idBucketMapping = Seq(1 -> 4, 2 -> 3, 3 -> 2, 4 -> 2)
  .toDF("ID", "Bucket")

//jpp
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, typedLit, when}

/**
 * Exposes Spark's internal `ApproximatePercentile` aggregate as a plain
 * `Column` function, since `percentile_approx` is not available in the
 * public `functions` API of this Spark version.
 */
object PercentileApprox {

  /**
   * Approximate percentile(s) of `col` at the given `percentage`(s),
   * with an explicit `accuracy` trade-off.
   */
  def percentile_approx(col: Column, percentage: Column,
                        accuracy: Column): Column = {
    val agg = new ApproximatePercentile(col.expr, percentage.expr, accuracy.expr)
    new Column(agg.toAggregateExpression)
  }

  /** Same as above, using Spark's default accuracy constant. */
  def percentile_approx(col: Column, percentage: Column): Column = {
    val defaultAccuracy = lit(ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY)
    percentile_approx(col, percentage, defaultAccuracy)
  }
}
import PercentileApprox._

/**
 * Percentile boundaries for `bucketSize` buckets: 0, 1/n, 2/n, ..., (n-1)/n.
 * NOTE(fix): this `def` must be declared BEFORE `res` — in script/REPL
 * evaluation order the original placed it after its use at the
 * `percentile_approx` call, which fails to resolve.
 */
def doBucketing(bucketSize: Int): Seq[Double] =
  (1 until bucketSize).scanLeft(0d)((acc, _) => acc + 1 / bucketSize.toDouble)

// Approximate percentiles per ID group, with a hard-coded bucket count of 2.
// `val` instead of `var`: the result is never reassigned.
val res = df1
  .withColumn(
    "percentile",
    percentile_approx(col("count"), typedLit(doBucketing(2)))
      .over(Window.partitionBy("ID"))
  )
scala> df1.show
+---+-----+
| ID|Count|
+---+-----+
|  1| 10.0|
|  1| 20.0|
|  1| 40.6|
|  1| 15.6|
|  1| 17.6|
|  1| 25.6|
|  1| 39.6|
|  2| 20.5|
|  2| 70.3|
|  2| 69.4|
|  2| 74.4|
|  2| 45.4|
|  3| 60.6|
|  3| 80.6|
|  4| 30.6|
|  4| 90.6|
+---+-----+

scala> idBucketMapping.show
+---+------+
| ID|Bucket|
+---+------+
|  1|     4|
|  2|     3|
|  3|     2|
|  4|     2|
+---+------+

scala> res.show
+---+-----+------------------+
| ID|Count|        percentile|
+---+-----+------------------+
|  1| 10.0|[10.0, 20.0, 40.6]|
|  1| 20.0|[10.0, 20.0, 40.6]|
|  1| 40.6|[10.0, 20.0, 40.6]|
|  1| 15.6|[10.0, 20.0, 40.6]|
|  1| 17.6|[10.0, 20.0, 40.6]|
|  1| 25.6|[10.0, 20.0, 40.6]|
|  1| 39.6|[10.0, 20.0, 40.6]|
|  3| 60.6|[60.6, 60.6, 80.6]|
|  3| 80.6|[60.6, 60.6, 80.6]|
|  4| 30.6|[30.6, 30.6, 90.6]|
|  4| 90.6|[30.6, 30.6, 90.6]|
|  2| 20.5|[20.5, 69.4, 74.4]|
|  2| 70.3|[20.5, 69.4, 74.4]|
|  2| 69.4|[20.5, 69.4, 74.4]|
|  2| 74.4|[20.5, 69.4, 74.4]|
|  2| 45.4|[20.5, 69.4, 74.4]|
+---+-----+------------------+

到目前为止,一切都很好,逻辑也很简单。但我需要一个动态的结果:也就是说,传给 doBucketing 的参数应根据 idBucketMapping 中每个 ID 对应的 Bucket 值动态确定。
这对我来说似乎有点棘手。这有可能实现吗?
预期输出——这意味着百分位数桶基于- idBucketMapping Dataframe。

+---+-----+------------------------+
|ID |Count|percentile              |
+---+-----+------------------------+
|1  |10.0 |[10.0, 15.6, 20.0, 39.6]|
|1  |20.0 |[10.0, 15.6, 20.0, 39.6]|
|1  |40.6 |[10.0, 15.6, 20.0, 39.6]|
|1  |15.6 |[10.0, 15.6, 20.0, 39.6]|
|1  |17.6 |[10.0, 15.6, 20.0, 39.6]|
|1  |25.6 |[10.0, 15.6, 20.0, 39.6]|
|1  |39.6 |[10.0, 15.6, 20.0, 39.6]|
|3  |60.6 |[60.6, 60.6]            |
|3  |80.6 |[60.6, 60.6]            |
|4  |30.6 |[30.6, 30.6]            |
|4  |90.6 |[30.6, 30.6]            |
|2  |20.5 |[20.5, 45.4, 70.3]      |
|2  |70.3 |[20.5, 45.4, 70.3]      |
|2  |69.4 |[20.5, 45.4, 70.3]      |
|2  |74.4 |[20.5, 45.4, 70.3]      |
|2  |45.4 |[20.5, 45.4, 70.3]      |
+---+-----+------------------------+
acruukt9

acruukt91#

percentile_approx 接受 percentage(百分比)和 accuracy(精度)两个参数,但它们似乎都必须是常量。因此我们无法在运行时为 percentile_approx 动态计算 percentage 和 accuracy。
ref-apache spark git百分比\u近似源

mfpqipee

mfpqipee2#

我有一个解决方案,虽然非常不优雅,但只有在可能的桶数有限时才适用。
我的第一个版本很难看。

// For the sake of clarity, a helper that generates the windowed
// percentile aggregation for a given bucket count.
def per(x: Int) = percentile_approx(col("count"), typedLit(doBucketing(x)))
  .over(Window.partitionBy("ID"))

// Match the Bucket column against each possible value.
// NOTE(fix): the original had mismatched parentheses — `.otherwise` was
// chained inside the second argument of `when` (onto `per(2)`) instead of
// onto the `when(...)` call itself. Each `otherwise` must close the
// preceding `when` to build the intended fallback cascade.
val res = df1
  .join(idBucketMapping, Seq("ID"))
  .withColumn(
    "percentile",
    when('Bucket === 2, per(2))
      .otherwise(when('Bucket === 3, per(3))
        .otherwise(per(4)))
  )

这很恶心,但对你来说很管用。稍微不那么难看,但逻辑非常相同,您可以定义一组可能的桶数,并使用它来执行与上面相同的操作。

// The finite set of bucket counts we are willing to support.
val possible_number_of_buckets = 2 to 5

// Fold the candidate sizes into a cascade of when/otherwise clauses:
// the first size serves as the innermost fallback, and every further
// size wraps the accumulated expression in one more `when`.
val res = df1
  .join(idBucketMapping, Seq("ID"))
  .withColumn(
    "percentile",
    possible_number_of_buckets.tail
      .foldLeft(per(possible_number_of_buckets.head)) { (fallback, bucketCount) =>
        when('Bucket === bucketCount, per(bucketCount)).otherwise(fallback)
      }
  )

相关问题