I have a Spark dataframe in which I need to create a window-partitioned column ("desired_outcome"). Within each partition, the column must be backfilled with the first non-null value wherever no non-null value has occurred yet in the sort order, and forward-filled from the most recent non-null value everywhere else.
Here is a sample Spark dataframe together with the desired output:
columns = ['user_id', 'date', 'desired_outcome']
data = [
    ('1', None, '2022-01-05'),
    ('1', None, '2022-01-05'),
    ('1', '2022-01-05', '2022-01-05'),
    ('1', None, '2022-01-05'),
    ('1', None, '2022-01-05'),
    ('2', None, '2022-01-07'),
    ('2', None, '2022-01-07'),
    ('2', '2022-01-07', '2022-01-07'),
    ('2', None, '2022-01-07'),
    ('2', '2022-01-09', '2022-01-09'),
    ('2', None, '2022-01-09'),
    ('3', '2022-01-01', '2022-01-01'),
    ('3', None, '2022-01-01'),
    ('3', None, '2022-01-01'),
    ('3', '2022-01-04', '2022-01-04'),
    ('3', None, '2022-01-04'),
    ('3', None, '2022-01-04'),
]
sample_df = spark.createDataFrame(data, columns)
[UPDATE]
Based on the solution suggested by @user238607 (see the first answer below), I should add this update to make clear that the desired output does not depend on the chronological (or any other) ordering of the date column values. It depends only on the original order of the records within each group of rows sharing the same user_id (in the general case this value column can be of any type: numeric, string, etc.). I ran the suggested code against a different input, and some of the calculated desired-output values are not quite right; the incorrectly calculated values are marked with three dashes in the correct column:
from pyspark.sql import Window
import pyspark.sql.functions as F
data1 = [
    ('1', None, '2022-02-12'),
    ('1', None, '2022-02-12'),
    ('1', '2022-02-12', '2022-02-12'),
    ('1', None, '2022-02-12'),
    ('1', None, '2022-02-12'),
    ('2', None, '2022-04-09'),
    ('2', None, '2022-04-09'),
    ('2', '2022-04-09', '2022-04-09'),
    ('2', None, '2022-04-09'),
    ('2', '2022-01-07', '2022-01-07'),
    ('2', None, '2022-01-07'),
    ('3', '2022-11-05', '2022-11-05'),
    ('3', None, '2022-11-05'),
    ('3', None, '2022-11-05'),
    ('3', '2022-01-04', '2022-01-04'),
    ('3', None, '2022-01-04'),
    ('3', None, '2022-01-04'),
    ('3', '2022-04-15', '2022-04-15'),
    ('3', None, '2022-04-15'),
]
columns = ['row_id', 'user_id', 'date', 'desired_outcome_given']
# prepend an explicit row index so that the original order of the rows is preserved
data2 = [ (index, item[0], item[1], item[2]) for index, item in enumerate(data1) ]
df1 = spark.createDataFrame(data=data2, schema=columns)
print("Given dataframe")
df1.show(n=10, truncate=False)
# backfill candidate: first non-null date from the current row to the end of the partition
window_bfill_spec = Window.partitionBy("user_id").orderBy(F.col("row_id").asc()).rowsBetween(0, Window.unboundedFollowing)
# forward-fill candidate: last non-null date from the start of the partition to the current row
window_ffill_spec = Window.partitionBy("user_id").orderBy(F.col("row_id").asc()).rowsBetween(Window.unboundedPreceding, 0)
df1 = df1.withColumn("first_date", F.first("date", ignorenulls=True).over(window_bfill_spec))
df1 = df1.withColumn("last_date", F.last("date", ignorenulls=True).over(window_ffill_spec))
print("Calculated first and last dates")
df1.show(truncate=False)
print("Final dataframe")
output = (
    df1
    .withColumn("desired_outcome_calculated", F.least("first_date", "last_date"))
    .withColumn("correct", F.when(F.col("desired_outcome_given") == F.col("desired_outcome_calculated"), F.lit("true")).otherwise("---"))
    .select("row_id", "user_id", "date", "desired_outcome_given", "desired_outcome_calculated", "correct")
)
output.show(truncate=False)
Output:
Given dataframe
+------+-------+----------+---------------------+
|row_id|user_id|date |desired_outcome_given|
+------+-------+----------+---------------------+
|0 |1 |NULL |2022-02-12 |
|1 |1 |NULL |2022-02-12 |
|2 |1 |2022-02-12|2022-02-12 |
|3 |1 |NULL |2022-02-12 |
|4 |1 |NULL |2022-02-12 |
|5 |2 |NULL |2022-04-09 |
|6 |2 |NULL |2022-04-09 |
|7 |2 |2022-04-09|2022-04-09 |
|8 |2 |NULL |2022-04-09 |
|9 |2 |2022-01-07|2022-01-07 |
+------+-------+----------+---------------------+
only showing top 10 rows
Calculated first and last dates
+------+-------+----------+---------------------+----------+----------+
|row_id|user_id|date |desired_outcome_given|first_date|last_date |
+------+-------+----------+---------------------+----------+----------+
|0 |1 |NULL |2022-02-12 |2022-02-12|NULL |
|1 |1 |NULL |2022-02-12 |2022-02-12|NULL |
|2 |1 |2022-02-12|2022-02-12 |2022-02-12|2022-02-12|
|3 |1 |NULL |2022-02-12 |NULL |2022-02-12|
|4 |1 |NULL |2022-02-12 |NULL |2022-02-12|
|5 |2 |NULL |2022-04-09 |2022-04-09|NULL |
|6 |2 |NULL |2022-04-09 |2022-04-09|NULL |
|7 |2 |2022-04-09|2022-04-09 |2022-04-09|2022-04-09|
|8 |2 |NULL |2022-04-09 |2022-01-07|2022-04-09|
|9 |2 |2022-01-07|2022-01-07 |2022-01-07|2022-01-07|
|10 |2 |NULL |2022-01-07 |NULL |2022-01-07|
|11 |3 |2022-11-05|2022-11-05 |2022-11-05|2022-11-05|
|12 |3 |NULL |2022-11-05 |2022-01-04|2022-11-05|
|13 |3 |NULL |2022-11-05 |2022-01-04|2022-11-05|
|14 |3 |2022-01-04|2022-01-04 |2022-01-04|2022-01-04|
|15 |3 |NULL |2022-01-04 |2022-04-15|2022-01-04|
|16 |3 |NULL |2022-01-04 |2022-04-15|2022-01-04|
|17 |3 |2022-04-15|2022-04-15 |2022-04-15|2022-04-15|
|18 |3 |NULL |2022-04-15 |NULL |2022-04-15|
+------+-------+----------+---------------------+----------+----------+
Final dataframe
+------+-------+----------+---------------------+--------------------------+-------+
|row_id|user_id|date |desired_outcome_given|desired_outcome_calculated|correct|
+------+-------+----------+---------------------+--------------------------+-------+
|0 |1 |NULL |2022-02-12 |2022-02-12 |true |
|1 |1 |NULL |2022-02-12 |2022-02-12 |true |
|2 |1 |2022-02-12|2022-02-12 |2022-02-12 |true |
|3 |1 |NULL |2022-02-12 |2022-02-12 |true |
|4 |1 |NULL |2022-02-12 |2022-02-12 |true |
|5 |2 |NULL |2022-04-09 |2022-04-09 |true |
|6 |2 |NULL |2022-04-09 |2022-04-09 |true |
|7 |2 |2022-04-09|2022-04-09 |2022-04-09 |true |
|8 |2 |NULL |2022-04-09 |2022-01-07 |--- |
|9 |2 |2022-01-07|2022-01-07 |2022-01-07 |true |
|10 |2 |NULL |2022-01-07 |2022-01-07 |true |
|11 |3 |2022-11-05|2022-11-05 |2022-11-05 |true |
|12 |3 |NULL |2022-11-05 |2022-01-04 |--- |
|13 |3 |NULL |2022-11-05 |2022-01-04 |--- |
|14 |3 |2022-01-04|2022-01-04 |2022-01-04 |true |
|15 |3 |NULL |2022-01-04 |2022-01-04 |true |
|16 |3 |NULL |2022-01-04 |2022-01-04 |true |
|17 |3 |2022-04-15|2022-04-15 |2022-04-15 |true |
|18 |3 |NULL |2022-04-15 |2022-04-15 |true |
+------+-------+----------+---------------------+--------------------------+-------+
2 Answers
piv4azn71#
You can do it like this:
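The code block itself did not survive this copy of the page; the sketch below reconstructs it from the version the asker reproduced in the question's update (it assumes df1 already carries the explicit row_id column added there):

from pyspark.sql import Window
import pyspark.sql.functions as F

# backfill candidate: first non-null date from the current row to the end of the partition
w_following = Window.partitionBy("user_id").orderBy(F.col("row_id").asc()).rowsBetween(0, Window.unboundedFollowing)
# forward-fill candidate: last non-null date from the start of the partition to the current row
w_preceding = Window.partitionBy("user_id").orderBy(F.col("row_id").asc()).rowsBetween(Window.unboundedPreceding, 0)

result = (
    df1
    .withColumn("first_date", F.first("date", ignorenulls=True).over(w_following))
    .withColumn("last_date", F.last("date", ignorenulls=True).over(w_preceding))
    .withColumn("desired_outcome", F.least("first_date", "last_date"))
)
result.show(truncate=False)

On the original sample this matches the desired output, because F.least skips nulls; as the update above demonstrates, it breaks down once the dates within a group are not in chronological order.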
Edit: Given the new update, this is how I would do it.
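That edited code block is also missing here. A sketch that reproduces the desired output from the update under its stated semantics (a plain forward fill by original row order, with a backfill used only for the leading nulls before the group's first non-null value) could look like this; it is a reconstruction, not necessarily the answerer's original code:

from pyspark.sql import Window
import pyspark.sql.functions as F

w_ffill = Window.partitionBy("user_id").orderBy(F.col("row_id").asc()).rowsBetween(Window.unboundedPreceding, 0)
w_bfill = Window.partitionBy("user_id").orderBy(F.col("row_id").asc()).rowsBetween(0, Window.unboundedFollowing)

# the forward fill wins wherever a non-null value has already been seen in the group;
# coalesce falls back to the next non-null value only for the leading nulls
result = df1.withColumn(
    "desired_outcome_calculated",
    F.coalesce(
        F.last("date", ignorenulls=True).over(w_ffill),
        F.first("date", ignorenulls=True).over(w_bfill),
    ),
)
result.show(truncate=False)

With this version, row 8 of user 2 takes 2022-04-09 from the preceding non-null row, rather than the smaller 2022-01-07 that F.least picked.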
qxsslcnc2#
Based on the very helpful suggested answer from @user238607 (see above), I did some homework; below is the generic forward/backward-fill utility method I was looking for.
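The utility code itself was not preserved in this copy of the page either; a minimal sketch of a generic helper in that spirit (function and parameter names are illustrative, not the original ones) might be:

from pyspark.sql import DataFrame, Window
import pyspark.sql.functions as F

def fill_forward_backward(df: DataFrame,
                          partition_col: str,
                          order_col: str,
                          value_col: str,
                          target_col: str) -> DataFrame:
    # forward-fill window: start of the partition up to the current row
    w_ffill = (Window.partitionBy(partition_col)
                     .orderBy(F.col(order_col).asc())
                     .rowsBetween(Window.unboundedPreceding, 0))
    # backfill window: current row to the end of the partition
    w_bfill = (Window.partitionBy(partition_col)
                     .orderBy(F.col(order_col).asc())
                     .rowsBetween(0, Window.unboundedFollowing))
    # forward fill first; the backfill covers only the leading nulls,
    # and the value column may be of any type (dates, numbers, strings, ...)
    return df.withColumn(
        target_col,
        F.coalesce(
            F.last(value_col, ignorenulls=True).over(w_ffill),
            F.first(value_col, ignorenulls=True).over(w_bfill),
        ),
    )

# e.g.: output = fill_forward_backward(df1, "user_id", "row_id", "date", "desired_outcome_calculated")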
If you find any mistakes, or you know a more concise and efficient solution, please share your comments/corrections, since this method is meant to be applied to fairly large source datasets with hundreds of thousands of rows.