Backward and forward fill of null values in a Spark DataFrame using PySpark

k5ifujac · asked 12 months ago · Apache

I have a Spark DataFrame in which I need to create a window-partitioned column ("desired_outcome"). Within each partition, null values must be forward-filled with the last non-null value in the sort order; rows that come before the first non-null value (and therefore have nothing above to fill from) must be back-filled with that first non-null value.
Here is a sample DataFrame together with the desired output:

columns = ['user_id', 'date', 'desired_outcome']
data = [
        ('1', None,         '2022-01-05'),
        ('1', None,         '2022-01-05'),
        ('1', '2022-01-05', '2022-01-05'),
        ('1', None,         '2022-01-05'),
        ('1', None,         '2022-01-05'),
        ('2', None,         '2022-01-07'),
        ('2', None,         '2022-01-07'),
        ('2', '2022-01-07', '2022-01-07'),
        ('2', None,         '2022-01-07'),
        ('2', '2022-01-09', '2022-01-09'),
        ('2', None,         '2022-01-09'),
        ('3', '2022-01-01', '2022-01-01'),
        ('3', None,         '2022-01-01'),
        ('3', None,         '2022-01-01'),
        ('3', '2022-01-04', '2022-01-04'),
        ('3', None,         '2022-01-04'),
        ('3', None,         '2022-01-04')]

sample_df = spark.createDataFrame(data, columns)

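For context, the behaviour I am after is what pandas would produce with a per-group forward fill followed by a backward fill. A rough pandas sketch of the intent (illustration only, not something I can run at the scale of the real data):

import pandas as pd

pdf = pd.DataFrame(data, columns=columns)
# Within each user_id group: forward-fill nulls from the previous non-null
# value, then back-fill whatever is still null at the start of the group.
# The result should match the 'desired_outcome' column above.
pdf['computed'] = (
    pdf.groupby('user_id')['date']
       .transform(lambda s: s.ffill().bfill())
)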

[Update]

Based on the solution suggested by @user238607 (see the first answer below), I should add this update to make clear that the desired output does not depend on the chronological (or any other) ordering of the values in the date column. It depends only on the original order of the records within each group of rows sharing the same user_id (in the general case the value column could be of any type: numeric, string, etc.). I ran the suggested code against a different input, and some of the calculated values are not correct; the incorrectly calculated values are marked with three dashes in the correct column:

from pyspark.sql import Window
from pyspark import SQLContext
from pyspark.sql.functions import *
import pyspark.sql.functions as F

sc = spark #SparkContext('local')
sqlContext = SQLContext(sc)

data1 = [
        ('1', None,         '2022-02-12'),
        ('1', None,         '2022-02-12'),
        ('1', '2022-02-12', '2022-02-12'),
        ('1', None,         '2022-02-12'),
        ('1', None,         '2022-02-12'),
        ('2', None,         '2022-04-09'),
        ('2', None,         '2022-04-09'),
        ('2','2022-04-09',  '2022-04-09'),
        ('2',None,          '2022-04-09'),
        ('2','2022-01-07',  '2022-01-07'),
        ('2',None,          '2022-01-07'),
        ('3','2022-11-05',  '2022-11-05'),
        ('3',None,          '2022-11-05'),
        ('3',None,          '2022-11-05'),
        ('3','2022-01-04',  '2022-01-04'),
        ('3',None,          '2022-01-04'),
        ('3',None,          '2022-01-04'),
        ('3','2022-04-15',  '2022-04-15'),
        ('3',None,          '2022-04-15'),
        
        ]

columns = ['row_id', 'user_id', 'date', 'desired_outcome_given']

## Add an explicit row_id so that the original order of the rows is preserved.
data2 = [ (index, item[0], item[1], item[2]) for index, item in enumerate(data1) ]

df1 = sqlContext.createDataFrame(data=data2, schema=columns)

print("Given dataframe")
df1.show(n=10, truncate=False)

window_min_spec = Window.partitionBy("user_id").orderBy(F.col("row_id").asc()).rowsBetween(0, Window.unboundedFollowing)
window_max_spec = Window.partitionBy("user_id").orderBy(F.col("row_id").asc()).rowsBetween(Window.unboundedPreceding, 0)

df1 = df1.withColumn("first_date", F.first("date", ignorenulls=True).over(window_min_spec))
df1 = df1.withColumn("last_date", F.last("date", ignorenulls=True).over(window_max_spec))
print("Calculated first and last dates")
df1.show(truncate=False)
print("Final dataframe")
#output = df1.select('row_id', 'user_id', 'date').withColumn("desired_outcome_calculated", F.least(*["min_date", "max_date"])).select("desired_outcome_given", "desired_outcome_calculated")
output = df1.\
            withColumn("desired_outcome_calculated", F.least(*["first_date", "last_date"]))\
            .withColumn("correct", F.when(F.col("desired_outcome_given") == F.col("desired_outcome_calculated"),F.lit('true')).otherwise('---'))\
            .select('row_id', 'user_id', 'date', "desired_outcome_given", "desired_outcome_calculated", "correct")
output.show(truncate=False)


Output:

Given dataframe
+------+-------+----------+---------------------+
|row_id|user_id|date      |desired_outcome_given|
+------+-------+----------+---------------------+
|0     |1      |NULL      |2022-02-12           |
|1     |1      |NULL      |2022-02-12           |
|2     |1      |2022-02-12|2022-02-12           |
|3     |1      |NULL      |2022-02-12           |
|4     |1      |NULL      |2022-02-12           |
|5     |2      |NULL      |2022-04-09           |
|6     |2      |NULL      |2022-04-09           |
|7     |2      |2022-04-09|2022-04-09           |
|8     |2      |NULL      |2022-04-09           |
|9     |2      |2022-01-07|2022-01-07           |
+------+-------+----------+---------------------+
only showing top 10 rows

Calculated first and last dates
+------+-------+----------+---------------------+----------+----------+
|row_id|user_id|date      |desired_outcome_given|first_date|last_date |
+------+-------+----------+---------------------+----------+----------+
|0     |1      |NULL      |2022-02-12           |2022-02-12|NULL      |
|1     |1      |NULL      |2022-02-12           |2022-02-12|NULL      |
|2     |1      |2022-02-12|2022-02-12           |2022-02-12|2022-02-12|
|3     |1      |NULL      |2022-02-12           |NULL      |2022-02-12|
|4     |1      |NULL      |2022-02-12           |NULL      |2022-02-12|
|5     |2      |NULL      |2022-04-09           |2022-04-09|NULL      |
|6     |2      |NULL      |2022-04-09           |2022-04-09|NULL      |
|7     |2      |2022-04-09|2022-04-09           |2022-04-09|2022-04-09|
|8     |2      |NULL      |2022-04-09           |2022-01-07|2022-04-09|
|9     |2      |2022-01-07|2022-01-07           |2022-01-07|2022-01-07|
|10    |2      |NULL      |2022-01-07           |NULL      |2022-01-07|
|11    |3      |2022-11-05|2022-11-05           |2022-11-05|2022-11-05|
|12    |3      |NULL      |2022-11-05           |2022-01-04|2022-11-05|
|13    |3      |NULL      |2022-11-05           |2022-01-04|2022-11-05|
|14    |3      |2022-01-04|2022-01-04           |2022-01-04|2022-01-04|
|15    |3      |NULL      |2022-01-04           |2022-04-15|2022-01-04|
|16    |3      |NULL      |2022-01-04           |2022-04-15|2022-01-04|
|17    |3      |2022-04-15|2022-04-15           |2022-04-15|2022-04-15|
|18    |3      |NULL      |2022-04-15           |NULL      |2022-04-15|
+------+-------+----------+---------------------+----------+----------+

Final dataframe
+------+-------+----------+---------------------+--------------------------+-------+
|row_id|user_id|date      |desired_outcome_given|desired_outcome_calculated|correct|
+------+-------+----------+---------------------+--------------------------+-------+
|0     |1      |NULL      |2022-02-12           |2022-02-12                |true   |
|1     |1      |NULL      |2022-02-12           |2022-02-12                |true   |
|2     |1      |2022-02-12|2022-02-12           |2022-02-12                |true   |
|3     |1      |NULL      |2022-02-12           |2022-02-12                |true   |
|4     |1      |NULL      |2022-02-12           |2022-02-12                |true   |
|5     |2      |NULL      |2022-04-09           |2022-04-09                |true   |
|6     |2      |NULL      |2022-04-09           |2022-04-09                |true   |
|7     |2      |2022-04-09|2022-04-09           |2022-04-09                |true   |
|8     |2      |NULL      |2022-04-09           |2022-01-07                |---    |
|9     |2      |2022-01-07|2022-01-07           |2022-01-07                |true   |
|10    |2      |NULL      |2022-01-07           |2022-01-07                |true   |
|11    |3      |2022-11-05|2022-11-05           |2022-11-05                |true   |
|12    |3      |NULL      |2022-11-05           |2022-01-04                |---    |
|13    |3      |NULL      |2022-11-05           |2022-01-04                |---    |
|14    |3      |2022-01-04|2022-01-04           |2022-01-04                |true   |
|15    |3      |NULL      |2022-01-04           |2022-01-04                |true   |
|16    |3      |NULL      |2022-01-04           |2022-01-04                |true   |
|17    |3      |2022-04-15|2022-04-15           |2022-04-15                |true   |
|18    |3      |NULL      |2022-04-15           |2022-04-15                |true   |
+------+-------+----------+---------------------+--------------------------+-------+
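Looking at the mismatched rows helps explain what goes wrong. For row_id 8 (user 2), last_date (the most recent non-null value above the row) is 2022-04-09 and first_date (the next non-null value below the row) is 2022-01-07; least() picks 2022-01-07 because it compares the two candidates by value, whereas the desired result is the forward-filled 2022-04-09. What I actually need is to prefer the value from above and fall back to the value from below only when there is none; a sketch using the first_date/last_date columns computed above:

output = df1.withColumn(
    "desired_outcome_calculated",
    # prefer the forward-filled value from above; only fall back to the value below
    F.coalesce(F.col("last_date"), F.col("first_date"))
)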

piv4azn71#

You can do it like this: for every row, compute the first non-null date from the current row to the end of the user_id partition and the last non-null date from the start of the partition up to the current row, then take the least of the two.

from pyspark.sql import Window
from pyspark import SparkContext, SQLContext
from pyspark.sql.functions import *
import pyspark.sql.functions as F

sc = SparkContext('local')
sqlContext = SQLContext(sc)

data1 = [
        ('1', None,         '2022-01-05'),
        ('1', None,         '2022-01-05'),
        ('1', '2022-01-05', '2022-01-05'),
        ('1', None,         '2022-01-05'),
        ('1', None,         '2022-01-05'),
        ('2', None,         '2022-01-07'),
        ('2', None,         '2022-01-07'),
        ('2','2022-01-07',  '2022-01-07'),
        ('2',None,          '2022-01-07'),
        ('2','2022-01-09',  '2022-01-09'),
        ('2',None,          '2022-01-09'),
        ('3','2022-01-01',  '2022-01-01'),
        ('3',None,          '2022-01-01'),
        ('3',None,          '2022-01-01'),
        ('3','2022-01-04',  '2022-01-04'),
        ('3',None,          '2022-01-04'),
        ('3',None,          '2022-01-04')]

columns = ['row_id', 'user_id', 'date', 'desired_outcome_given']

## Add an explicit row_id so that the original order of the rows is preserved.
data2 = [ (index, item[0], item[1], item[2]) for index, item in enumerate(data1) ]

df1 = sqlContext.createDataFrame(data=data2, schema=columns)

print("Given dataframe")
df1.show(n=10, truncate=False)

# Frame from the current row to the end of the partition: F.first(..., ignorenulls=True)
# over it returns the next non-null date at or below the current row.
window_min_spec = Window.partitionBy("user_id").orderBy(F.col("row_id").asc()).rowsBetween(0, Window.unboundedFollowing)
# Frame from the start of the partition to the current row: F.last(..., ignorenulls=True)
# over it returns the most recent non-null date at or above the current row.
window_max_spec = Window.partitionBy("user_id").orderBy(F.col("row_id").asc()).rowsBetween(Window.unboundedPreceding, 0)

df1 = df1.withColumn("min_date", F.first("date", ignorenulls=True).over(window_min_spec))
df1 = df1.withColumn("max_date", F.last("date", ignorenulls=True).over(window_max_spec))
print("Calculated min and max")
df1.show(truncate=False)
print("Final dataframe")
output = df1.withColumn("desired_outcome_calculated", F.least(*["min_date", "max_date"])).select("desired_outcome_given", "desired_outcome_calculated")
output.show(truncate=False)

Output:

Given dataframe
+------+-------+----------+---------------------+
|row_id|user_id|date      |desired_outcome_given|
+------+-------+----------+---------------------+
|0     |1      |null      |2022-01-05           |
|1     |1      |null      |2022-01-05           |
|2     |1      |2022-01-05|2022-01-05           |
|3     |1      |null      |2022-01-05           |
|4     |1      |null      |2022-01-05           |
|5     |2      |null      |2022-01-07           |
|6     |2      |null      |2022-01-07           |
|7     |2      |2022-01-07|2022-01-07           |
|8     |2      |null      |2022-01-07           |
|9     |2      |2022-01-09|2022-01-09           |
+------+-------+----------+---------------------+
only showing top 10 rows

Calculated min and max
+------+-------+----------+---------------------+----------+----------+
|row_id|user_id|date      |desired_outcome_given|min_date  |max_date  |
+------+-------+----------+---------------------+----------+----------+
|0     |1      |null      |2022-01-05           |2022-01-05|null      |
|1     |1      |null      |2022-01-05           |2022-01-05|null      |
|2     |1      |2022-01-05|2022-01-05           |2022-01-05|2022-01-05|
|3     |1      |null      |2022-01-05           |null      |2022-01-05|
|4     |1      |null      |2022-01-05           |null      |2022-01-05|
|5     |2      |null      |2022-01-07           |2022-01-07|null      |
|6     |2      |null      |2022-01-07           |2022-01-07|null      |
|7     |2      |2022-01-07|2022-01-07           |2022-01-07|2022-01-07|
|8     |2      |null      |2022-01-07           |2022-01-09|2022-01-07|
|9     |2      |2022-01-09|2022-01-09           |2022-01-09|2022-01-09|
|10    |2      |null      |2022-01-09           |null      |2022-01-09|
|11    |3      |2022-01-01|2022-01-01           |2022-01-01|2022-01-01|
|12    |3      |null      |2022-01-01           |2022-01-04|2022-01-01|
|13    |3      |null      |2022-01-01           |2022-01-04|2022-01-01|
|14    |3      |2022-01-04|2022-01-04           |2022-01-04|2022-01-04|
|15    |3      |null      |2022-01-04           |null      |2022-01-04|
|16    |3      |null      |2022-01-04           |null      |2022-01-04|
+------+-------+----------+---------------------+----------+----------+

Final dataframe
+---------------------+--------------------------+
|desired_outcome_given|desired_outcome_calculated|
+---------------------+--------------------------+
|2022-01-05           |2022-01-05                |
|2022-01-05           |2022-01-05                |
|2022-01-05           |2022-01-05                |
|2022-01-05           |2022-01-05                |
|2022-01-05           |2022-01-05                |
|2022-01-07           |2022-01-07                |
|2022-01-07           |2022-01-07                |
|2022-01-07           |2022-01-07                |
|2022-01-07           |2022-01-07                |
|2022-01-09           |2022-01-09                |
|2022-01-09           |2022-01-09                |
|2022-01-01           |2022-01-01                |
|2022-01-01           |2022-01-01                |
|2022-01-01           |2022-01-01                |
|2022-01-04           |2022-01-04                |
|2022-01-04           |2022-01-04                |
|2022-01-04           |2022-01-04                |
+---------------------+--------------------------+


Edit: given the new update, here is how I would do it: forward-fill each row from the non-null values above it, and only fall back to the first non-null value below it when nothing above exists.

columns = ['row_id', 'user_id', 'date', 'desired_outcome_given']

## Add an explicit row_id so that the original order of the rows is preserved.
data2 = [ (index, item[0], item[1], item[2]) for index, item in enumerate(data1) ]

df1 = sqlContext.createDataFrame(data=data2, schema=columns)

print("Given dataframe")
df1.show(n=10, truncate=False)

window_fillup_notnull_spec = Window.partitionBy("user_id").orderBy(F.col("row_id").asc()).rowsBetween(Window.unboundedPreceding, 0 )
window_fillbelow_notnull_spec = Window.partitionBy("user_id").orderBy(F.col("row_id").asc()).rowsBetween( 0, Window.unboundedFollowing)

df1 = df1.withColumn("up_values", F.last("date", ignorenulls=True).over(window_fillup_notnull_spec))
df1 = df1.withColumn("below_values", F.first("date", ignorenulls=True).over(window_fillbelow_notnull_spec))
print("Calculated min and max")
df1.show(truncate=False)
print("Final dataframe")
output = df1.withColumn("desired_outcome_calculated", F.when(  F.col("up_values").isNotNull(), F.col("up_values"))
                                                             .otherwise(F.col("below_values")) ).select("desired_outcome_given", "desired_outcome_calculated")
output.show(truncate=False)


The output is as follows:

Final dataframe
+---------------------+--------------------------+
|desired_outcome_given|desired_outcome_calculated|
+---------------------+--------------------------+
|2022-02-12           |2022-02-12                |
|2022-02-12           |2022-02-12                |
|2022-02-12           |2022-02-12                |
|2022-02-12           |2022-02-12                |
|2022-02-12           |2022-02-12                |
|2022-04-09           |2022-04-09                |
|2022-04-09           |2022-04-09                |
|2022-04-09           |2022-04-09                |
|2022-04-09           |2022-04-09                |
|2022-01-07           |2022-01-07                |
|2022-01-07           |2022-01-07                |
|2022-11-05           |2022-11-05                |
|2022-11-05           |2022-11-05                |
|2022-11-05           |2022-11-05                |
|2022-01-04           |2022-01-04                |
|2022-01-04           |2022-01-04                |
|2022-01-04           |2022-01-04                |
|2022-04-15           |2022-04-15                |
|2022-04-15           |2022-04-15                |
+---------------------+--------------------------+

qxsslcnc2#

Based on the very helpful answer from @user238607 (see above), I did some homework, and here is the generic forward/backward fill utility method I was looking for:

from pyspark.sql import SparkSession
from pyspark.sql import Window
import pyspark.sql.functions as F
sc = SparkSession\
    .builder\
    .master("local[*]")\
    .appName('fb_fill_method_sample')\
    .getOrCreate()
def fb_fill(df, 
            grouping_column_name, 
            value_column_name, 
            row_num_column_name = "row_num",
            temp_column_name = "temp_column",
            preserve_generated_row_num = False):
    dfx = df
    row_num_column_exists = row_num_column_name in df.columns
    if (not(row_num_column_exists)):
      # add a unique consecutive row number to dataframe in pyspark
      # credits: https://stackoverflow.com/questions/53082891/adding-a-unique-consecutive-row-number-to-dataframe-in-pyspark
      dfx = dfx.withColumn(temp_column_name, F.lit("XYZ"))
      w = Window().partitionBy(temp_column_name).orderBy(F.lit('X'))
      dfx = dfx.withColumn(row_num_column_name, F.row_number().over(w)).drop(temp_column_name)

      # rearrange columns setting {row_num_column_name} as the first column (if it's a generated column)
      rearranged_cols = [row_num_column_name] + [c for c in dfx.columns if c != row_num_column_name]
      dfx = dfx.select(*rearranged_cols)

    window_first_spec = Window.partitionBy(grouping_column_name).orderBy(F.col(row_num_column_name).asc()).rowsBetween(0, Window.unboundedFollowing)
    window_last_spec = Window.partitionBy(grouping_column_name).orderBy(F.col(row_num_column_name).asc()).rowsBetween(Window.unboundedPreceding, 0)

    dfx = dfx.withColumn(temp_column_name, F.last(value_column_name, ignorenulls=True).over(window_last_spec))
    dfx = dfx.withColumn(value_column_name, F.when(F.col(temp_column_name).isNull(), F.first(value_column_name, ignorenulls=True).over(window_first_spec)).otherwise(F.col(temp_column_name)))
    
    dfx = dfx.drop(temp_column_name)
    if (not(row_num_column_exists) and not(preserve_generated_row_num)): dfx = dfx.drop(row_num_column_name)
    
    return dfx
data = [
        ('1', None,         '2022-02-12'),
        ('1', None,         '2022-02-12'),
        ('1', '2022-02-12', '2022-02-12'),
        ('1', None,         '2022-02-12'),
        ('1', None,         '2022-02-12'),
        ('2', None,         '2022-04-09'),
        ('2', None,         '2022-04-09'),
        ('2','2022-04-09',  '2022-04-09'),
        ('2',None,          '2022-04-09'),
        ('2','2022-01-07',  '2022-01-07'),
        ('2',None,          '2022-01-07'),
        ('3','2022-11-05',  '2022-11-05'),
        ('3',None,          '2022-11-05'),
        ('3',None,          '2022-11-05'),
        ('3','2022-01-04',  '2022-01-04'),
        ('3',None,          '2022-01-04'),
        ('3',None,          '2022-01-04'),
        ('3','2022-04-15',  '2022-04-15'),
        ('3',None,          '2022-04-15'),
        
        ]

columns = ['user_id', 'date', 'desired_value_given']

sample_df = sc.createDataFrame(data, columns)
result_df = fb_fill(sample_df, "user_id", "date")
result_df = result_df.withColumn("correct", F.when(F.col("date") == F.col("desired_value_given"), F.lit('true')).otherwise(F.lit('---')))
result_df.show()
+-------+----------+-------------------+-------+
|user_id|      date|desired_value_given|correct|
+-------+----------+-------------------+-------+
|      1|2022-02-12|         2022-02-12|   true|
|      1|2022-02-12|         2022-02-12|   true|
|      1|2022-02-12|         2022-02-12|   true|
|      1|2022-02-12|         2022-02-12|   true|
|      1|2022-02-12|         2022-02-12|   true|
|      2|2022-04-09|         2022-04-09|   true|
|      2|2022-04-09|         2022-04-09|   true|
|      2|2022-04-09|         2022-04-09|   true|
|      2|2022-04-09|         2022-04-09|   true|
|      2|2022-01-07|         2022-01-07|   true|
|      2|2022-01-07|         2022-01-07|   true|
|      3|2022-11-05|         2022-11-05|   true|
|      3|2022-11-05|         2022-11-05|   true|
|      3|2022-11-05|         2022-11-05|   true|
|      3|2022-01-04|         2022-01-04|   true|
|      3|2022-01-04|         2022-01-04|   true|
|      3|2022-01-04|         2022-01-04|   true|
|      3|2022-04-15|         2022-04-15|   true|
|      3|2022-04-15|         2022-04-15|   true|
+-------+----------+-------------------+-------+
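If the source data already carries a reliable ordering column, it can be passed in as row_num_column_name, in which case fb_fill skips generating its own row number. A hypothetical call, assuming the input already contains an event_seq column that reflects the original order:

# hypothetical: sample_df_with_seq already has an "event_seq" ordering column
result_df = fb_fill(sample_df_with_seq, "user_id", "date",
                    row_num_column_name="event_seq")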

If you spot any mistakes, or you know a more concise and efficient solution, please leave your comments/corrections, since this method is meant to be applied to fairly large source datasets with hundreds of thousands of rows.
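One thing I am unsure about for large inputs: when fb_fill has to generate its own row number, it uses a window partitioned by a constant column, which sends every row of the DataFrame through a single partition. A possible alternative I am considering (untested at scale, and it assumes the physical row order of the incoming DataFrame is the order that matters) would be to derive the ordering column from monotonically_increasing_id inside that branch instead:

# Hypothetical replacement for the generated row number: the ids are increasing
# across partitions (though not consecutive), so ordering by them reproduces the
# DataFrame's existing row order without pushing all rows into a single partition.
dfx = dfx.withColumn(row_num_column_name, F.monotonically_increasing_id())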
