Spark窗口聚合函数无法直观地处理记录排序

tktrz96b  于 2023-01-31  发布在  Apache
关注(0)|答案(1)|浏览(122)

我有下面的例子,我运行在Spark 3.3

import pyspark.sql.functions as F
from pyspark.sql import Window

inputData = [
  ("1", 333),
  ("1", 222),
  ("1", 111),
  ("2", 334)
]
inputDf = spark.createDataFrame(inputData, schema=["id", "val"])

window = Window.partitionBy("id")
aggregatedDf = (
    inputDf.withColumn("min_val", F.min(F.col("val")).over(window))
    .withColumn("max_val", F.max(F.col("val")).over(window))
).show()

输出符合预期,我得到了每个窗口的正确最小值/最大值

+---+---+-------+-------+
| id|val|min_val|max_val|
+---+---+-------+-------+
|  1|333|    111|    333|
|  1|222|    111|    333|
|  1|111|    111|    333|
|  2|334|    334|    334|
+---+---+-------+-------+

当我将orderBy添加到窗口时,输出不同:
一个二个一个一个
如您所见,使用降序排列max_value是可以的,但min_value会随记录而变化
我试着在docu或这里找到更多的信息,但没有运气。对我来说,这一点也不直观。
我的期望是Spark将扫描给定分区中的所有记录,并为分区中的每个记录分配最小/最大值,这在窗口内没有排序的情况下是正确的,但在添加排序后工作方式会有所不同
有人知道为什么它会这样吗?

py49o6xq

py49o6xq1#

您需要添加Frame以获得所需的输出。
根据Docs
注意如果未定义顺序,则默认使用无边界窗口框架(rowFrame、unboundedPreceding、unboundedFollow)。如果定义了顺序,则默认使用增长窗口框架(rangeFrame、unboundedPreceding、currentRow)。
本质上,Spark或任何SQL在处理当前行的函数时,默认情况下都会考虑Window直到该行,通过将Frame添加为-unboundedPreceding到unboundedFollowing,我们要求Spark考虑整个窗口。
例如,当处理 Dataframe 中第二行的min函数时(以降序方式按值排序),Spark会将id=1的窗口视为第一行和第二行(在unboundedPreceding和CURRENT_ROW之间)
这样就行了

window = Window.partitionBy("id")\
.orderBy(F.col("val").desc())\
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

输出:

要了解有关帧的更多信息,请阅读https://docs.oracle.com/cd/E17952_01/mysql-8.0-en/window-functions-frames.html
https://medium.com/expedia-group-tech/deep-dive-into-apache-spark-window-functions-7b4e39ad3c86

相关问题