通过使用两列的组合运行window对pysparkDataframe进行计数

2nc8po8w  于 2021-05-17  发布在  Spark
关注(0)|答案(0)|浏览(262)

我有一个sparkDataframe(v.2.2.0),其中我想计算(通过组键)在某个时间范围内发生的所有事件,例如,从每行事件的开始日期到其他行的结束日期的5天。比如说


# For the sake of simplicity I included only 1 user, but there are multiple users

+-------------------+-------------------+-----+
|end_date           |start_date         |uid  |
+-------------------+-------------------+-----+
|2020-11-26 09:30:28|2020-11-26 08:30:22|user1|
|2020-11-26 10:41:00|2020-11-26 10:00:00|user1|
|2020-11-22 12:40:27|2020-11-22 08:37:18|user1|
|2020-11-22 15:22:20|2020-11-22 13:32:30|user1|
|2020-11-20 17:20:07|2020-11-20 16:04:04|user1|

我定义了一个窗口

days = lambda i: i * 86400 # 60*60*24 = number of seconds in a day
w = (Window()
   .partitionBy(col("uid"))
   .orderBy(col("end_date").cast("timestamp").cast("long"))
   .rangeBetween(-days(5), 0))

我对着Windows算了一下:

by_end = df.select(col("start_date"), f.count("end_date").over(w).alias("count"))
df = df.join(by_end, 'start_date', how='left')

我会得到这个:

+-------------------+-------------------+-----+-------+
|end_date           |start_date         |uid  |count  |
+-------------------+-------------------+-----+-------+
|2020-11-26 09:30:28|2020-11-26 08:30:22|user1|  4    |
|2020-11-26 10:41:00|2020-11-26 10:00:00|user1|  3    |
|2020-11-22 12:40:27|2020-11-22 08:37:18|user1|  3    |
|2020-11-22 15:22:20|2020-11-22 13:32:30|user1|  2    |
|2020-11-20 17:20:07|2020-11-20 16:04:04|user1|  1    |

但是,据我所知,这将按结束日期进行滚动窗口计数,这几乎是正确的,因为我需要从当前事件的开始日期到所有其他事件的结束日期。
有什么建议吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题