我试图得到一个窗口函数返回,并得到一个特定日期的前一行,不太确定是出了什么问题,但它是给我的前一行,而不是指定的日期行。为了计算这一点,我取当前行的日期,并找到当前星期一与该周的关系,如下所示
def previous_day(date, dayOfWeek):
return date_sub(next_day(date, "monday"), 7)
spark_df = spark_df.withColumn("last_monday", previous_day(spark_df['calendarday'], "monday"))
然后我计算当前日期和最近的前一个星期一之间的差值
d = F.datediff(spark_df['calendarday'], spark_df['last_monday'])
spark_df = spark_df.withColumn("daysSinceMonday",d)
从我的dayssincemonday值可以看出每行的值是正确的。接下来,我想创建一个窗口,并选择它的第一行,但范围他们的d值,我设置,但由于某种原因,它不起作用。
days = lambda i: i * 86400
w = (Window.partitionBy(column_list).orderBy(col('calendarday').cast("timestamp").cast("long")).rangeBetween(-days(d), 0))
spark_df = spark_df.withColumn('PreviousYearUnique', first("indexCP").over(w))
Starting Data Frame
## +---+-----------+-----------+--------+
## | id|calendarday|last_monday| indexCP|
## +---+-----------+-----------+--------+
## | 1|2015-01-05 | 2015-01-05| 0.0076|
## | 1|2015-01-06 | 2015-01-05| 0.0026|
## | 1|2015-01-07 | 2015-01-05| 0.0016|
## | 1|2015-01-08 | 2015-01-05| 0.0006|
## | 2|2015-01-09 | 2015-01-05| 0.0012|
## | 2|2015-01-10 | 2015-01-05| 0.0014|
## | 1|2015-01-12 | 2015-01-12| 0.0026|
## | 1|2015-01-13 | 2015-01-12| 0.0086|
## | 1|2015-01-14 | 2015-01-12| 0.0046|
## | 1|2015-01-15 | 2015-01-12| 0.0021|
## | 2|2015-01-16 | 2015-01-12| 0.0042|
## | 2|2015-01-17 | 2015-01-12| 0.0099|
## +---+-----------+-----------+--------+
New Data Frame Adding Previous last_mondays row indexCP as PreviousYearUnique
## +---+-----------+-----------+--------+--------------------+
## | id|calendarday|last_monday| indexCP| PreviousYearUnique |
## +---+-----------+-----------+--------+--------------------+
## | 1|2015-01-05 | 2015-01-05| 0.0076| 0.0076|
## | 1|2015-01-06 | 2015-01-05| 0.0026| 0.0076|
## | 1|2015-01-07 | 2015-01-05| 0.0016| 0.0076|
## | 1|2015-01-08 | 2015-01-05| 0.0006| 0.0076|
## | 2|2015-01-09 | 2015-01-05| 0.0012| 0.0076|
## | 2|2015-01-10 | 2015-01-05| 0.0014| 0.0076|
## | 1|2015-01-12 | 2015-01-12| 0.0026| 0.0026|
## | 1|2015-01-13 | 2015-01-12| 0.0086| 0.0026|
## | 1|2015-01-14 | 2015-01-12| 0.0046| 0.0026|
## | 1|2015-01-15 | 2015-01-12| 0.0021| 0.0026|
## | 2|2015-01-16 | 2015-01-12| 0.0042| 0.0026|
## | 2|2015-01-17 | 2015-01-12| 0.0099| 0.0026|
## +---+-----------+-----------+--------+--------------------+
有什么问题吗?
1条答案
按热度按时间oo7oh9g91#
你可以的
partitionBy
last_monday
结束calendarday
在unboundedPreceding
窗口,然后使用first
```from pyspark.sql import functions as F
from pyspark.sql.window import Window
w=Window().partitionBy("last_monday")
.orderBy(F.to_date("calendarday","yyyy-MM-dd"))
.rowsBetween(Window.unboundedPreceding,Window.currentRow)
df.withColumn("PreviousYearUnique", F.first("indexCP").over(w)).show()
+---+-----------+-----------+-------+------------------+
| id|calendarday|last_monday|indexCP|PreviousYearUnique|
+---+-----------+-----------+-------+------------------+
| 1| 2015-01-05| 2015-01-05| 0.0076| 0.0076|
| 1| 2015-01-06| 2015-01-05| 0.0026| 0.0076|
| 1| 2015-01-07| 2015-01-05| 0.0016| 0.0076|
| 1| 2015-01-08| 2015-01-05| 6.0E-4| 0.0076|
| 2| 2015-01-09| 2015-01-05| 0.0012| 0.0076|
| 2| 2015-01-10| 2015-01-05| 0.0014| 0.0076|
| 1| 2015-01-12| 2015-01-12| 0.0026| 0.0026|
| 1| 2015-01-13| 2015-01-12| 0.0086| 0.0026|
| 1| 2015-01-14| 2015-01-12| 0.0046| 0.0026|
| 1| 2015-01-15| 2015-01-12| 0.0021| 0.0026|
| 2| 2015-01-16| 2015-01-12| 0.0042| 0.0026|
| 2| 2015-01-17| 2015-01-12| 0.0099| 0.0026|
+---+-----------+-----------+-------+------------------+