sparksql滑动窗口差分计算

67up9zun  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(364)

如何在spark中计算滑动窗口而不诉诸spark流?
注意:我不想使用 WINDOW PARTITION BY ORDER BY k ROWS 在当前时间之前/之后,但使用时间戳。这个 window 操作员有这样一种模式:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([{"a": "x", "b": "2021-02-21 01:00:00", "c": "3"},
                            {"a": "x", "b": "2021-02-21 02:00:00", "c": "4"},
                            {"a": "x", "b": "2021-02-21 03:00:00", "c": "2"}])

hour_interval = str(4) + ' hour'
sliding_window = str(60) + ' minute'
from pyspark.sql.functions import col, min, max, sum, when, lit, window, date_format
import time

df_aggregated_time_window = df.groupBy("a", window("b", windowDuration=hour_interval,
                                slideDuration=sliding_window, startTime="30 minute")).agg(min("c").alias("min_c"))
df_aggregated_time_window.show(truncate=False)

+---+------------------------------------------+-----+
|a  |window                                    |min_c|
+---+------------------------------------------+-----+
|x  |[2021-02-20 23:00:00, 2021-02-21 03:00:00]|3    |
|x  |[2021-02-21 00:00:00, 2021-02-21 04:00:00]|2    |
|x  |[2021-02-20 22:00:00, 2021-02-21 02:00:00]|3    |
|x  |[2021-02-21 02:00:00, 2021-02-21 06:00:00]|2    |
|x  |[2021-02-21 01:00:00, 2021-02-21 05:00:00]|2    |
|x  |[2021-02-21 03:00:00, 2021-02-21 07:00:00]|2    |
+---+------------------------------------------+-----+

我想要的结果是,对于3个输入行和3个输出行中的每一行,返回4小时基于时间的窗口(=state)的滑动增量,该窗口每小时提前一小时,每小时触发一次(但是,由于这是批处理,因此不流触发应该没那么重要)。
相反,我用基数>所需行数得到上面的结果。

编辑

期望输出:
输入:

x,2021-02-21 01:00:00",3
x,2021-02-21 02:00:00",4
x,2021-02-21 03:00:00",4
x,2021-02-21 04:00:00",1

输出:

x,2021-02-21 01:00:00", NULL // no single previous record to be found in the previous 3 hours (including self)
x,2021-02-21 02:00:00",3 // as we are currently only computing `min` for simplicity (later it should be max - min to see the deltas) within the last 3 hours the value is 3 (coincidentally the previous row)
x,2021-02-21 03:00:00",3 // within 4 hour window 3 is still the smallest
x,2021-02-21 04:00:00",1 // within the previous <= 3 hours (including self) 1 is smallest
enyaitl3

enyaitl31#

恐怕你的假设 window 表达式不正确。根据其文件:

def window(timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String): Column

给定指定列的时间戳,将行压缩为一个或多个时间窗口。窗口开始是包含的,但窗口结束是独占的,例如12:05将在窗口[12:05,12:10]中,但不在[12:00,12:05]中。。。
因此,在4小时窗口和1小时滑动步骤的情况下,将有6个桶可以应用聚合:

[2021-02-20 22:00:00, 2021-02-21 02:00:00)  <-- first bucket that contains the earliest b = 2021-02-21 01:00:00
[2021-02-20 23:00:00, 2021-02-21 03:00:00)
[2021-02-21 00:00:00, 2021-02-21 04:00:00)
[2021-02-21 01:00:00, 2021-02-21 05:00:00)
[2021-02-21 02:00:00, 2021-02-21 06:00:00)
[2021-02-21 03:00:00, 2021-02-21 07:00:00) <-- last bucket that contains the latest b = 2021-02-21 03:00:00

我不完全理解“我不想按顺序使用窗口分区…”,因为这将使您能够有效地满足您的要求,即为每个输入获取一个输出行,计算为当前小时和前3小时的状态。

相关问题