我在使用一些pyspark代码的时候发现了一些我不理解或不期望的行为。我在另一列中编写了一些生成随机数据(二进制数)的代码,并对随机生成的数据进行了过滤。但是,当我多次执行.show()时,我经常看到不满足我的条件的数据。似乎我的随机生成数据函数被调用了两次;一次在过滤器之前,一次在过滤器之后。我尝试使用.explain(),但我不明白如何读取或解释它。我还不清楚应该按照什么顺序读取.explain()的输出。
有人能帮我理解一下,用.explain(),我做的简单代码,它的工作方式和我预期的非常不同吗?
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, udf
import random
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
_random_udf = udf(lambda x: int(random.randint(0, 1)), IntegerType())
inputDf = spark.createDataFrame([{'row': i} for i in range(10)])
random_result = inputDf.withColumn("status", _random_udf(col("row")))
non_zero_filter = random_result.filter(col('status') != 0)
non_zero_filter.show()
使用.show()的输出示例。我不希望在status
列中看到包含0s
的行。
| 横列|状态|
| - -|- -|
| 2个|一个|
| 三个|第0页|
| 五个|第0页|
| 六个|一个|
| 七个|第0页|
explain()的输出如下
non_zero_filter.explain()
== Physical Plan ==
== Physical Plan ==
*(3) Project [row#296L, pythonUDF0#314 AS status#299]
+- BatchEvalPython [<lambda>(row#296L)], [pythonUDF0#314]
+- *(2) Project [row#296L]
+- *(2) Filter NOT (pythonUDF0#313 = 0)
+- BatchEvalPython [<lambda>(row#296L)], [pythonUDF0#313]
+- *(1) Scan ExistingRDD[row#296L]
1条答案
按热度按时间h6my8fg21#
结果中的变化是因为你使用返回
random.randint
模块生成随机整数。每次运行的结果都进入一个新的random_result
Dataframe 。让我们看一下计划。您打印了
.explain()
,这意味着您没有查看扩展计划。您只要求了physical plan
。打印.explain(True)
将为您提供详细计划。物理计划指定sparks逻辑计划在集群上执行的方式。在df下面,有一个RDD。简单地说,pyspark代码编译成RDD。
因此,在本例中,为了筛选出您想要的行,spark扫描RDD。然后应用筛选器和项目。因此,在本例中有三个作业。由括号中的数字加上星星前缀表示。您可以通过查看spark UI查看每个作业的详细信息。
有关
filter
行为,请参阅下面的@Emma注解