在spark/python中正向填充缺少的值

pprl5pva  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(391)

我试图用以前的非空值(如果存在)来填充sparkDataframe中缺少的值。我在python/pandas中做过这种事情,但是我的数据对于pandas来说太大了(在一个小集群上),我是spark noob。这是spark能做的吗?它可以对多个列执行吗?如果是,怎么做?如果没有,对who hadoop工具套件中的替代方法有什么建议吗?
谢谢!

8cdiaqws

8cdiaqws1#

我发现了一个解决方案,它不需要额外的编码,只需要在这里使用一个窗口。所以杰夫是对的,有个解决办法。完整的代码,我将简要地解释它做什么,更多的细节只看博客。

from pyspark.sql import Window
from pyspark.sql.functions import last
import sys

# define the window

window = Window.orderBy('time')\
               .rowsBetween(-sys.maxsize, 0)

# define the forward-filled column

filled_column_temperature = last(df6['temperature'], ignorenulls=True).over(window)

# do the fill

spark_df_filled = df6.withColumn('temperature_filled',  filled_column_temperature)

因此,我们的想法是通过始终包含实际行和所有前一行的数据定义一个滑动窗口(这里更多地介绍滑动窗口):

window = Window.orderBy('time')\
           .rowsBetween(-sys.maxsize, 0)

请注意,我们按时间排序,因此数据的顺序是正确的。另外请注意,使用“-sys.maxsize”可以确保窗口始终包含所有以前的数据,并且在自上而下遍历数据时不断增长,但是可能有更有效的解决方案。
使用“last”函数,我们总是处理窗口中的最后一行。通过传递“ignorenulls=true”,我们定义如果当前行为null,那么函数将返回窗口中最近(最后)的非null值。否则将使用实际行的值。
完成。

相关问题