我有一个Dataframe df
使用以下模式(spark 2.4)
root
|-- segId: string (nullable = true)
|-- time: timestamp (nullable = true)
|-- val1: double (nullable = true)
哪里 segId
是段(将其视为唯一标识符) time
是进行测量时的时间戳 val1
是测量值
我需要计算 val1
超过几个 rangeBetween
s。例如,我想计算过去1分钟,2分钟,…,每段100分钟的平均值。
我不想创建一个100个窗口(它被分割和排序100次)。我想创建一个物理窗口(分区) segId
订购方式 time
一次),然后使用 rangeBetween
过去n分钟(在先前分区集上的逻辑偏移量)。
仅计算最后1、2和3分钟的代码示例:
win_physical = Window.partitionBy("segId").orderBy(F.col("time").cast("long"))
df = (
df.repartition("segId")
.orderBy(F.col("time").cast("long"))
.withColumn("mean1Mins", F.mean("val1").over(win_physical.rangeBetween( -(60-1), 0)))
.withColumn(
"mean2Mins", F.mean("val2").over(win_physical.rangeBetween(-(2*60-1), 0))
)
.withColumn(
"mean3Mins", F.mean("val1").over(win_physical.rangeBetween(-(3*60-1), 0))
)
.show()
)
在上面的示例中,物理平面图显示使用了三个窗口
== Physical Plan ==
CollectLimit 21
+- *(6) Project [segId#0, cast(time#8 as string) AS time#102, cast(val1#2 as string) AS val1#97, cast(val2#3L as string) AS val2#98, cast(mean1Mins#63 as string) AS mean1Mins#99, cast(mean2Mins#71 as string) AS mean2Mins#100, cast(mean3Mins#80 as string) AS mean3Mins#101]
+- Window [avg(val1#2) windowspecdefinition(segId#0, _w0#81L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -179, currentrow$())) AS mean3Mins#80], [segId#0], [_w0#81L ASC NULLS FIRST]
+- *(5) Sort [segId#0 ASC NULLS FIRST, _w0#81L ASC NULLS FIRST], false, 0
+- *(5) Project [segId#0, time#8, val1#2, val2#3L, mean1Mins#63, mean2Mins#71, cast(time#8 as bigint) AS _w0#81L]
+- Window [avg(val2#3L) windowspecdefinition(segId#0, _w0#72L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -119, currentrow$())) AS mean2Mins#71], [segId#0], [_w0#72L ASC NULLS FIRST]
+- *(4) Sort [segId#0 ASC NULLS FIRST, _w0#72L ASC NULLS FIRST], false, 0
+- *(4) Project [segId#0, time#8, val1#2, val2#3L, mean1Mins#63, cast(time#8 as bigint) AS _w0#72L]
+- Window [avg(val1#2) windowspecdefinition(segId#0, _w0#64L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -59, currentrow$())) AS mean1Mins#63], [segId#0], [_w0#64L ASC NULLS FIRST]
+- *(3) Sort [segId#0 ASC NULLS FIRST, _w0#64L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(segId#0, 1000)
+- *(2) Project [segId#0, time#8, val1#2, val2#3L, cast(time#8 as bigint) AS _w0#64L]
+- *(2) Sort [cast(time#8 as bigint) ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(cast(time#8 as bigint) ASC NULLS FIRST, 1000)
+- Exchange hashpartitioning(segId#0, 1000)
+- *(1) Project [segId#0, cast(time#1 as timestamp) AS time#8, val1#2, val2#3L]
+- Scan ExistingRDD[segId#0,time#1,val1#2,val2#3L]
我的问题是:
spark是否会为多个逻辑窗口重用同一物理分区(即一个分区和一次排序)?或者它会为每个人创建一个单独的分区和排序 rangeBetween
(计算密集型)?
有没有关于在计算上改进上述逻辑的建议 rangeBetween
在同一个分区上?
1条答案
按热度按时间mccptt671#
spark是否会为多个逻辑窗口重用同一物理分区(即一个分区和一次排序)?或者它会为每个rangebetween(计算密集型)创建单独的分区和排序吗?
我想是的。仅限
Exchange
操作员可以重新划分数据(靠近数据源)Scan ExistingRDD
).对于同一分区上不同rangebetween上的自定义聚合,有没有改进上述逻辑的建议?
我不知道。对不起的。