如何在pyspark中跨多个逻辑偏移窗口重用相同的分区?

7d7tgy0s  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(369)

我有一个Dataframe df 使用以下模式(spark 2.4)

root
 |-- segId: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- val1: double (nullable = true)

哪里 segId 是段(将其视为唯一标识符) time 是进行测量时的时间戳 val1 是测量值
我需要计算 val1 超过几个 rangeBetween s。例如,我想计算过去1分钟,2分钟,…,每段100分钟的平均值。
我不想创建一个100个窗口(它被分割和排序100次)。我想创建一个物理窗口(分区) segId 订购方式 time 一次),然后使用 rangeBetween 过去n分钟(在先前分区集上的逻辑偏移量)。
仅计算最后1、2和3分钟的代码示例:

win_physical = Window.partitionBy("segId").orderBy(F.col("time").cast("long"))

df = (
    df.repartition("segId")
    .orderBy(F.col("time").cast("long"))
    .withColumn("mean1Mins", F.mean("val1").over(win_physical.rangeBetween( -(60-1), 0)))
    .withColumn(
        "mean2Mins", F.mean("val2").over(win_physical.rangeBetween(-(2*60-1), 0))
    )
    .withColumn(
        "mean3Mins", F.mean("val1").over(win_physical.rangeBetween(-(3*60-1), 0))
    )
    .show()
)

在上面的示例中,物理平面图显示使用了三个窗口

== Physical Plan ==
CollectLimit 21
+- *(6) Project [segId#0, cast(time#8 as string) AS time#102, cast(val1#2 as string) AS val1#97, cast(val2#3L as string) AS val2#98, cast(mean1Mins#63 as string) AS mean1Mins#99, cast(mean2Mins#71 as string) AS mean2Mins#100, cast(mean3Mins#80 as string) AS mean3Mins#101]
   +- Window [avg(val1#2) windowspecdefinition(segId#0, _w0#81L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -179, currentrow$())) AS mean3Mins#80], [segId#0], [_w0#81L ASC NULLS FIRST]
      +- *(5) Sort [segId#0 ASC NULLS FIRST, _w0#81L ASC NULLS FIRST], false, 0
         +- *(5) Project [segId#0, time#8, val1#2, val2#3L, mean1Mins#63, mean2Mins#71, cast(time#8 as bigint) AS _w0#81L]
            +- Window [avg(val2#3L) windowspecdefinition(segId#0, _w0#72L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -119, currentrow$())) AS mean2Mins#71], [segId#0], [_w0#72L ASC NULLS FIRST]
               +- *(4) Sort [segId#0 ASC NULLS FIRST, _w0#72L ASC NULLS FIRST], false, 0
                  +- *(4) Project [segId#0, time#8, val1#2, val2#3L, mean1Mins#63, cast(time#8 as bigint) AS _w0#72L]
                     +- Window [avg(val1#2) windowspecdefinition(segId#0, _w0#64L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -59, currentrow$())) AS mean1Mins#63], [segId#0], [_w0#64L ASC NULLS FIRST]
                        +- *(3) Sort [segId#0 ASC NULLS FIRST, _w0#64L ASC NULLS FIRST], false, 0
                           +- Exchange hashpartitioning(segId#0, 1000)
                              +- *(2) Project [segId#0, time#8, val1#2, val2#3L, cast(time#8 as bigint) AS _w0#64L]
                                 +- *(2) Sort [cast(time#8 as bigint) ASC NULLS FIRST], true, 0
                                    +- Exchange rangepartitioning(cast(time#8 as bigint) ASC NULLS FIRST, 1000)
                                       +- Exchange hashpartitioning(segId#0, 1000)
                                          +- *(1) Project [segId#0, cast(time#1 as timestamp) AS time#8, val1#2, val2#3L]
                                             +- Scan ExistingRDD[segId#0,time#1,val1#2,val2#3L]

我的问题是:
spark是否会为多个逻辑窗口重用同一物理分区(即一个分区和一次排序)?或者它会为每个人创建一个单独的分区和排序 rangeBetween (计算密集型)?
有没有关于在计算上改进上述逻辑的建议 rangeBetween 在同一个分区上?

mccptt67

mccptt671#

spark是否会为多个逻辑窗口重用同一物理分区(即一个分区和一次排序)?或者它会为每个rangebetween(计算密集型)创建单独的分区和排序吗?
我想是的。仅限 Exchange 操作员可以重新划分数据(靠近数据源) Scan ExistingRDD ).
对于同一分区上不同rangebetween上的自定义聚合,有没有改进上述逻辑的建议?
我不知道。对不起的。

相关问题