如何解释pyspark .explain()以及pyspark如何对操作排序,

oxiaedzo  于 2022-11-25  发布在  Apache
关注(0)|答案(1)|浏览(130)

我在使用一些pyspark代码的时候发现了一些我不理解或不期望的行为。我在另一列中编写了一些生成随机数据(二进制数)的代码,并对随机生成的数据进行了过滤。但是,当我多次执行.show()时,我经常看到不满足我的条件的数据。似乎我的随机生成数据函数被调用了两次;一次在过滤器之前,一次在过滤器之后。我尝试使用.explain(),但我不明白如何读取或解释它。我还不清楚应该按照什么顺序读取.explain()的输出。
有人能帮我理解一下,用.explain(),我做的简单代码,它的工作方式和我预期的非常不同吗?

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, udf
import random
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

_random_udf = udf(lambda x: int(random.randint(0, 1)), IntegerType())

inputDf = spark.createDataFrame([{'row': i} for i in range(10)])
random_result = inputDf.withColumn("status", _random_udf(col("row")))
non_zero_filter = random_result.filter(col('status') != 0)

non_zero_filter.show()

使用.show()的输出示例。我不希望在status列中看到包含0s的行。
| 横列|状态|
| - -|- -|
| 2个|一个|
| 三个|第0页|
| 五个|第0页|
| 六个|一个|
| 七个|第0页|
explain()的输出如下

non_zero_filter.explain()

== Physical Plan ==
== Physical Plan ==
*(3) Project [row#296L, pythonUDF0#314 AS status#299]
+- BatchEvalPython [<lambda>(row#296L)], [pythonUDF0#314]
   +- *(2) Project [row#296L]
      +- *(2) Filter NOT (pythonUDF0#313 = 0)
         +- BatchEvalPython [<lambda>(row#296L)], [pythonUDF0#313]
            +- *(1) Scan ExistingRDD[row#296L]
h6my8fg2

h6my8fg21#

结果中的变化是因为你使用返回random.randint模块生成随机整数。每次运行的结果都进入一个新的random_result Dataframe 。
让我们看一下计划。您打印了.explain(),这意味着您没有查看扩展计划。您只要求了physical plan。打印.explain(True)将为您提供详细计划。
物理计划指定sparks逻辑计划在集群上执行的方式。在df下面,有一个RDD。简单地说,pyspark代码编译成RDD。
因此,在本例中,为了筛选出您想要的行,spark扫描RDD。然后应用筛选器和项目。因此,在本例中有三个作业。由括号中的数字加上星星前缀表示。您可以通过查看spark UI查看每个作业的详细信息。
有关filter行为,请参阅下面的@Emma注解

相关问题