Context
在最近的SO-post中,我发现使用withColumn
可以在处理堆叠/链式列表达式和不同的windows规范时改进DAG。但是,在本例中,withColumn
实际上使DAG更差,并且与使用select
的结果不同。
可复制示例
首先,一些测试数据(PySpark 2.4.4独立版):
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
dfp = pd.DataFrame(
{
"col1": np.random.randint(0, 5, size=100),
"col2": np.random.randint(0, 5, size=100),
"col3": np.random.randint(0, 5, size=100),
"col4": np.random.randint(0, 5, size=100),
"col5": np.random.randint(0, 5, size=100),
}
)
df = spark.createDataFrame(dfp)
df.show(5)
+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
| 0| 3| 2| 2| 2|
| 1| 3| 3| 2| 4|
| 0| 0| 3| 3| 2|
| 3| 0| 1| 4| 4|
| 4| 0| 3| 3| 3|
+----+----+----+----+----+
only showing top 5 rows
这个例子很简单。中包含2个窗口规范和基于它们的4个独立列表达式:
w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")
col_w1_1 = F.max("col5").over(w1).alias("col_w1_1")
col_w1_2 = F.sum("col5").over(w1).alias("col_w1_2")
col_w2_1 = F.max("col5").over(w2).alias("col_w2_1")
col_w2_2 = F.sum("col5").over(w2).alias("col_w2_2")
expr = [col_w1_1, col_w1_2, col_w2_1, col_w2_2]
withColumn - 4 shuffles
如果withColumn
与交替窗口等级库一起使用,DAG会创建不必要的混洗:
df.withColumn("col_w1_1", col_w1_1)\
.withColumn("col_w2_1", col_w2_1)\
.withColumn("col_w1_2", col_w1_2)\
.withColumn("col_w2_2", col_w2_2)\
.explain()
== Physical Plan ==
Window [sum(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_2#147L], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(4) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col3#90L, 200)
+- Window [sum(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_2#143L], [col1#88L], [col2#89L ASC NULLS FIRST]
+- *(3) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col1#88L, 200)
+- Window [max(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_1#145L], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(2) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col3#90L, 200)
+- Window [max(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_1#141L], [col1#88L], [col2#89L ASC NULLS FIRST]
+- *(1) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col1#88L, 200)
+- Scan ExistingRDD[col1#88L,col2#89L,col3#90L,col4#91L,col5#92L]
select - 2 shuffles
如果所有列都以select
传递,则DAG正确。
df.select("*", *expr).explain()
== Physical Plan ==
Window [max(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_1#119L, sum(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_2#121L], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(2) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col3#90L, 200)
+- Window [max(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_1#115L, sum(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_2#117L], [col1#88L], [col2#89L ASC NULLS FIRST]
+- *(1) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col1#88L, 200)
+- Scan ExistingRDD[col1#88L,col2#89L,col3#90L,col4#91L,col5#92L]
提问
关于为什么应该避免withColumn
,有一些现有的信息,但是它们主要涉及多次调用withColumn
,并且它们没有解决偏离DAG的问题(参见here和here)。有人知道withColumn
和select
之间的DAG为什么不同吗?Spark的优化算法应该适用于任何情况,不应该依赖于不同的方式来表达完全相同的事情。
先谢谢你了。
3条答案
按热度按时间7vux5j2d1#
这看起来像是由
withColumn
引起的内部投影的结果。它在Spark文档中有记录官方的建议是按照Jay的建议,在处理多个列时执行select操作
myzjeezk2#
当使用嵌套的withColumns和window函数时?
假设我想做:
我有很多内存问题和高溢出,即使是非常小的数据集。如果我使用select而不是withColumn做同样的事情,它执行得更快。
to94eoyn3#
关于@Victor3y的回答:
如果您熟悉SQL而不是Spark的内部工作原理,那么
withColumn
文档的含义可能并不完全明显:该方法在内部引入投影。因此,多次调用它,例如,通过循环来添加多个列可能会生成大计划,这可能会导致性能问题。
将这种差异可视化的一种方法是使用SQL:将
withColumn
看作是将原始DF查询 Package 在子查询中。因此,在使用带有
WITH
子句的SQL子查询的情况下,使用withColumn
是有意义的,但如果您只想添加N个独立的列,则最好使用select
。