带窗口的Spark PySpark递增值延迟/上一次函数

sd2nnvve  于 2022-10-07  发布在  Spark
关注(0)|答案(1)|浏览(147)

我有一个这样的 Dataframe (Spark源)
Customer|Date|ol_x|day_number
-|-|
1|2022-05-10|...(val无关紧要)|空
1|2022-05-11|...|空
|2022-05-12|...|0
1|2022-05-13|...|空
1|2022-05-14|...|空
|2022年05月15日|...|3
|2022-05-16|...|4
1|2022-05-17|...|空
|2022-05-18|...|6
2|2022-05-10|...(val无关紧要)|空
2|2022-05-11|...|空
|2022-05-12|...|0
2|2022-05-13|...|空
|2022年05月14日|...|2
|2022年05月15日|...|3
2|2022-05-16|...|空
|2022年05月17日|...|5
2|2022-05-18|...|空

这种模式在许多客户身上重复出现。

我想要的是填充介于5/13和5/14的客户1之间的数字(例如,对于5/13和5/14的客户1,天数将是1和2),它应该始终递增1,因为每个日期值都是填充的。

我尝试了一个Lag函数,我相信这会起作用(下面的代码),但我想知道是否有其他方法可以做到这一点,也许可以使用row_number()从每个组的第一个非空的day_number值开始。

这就是我试过的

import pyspark.sql.functions as F

window = Window.partitionBy("customer").orderBy("date")

# LAG FUNCTION SEEMS TO WORK

df.select(
F.coalesce(F.col("day_number"), F.lit(F.lag(F.col("day_number")).over(window)) + 1)
)

# SOME ROW_NUMBER() FUNCTION , DOES NOT WORK --

df.select(
F.coalesce(F.col("day_number"), F.row_number().over(window) + F.lit(F.min("day_number").over(window) - 1)).alias("day_number")
)

我很好奇有没有其他的解决方案,我也在想有一种方法可以使用F.last(),但需要有一种方法来添加偏移量(+1,+2,+3),等等。

谢谢!

ttygqcqt

ttygqcqt1#

有一种解决方案绝对可以通过以下方式实现:

window = Window.partitionBy("customer").orderBy("date")

df.select(F.when(F.datediff(F.col("date"),F.first(F.when(F.col("day_number") == 0, F.col("date")), ignorenulls=True).over(fill_window))>=0,F.datediff(F.col("date"),F.first(F.when(F.col("day_number") == 0, F.col("date")), ignorenulls=True).over(fill_window))).alias("day_number_test")

但这似乎不是最好的解决方案,我很想知道是否存在其他立即合并到窗口函数中的解决方案。

相关问题