spark-winsorize组内的Dataframe列

fiei3ece  于 2021-05-16  发布在  Spark
关注(0)|答案(1)|浏览(579)

我正在为机器学习输入预处理数据,一个目标值列,称之为“价格”有许多离群值,我不想在整个集合中赢得价格,而是想在标有“产品类别”的组中赢得价格。还有其他功能,产品类别只是一个价格相关的标签。
有一个scala stat函数非常有效:

df_data.stat.approxQuantile("price", Array(0.01, 0.99), 0.00001)
// res19: Array[Double] = Array(3.13, 318.54)

不幸的是,它不支持计算组内的分位数。is也不支持窗口分区。

df_data
    .groupBy("product_category")
    .approxQuantile($"price", Array(0.01, 0.99), 0.00001)

// error: value approxQuantile is not a member of
//   org.apache.spark.sql.RelationalGroupedDataset

为了替换超出该范围的值(例如winsorizing),计算sparkDataframe组中p01和p99的最佳方法是什么?
我的数据集模式可以想象成这样,它有超过20mm的行,有大约10k个不同的“产品类别”标签,所以性能也是一个问题。

df_data and a winsorized price column:
+---------+------------------+--------+---------+
|   item  | product_category |  price | pr_winz |
+---------+------------------+--------+---------+
| I000001 |     XX11         |   1.99 |   5.00  |
| I000002 |     XX11         |  59.99 |  59.99  |
| I000003 |     XX11         |1359.00 | 850.00  |
+---------+------------------+--------+---------+
supposing p01 = 5.00, p99 = 850.00 for this product_category
6jygbczu

6jygbczu1#

这是我在与文档斗争之后想到的(有两个函数 approx_percentile 以及 percentile_approx 显然是做同样的事情)。
除了作为一个sparksql表达式之外,我不知道如何实现它,不知道为什么分组只能在那里工作。我怀疑是因为它是Hive的一部分?
sparkDataframewinsorizor
在10至100mm范围内的df上进行测试

// Winsorize function, groupable by columns list
// low/hi element of [0,1]
// precision: integer in [1, 1E7-ish], in practice use 100 or 1000 for large data, smaller is faster/less accurate
// group_col: comma-separated list of column names
import org.apache.spark.sql._

def grouped_winzo(df: DataFrame, winz_col: String, group_col: String, low: Double, hi: Double, precision: Integer): DataFrame = {
    df.createOrReplaceTempView("df_table")

    spark.sql(s"""
    select distinct 
    *
    , percentile_approx($winz_col, $low, $precision) over(partition by $group_col) p_low
    , percentile_approx($winz_col, $hi, $precision) over(partition by $group_col) p_hi

    from df_table
    """)
    .withColumn(winz_col + "_winz", expr(s"""
        case when $winz_col <= p_low then p_low
             when $winz_col >= p_hi then p_hi
             else $winz_col end"""))
    .drop(winz_col, "p_low", "p_hi")

}

// winsorize the price column of a dataframe at the p01 and p99 
// percentiles, grouped by 'product_category' column.

val df_winsorized = grouped_winzo(
   df_data
   , "price"
   , "product_category"
   , 0.01, 0.99, 1000)

相关问题