我有下面的例子,我运行在Spark 3.3
import pyspark.sql.functions as F
from pyspark.sql import Window
inputData = [
("1", 333),
("1", 222),
("1", 111),
("2", 334)
]
inputDf = spark.createDataFrame(inputData, schema=["id", "val"])
window = Window.partitionBy("id")
aggregatedDf = (
inputDf.withColumn("min_val", F.min(F.col("val")).over(window))
.withColumn("max_val", F.max(F.col("val")).over(window))
).show()
输出符合预期,我得到了每个窗口的正确最小值/最大值
+---+---+-------+-------+
| id|val|min_val|max_val|
+---+---+-------+-------+
| 1|333| 111| 333|
| 1|222| 111| 333|
| 1|111| 111| 333|
| 2|334| 334| 334|
+---+---+-------+-------+
当我将orderBy添加到窗口时,输出不同:
一个二个一个一个
如您所见,使用降序排列max_value是可以的,但min_value会随记录而变化
我试着在docu或这里找到更多的信息,但没有运气。对我来说,这一点也不直观。
我的期望是Spark将扫描给定分区中的所有记录,并为分区中的每个记录分配最小/最大值,这在窗口内没有排序的情况下是正确的,但在添加排序后工作方式会有所不同
有人知道为什么它会这样吗?
1条答案
按热度按时间py49o6xq1#
您需要添加Frame以获得所需的输出。
根据Docs:
注意如果未定义顺序,则默认使用无边界窗口框架(rowFrame、unboundedPreceding、unboundedFollow)。如果定义了顺序,则默认使用增长窗口框架(rangeFrame、unboundedPreceding、currentRow)。
本质上,Spark或任何SQL在处理当前行的函数时,默认情况下都会考虑Window直到该行,通过将Frame添加为-unboundedPreceding到unboundedFollowing,我们要求Spark考虑整个窗口。
例如,当处理 Dataframe 中第二行的
min
函数时(以降序方式按值排序),Spark会将id=1
的窗口视为第一行和第二行(在unboundedPreceding和CURRENT_ROW之间)这样就行了
输出:
要了解有关帧的更多信息,请阅读https://docs.oracle.com/cd/E17952_01/mysql-8.0-en/window-functions-frames.html
https://medium.com/expedia-group-tech/deep-dive-into-apache-spark-window-functions-7b4e39ad3c86