窗口上的pyspark标准缩放器

ars1skjm  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(444)

我想用标准定标器 pyspark.ml.feature.StandardScaler 在我的数据窗口。

df4=spark.createDataFrame(
    [
        (1,1, 'X', 'a'),
        (2,1, 'X', 'a'),
        (3,9, 'X', 'b'),
        (5,1, 'X', 'b'),
        (6,2, 'X', 'c'),
        (7,2, 'X', 'c'),
        (8,10, 'Y', 'a'),
        (9,45, 'Y', 'a'),
        (10,3, 'Y', 'a'),
        (11,3, 'Y', 'b'),
        (12,6, 'Y', 'b'),
        (13,19,'Y', 'b')
    ],
    ['id','feature', 'txt', 'cat'] 
)

w = Window().partitionBy(..)

我可以通过调用 .fit & .transform 方法。但不是在电视上 w 我们通常使用的变量 F.col('feature') - F.mean('feature').over(w) .
我可以将所有窗口化/分组的数据转换为单独的列,将其放入Dataframe中,然后在其上应用standardscaler并转换回 1D . 还有别的方法吗?最终的目标是尝试不同的定标器,包括 pyspark.ml.feature.RobustScaler .

z2acfund

z2acfund1#

我最终不得不编写自己的scaler类。使用 pyspark StandardScaler  在上面的问题是不合适的,因为我们都知道它是更有效的端到端系列转换。尽管如此,我还是想出了自己的定标器。它并不真的有用 Window  但是我使用 groupby .

class StandardScaler:

    tol = 0.000001

    def __init__(self, colsTotransform, groupbyCol='txt', orderBycol='id'):
        self.colsTotransform = colsTotransform
        self.groupbyCol=groupbyCol
        self.orderBycol=orderBycol

    def __tempNames__(self):
        return [(f"{colname}_transformed",colname) for colname in self.colsTotransform]

    def fit(self, df):
        funcs = [(F.mean(name), F.stddev(name)) for name in self.colsTotransform]
        exprs = [ff for tup in funcs for ff in tup]
        self.stats = df.groupBy([self.groupbyCol]).agg(*exprs)

    def __transformOne__(self, df_with_stats, newName, colName):
        return df_with_stats\
                .withColumn(newName, 
                            (F.col(colName)-F.col(f'avg({colName})'))/(F.col(f'stddev_samp({colName})')+self.tol))\
                .drop(colName)\
                .withColumnRenamed(newName, colName)

    def transform(self, df):
        df_with_stats = df.join(self.stats, on=self.groupbyCol, how='inner').orderBy(self.orderBycol)
        return reduce(lambda df_with_stats, kv: self.__transformOne__(df_with_stats, *kv), 
                       self.__tempNames__(), df_with_stats)[df.columns]

用法:

ss = StandardScaler(colsTotransform=['feature'],groupbyCol='txt',orderbyCol='id')
ss.fit(df4)
ss.stats.show()

+---+------------------+--------------------+
|txt|      avg(feature)|stddev_samp(feature)|
+---+------------------+--------------------+
|  Y|14.333333333333334|  16.169930941926335|
|  X|2.6666666666666665|  3.1411250638372654|
+---+------------------+--------------------+

df4.show()

+---+-------+---+---+
| id|feature|txt|cat|
+---+-------+---+---+
|  1|      1|  X|  a|
|  2|      1|  X|  a|
|  3|      9|  X|  b|
|  5|      1|  X|  b|
|  6|      2|  X|  c|
|  7|      2|  X|  c|
|  8|     10|  Y|  a|
|  9|     45|  Y|  a|
| 10|      3|  Y|  a|
| 11|      3|  Y|  b|
| 12|      6|  Y|  b|
| 13|     19|  Y|  b|
+---+-------+---+---+

ss.transform(df4).show()
+---+--------------------+---+---+
| id|             feature|txt|cat|
+---+--------------------+---+---+
|  1|  -0.530595281053646|  X|  a|
|  2|  -0.530595281053646|  X|  a|
|  3|  2.0162620680038548|  X|  b|
|  5|  -0.530595281053646|  X|  b|
|  6|-0.21223811242145835|  X|  c|
|  7|-0.21223811242145835|  X|  c|
|  8| -0.2679871102053074|  Y|  a|
|  9|  1.8965241645298676|  Y|  a|
| 10| -0.7008893651523425|  Y|  a|
| 11| -0.7008893651523425|  Y|  b|
| 12| -0.5153598273178989|  Y|  b|
| 13|  0.2886015032980233|  Y|  b|
+---+--------------------+---+---+

相关问题