如何在spark中计算滑动窗口而不诉诸spark流?
注意:我不想使用 WINDOW PARTITION BY ORDER BY k ROWS
在当前时间之前/之后,但使用时间戳。这个 window
操作员有这样一种模式:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([{"a": "x", "b": "2021-02-21 01:00:00", "c": "3"},
{"a": "x", "b": "2021-02-21 02:00:00", "c": "4"},
{"a": "x", "b": "2021-02-21 03:00:00", "c": "2"}])
hour_interval = str(4) + ' hour'
sliding_window = str(60) + ' minute'
from pyspark.sql.functions import col, min, max, sum, when, lit, window, date_format
import time
df_aggregated_time_window = df.groupBy("a", window("b", windowDuration=hour_interval,
slideDuration=sliding_window, startTime="30 minute")).agg(min("c").alias("min_c"))
df_aggregated_time_window.show(truncate=False)
+---+------------------------------------------+-----+
|a |window |min_c|
+---+------------------------------------------+-----+
|x |[2021-02-20 23:00:00, 2021-02-21 03:00:00]|3 |
|x |[2021-02-21 00:00:00, 2021-02-21 04:00:00]|2 |
|x |[2021-02-20 22:00:00, 2021-02-21 02:00:00]|3 |
|x |[2021-02-21 02:00:00, 2021-02-21 06:00:00]|2 |
|x |[2021-02-21 01:00:00, 2021-02-21 05:00:00]|2 |
|x |[2021-02-21 03:00:00, 2021-02-21 07:00:00]|2 |
+---+------------------------------------------+-----+
我想要的结果是,对于3个输入行和3个输出行中的每一行,返回4小时基于时间的窗口(=state)的滑动增量,该窗口每小时提前一小时,每小时触发一次(但是,由于这是批处理,因此不流触发应该没那么重要)。
相反,我用基数>所需行数得到上面的结果。
编辑
期望输出:
输入:
x,2021-02-21 01:00:00",3
x,2021-02-21 02:00:00",4
x,2021-02-21 03:00:00",4
x,2021-02-21 04:00:00",1
输出:
x,2021-02-21 01:00:00", NULL // no single previous record to be found in the previous 3 hours (including self)
x,2021-02-21 02:00:00",3 // as we are currently only computing `min` for simplicity (later it should be max - min to see the deltas) within the last 3 hours the value is 3 (coincidentally the previous row)
x,2021-02-21 03:00:00",3 // within 4 hour window 3 is still the smallest
x,2021-02-21 04:00:00",1 // within the previous <= 3 hours (including self) 1 is smallest
1条答案
按热度按时间enyaitl31#
恐怕你的假设
window
表达式不正确。根据其文件:给定指定列的时间戳,将行压缩为一个或多个时间窗口。窗口开始是包含的,但窗口结束是独占的,例如12:05将在窗口[12:05,12:10]中,但不在[12:00,12:05]中。。。
因此,在4小时窗口和1小时滑动步骤的情况下,将有6个桶可以应用聚合:
我不完全理解“我不想按顺序使用窗口分区…”,因为这将使您能够有效地满足您的要求,即为每个输入获取一个输出行,计算为当前小时和前3小时的状态。