使用Pyspark模式按版本分组填充NA值颜色

mklgxw1f  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(90)

有没有一种方法可以做到这一点,而不使用连接,以更好地执行?
我想通过模式版本填充颜色NA值,我这样做了...但是首先找到mode,然后用join创建一个新的列,代码如下:
Obs:我使用的是spark 3.2版,所以我不能使用mode功能,也不能更新到最新的pyspark版本
我是这样做的:

window_spec = Window.partitionBy(F.col('version')).orderBy(F.col('count_granularity').desc())

window_granularity= Window.partitionBy(F.col('version'),F.col('color'))

mode = df.orderBy('version'.withColumn('count_granularity', F.sum(F.lit(1)).over(window_granularity)) .withColumn('rank',F.row_number().over(window_spec)) .filter(F.col('rank') == 1) .withColumnRenamed('color', 'mode').select('version','mode')

df=df.join(mode, on='version', how='left') .withColumn('mode_color', F.when(df.color.isNull(), mode.mode).otherwise(df.color)).drop('color', 'mode') .withColumnRenamed('mode_color', 'color')
g0czyy6m

g0czyy6m1#

你可以删除连接,然后用窗口做所有的事情。
这里有一个例子

import pyspark.sql.functions as func
from pyspark.sql.window import Window as wd

# given the input data
# +-------+------+
# |version|colour|
# +-------+------+
# |      1|   red|
# |      1|   red|
# |      1|  null|
# |      1|yellow|
# +-------+------+

data_sdf. \
    withColumn('colour_cnt', func.count('*').over(wd.partitionBy('version', 'colour'))). \
    withColumn('max_cnt', func.max(func.struct('colour_cnt', 'colour')).over(wd.partitionBy('version'))). \
    withColumn('colour_filled', func.coalesce('colour', func.col('max_cnt.colour'))). \
    show(truncate=False)

# +-------+------+----------+--------+-------------+
# |version|colour|colour_cnt|max_cnt |colour_filled|
# +-------+------+----------+--------+-------------+
# |1      |null  |1         |{2, red}|red          |
# |1      |red   |2         |{2, red}|red          |
# |1      |red   |2         |{2, red}|red          |
# |1      |yellow|1         |{2, red}|yellow       |
# +-------+------+----------+--------+-------------+

相关问题