Spark: Is there a difference between aggregate functions and window functions on a Spark DataFrame?

juud5qan · posted 2022-11-25 in Apache

I want to sum a column of a Spark DataFrame (Spark 2.1), and there are two ways to do it:
1 - Using a window function:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

val windowing = Window.partitionBy("id")
dataframe
  .withColumn("sum", sum(col("column_1")) over windowing)

2 - Using the agg function:

dataframe
  .groupBy("id")
  .agg(sum(col("column_1")).alias("sum"))

Which one is better in terms of performance, and what is the difference between the two approaches?
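
For reference, both snippets can be tried against a minimal, hypothetical DataFrame that matches the question's column names (the values below are made up, and a spark-shell style SparkSession named spark is assumed):

// Hypothetical sample data matching the question's schema (id, column_1)
import spark.implicits._

val dataframe = Seq((1, 10), (1, 20), (2, 5)).toDF("id", "column_1")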

wlzqhblo1#

You can use aggregate functions inside a window (the first case) or when grouping (the second case). The difference is that with a window, every row is associated with the aggregate computed over its whole window, whereas with grouping, each group is associated with the aggregate of that group (a set of rows is collapsed into a single row).
In your case, you get the following.

import org.apache.spark.sql.functions.{col, sum}
import spark.implicits._   // for the 'id column syntax
val dataframe = spark.range(6).withColumn("key", 'id % 2)
dataframe.show
+---+---+
| id|key|
+---+---+
|  0|  0|
|  1|  1|
|  2|  0|
|  3|  1|
|  4|  0|
|  5|  1|
+---+---+

Case 1: Window

import org.apache.spark.sql.expressions.Window

val windowing = Window.partitionBy("key")
dataframe.withColumn("sum", sum(col("id")) over windowing).show
+---+---+---+                                                                   
| id|key|sum|
+---+---+---+
|  0|  0|  6|
|  2|  0|  6|
|  4|  0|  6|
|  1|  1|  9|
|  3|  1|  9|
|  5|  1|  9|
+---+---+---+

Case 2: GroupBy

dataframe.groupBy("key").agg(sum('id)).show
+---+-------+
|key|sum(id)|
+---+-------+
|  0|      6|
|  1|      9|
+---+-------+

v2g6jxz62#

As @Oli explained, aggregate functions can be used over a window (the first case) as well as when grouping (the second case). In terms of performance, aggregation with groupBy is significantly faster than aggregation over a window. We can see why by looking at the physical plans.
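
A minimal setup for df and window, assumed here and reconstructed from the output and physical plans below, would be:

// Assumed setup, reconstructed from the output and the physical plans shown below
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(("1", 100), ("2", 300), ("1", 100), ("3", 200)).toDF("id", "expense")
val window = Window.partitionBy("id")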

df.show
+---+-------+
| id|expense|
+---+-------+
|  1|    100|
|  2|    300|
|  1|    100|
|  3|    200|
+---+-------+

1 - Aggregation over a window:

df.withColumn("total_expense", sum(col("expense")) over window).show
+---+----------+-------------------+                                                     
| id|   expense|      total_expense|
+---+----------+-------------------+
|  3|       200|                200|
|  1|       100|                200|
|  1|       100|                200|
|  2|       300|                300|
+---+----------+-------------------+

df.withColumn("total_expense", sum(col("expense")) over window).explain
== Physical Plan ==
Window [sum(cast(expense#9 as bigint)) windowspecdefinition(id#8, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS total_expense#265L], [id#8]
+- *(2) Sort [id#8 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#8, 200), true, [id=#144]
      +- *(1) Project [_1#3 AS id#8, _2#4 AS expense#9]
         +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#4]
            +- Scan[obj#2]

2 - Aggregation with groupBy:

df.groupBy("id").agg(sum($"expense").alias("total_expense")).show
+---+------------------+                                                             
| id|     total_expense|
+---+------------------+
|  3|               200|
|  1|               200|
|  2|               300|
+---+------------------+

df.groupBy("id").agg(sum($"expense").alias("total_expense")).explain()
== Physical Plan ==
*(2) HashAggregate(keys=[id#8], functions=[sum(cast(expense#9 as bigint))])
+- Exchange hashpartitioning(id#8, 200), true, [id=#44]
   +- *(1) HashAggregate(keys=[id#8], functions=[partial_sum(cast(expense#9 as bigint))])
      +- *(1) Project [_1#3 AS id#8, _2#4 AS expense#9]
         +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#4]
            +- Scan[obj#2]

From the physical plans we can see that in the window case there is a full shuffle of the data followed by a sort, whereas in the groupBy case the shuffle is reduced: a local partial aggregation (partial_sum) is performed before the exchange, so less data is shuffled.
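
Not part of the original answers, but if the per-row total produced by the window version is what you actually need, one sketch of getting it through the groupBy path (reusing the assumed df and imports above) is to aggregate and join the result back:

// Sketch: per-row totals via groupBy + join instead of a window (assumed df from above)
val totals = df.groupBy("id").agg(sum($"expense").alias("total_expense"))
df.join(totals, Seq("id")).show

Whether this beats the window version depends on the data and the resulting plan; it is only an illustration of how the two approaches relate.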
