Apache Spark: What is ExternalRDDScan in the DAG?

w8ntj3qf · posted 2023-10-23 in Apache

What does ExternalRDDScan in the DAG mean?
I can't find any explanation of it anywhere on the internet.


e5nqia27 #1

According to the source code, ExternalRDDScan is the representation of converting an existing RDD of arbitrary objects into a Dataset of InternalRows, i.e., of creating a DataFrame. Let's verify that our understanding is correct:

scala> import spark.implicits._
import spark.implicits._

scala> val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> rdd.toDF().explain()
== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#2]
+- Scan ExternalRDDScan[obj#1]
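Conversely, a Dataset that does not originate from an RDD should not produce an ExternalRDDScan node at all. As a quick cross-check, you can explain a range-based Dataset in the same spark-shell session (a sketch; the exact splits value in the output depends on your machine's parallelism):

```scala
// A Dataset built by spark.range is not backed by an external RDD of
// objects, so its physical plan contains a Range node rather than
// SerializeFromObject + Scan ExternalRDDScan.
scala> spark.range(5).explain()
== Physical Plan ==
*(1) Range (0, 5, step=1, splits=8)
```

This contrast shows that ExternalRDDScan appears specifically when the data enters Spark SQL from a pre-existing RDD of JVM objects.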

gudnpqoy #2

ExternalRDD is the logical representation of a DataFrame/Dataset (though not in all cases) in the query execution plan, i.e., in the DAG created by Spark.
An ExternalRDD is created:
1. when you create a DataFrame from an RDD (i.e., using createDataFrame() or toDF());
2. when you create a Dataset from an RDD (i.e., using createDataset() or toDS()).
At runtime, when the ExternalRDD is to be loaded into memory, a scan operation is performed, which is represented by ExternalRDDScan (internally, the scan strategy is resolved to ExternalRDDScanExec). See the example below:
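That resolution step can be sketched in plain Scala as a partial mapping from a logical node to a physical node, in the spirit of Catalyst's planner strategies. This is a simplified illustration with hypothetical class names, not Spark's actual implementation:

```scala
// Simplified sketch: a planner "strategy" pattern-matches on logical
// plan nodes and emits physical nodes. In Spark, an analogous strategy
// turns the logical ExternalRDD node into ExternalRDDScanExec.
sealed trait LogicalNode
case class ExternalRDDNode(output: Seq[String]) extends LogicalNode  // hypothetical stand-in
case class OtherNode(name: String) extends LogicalNode

sealed trait PhysicalNode
case class ExternalRDDScanExecNode(output: Seq[String]) extends PhysicalNode  // hypothetical stand-in

object ScanStrategy {
  // Returns Some(physical plan) if this strategy applies, None otherwise.
  def apply(plan: LogicalNode): Option[PhysicalNode] = plan match {
    case ExternalRDDNode(out) => Some(ExternalRDDScanExecNode(out))
    case _                    => None
  }
}

println(ScanStrategy(ExternalRDDNode(Seq("obj#1"))))
// → Some(ExternalRDDScanExecNode(List(obj#1)))
```

The point of the sketch is only the shape of the mapping: the logical ExternalRDD node you see in the parsed/analyzed/optimized plans below becomes a scan operator in the physical plan.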

scala> val sampleRDD = sc.parallelize(Seq(1,2,3,4,5))
sampleRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> sampleRDD.toDF.queryExecution
res0: org.apache.spark.sql.execution.QueryExecution =
== Parsed Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#2]
+- ExternalRDD [obj#1]

== Analyzed Logical Plan ==
value: int
SerializeFromObject [input[0, int, false] AS value#2]
+- ExternalRDD [obj#1]

== Optimized Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#2]
+- ExternalRDD [obj#1]

== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#2]
+- Scan[obj#1]

As you can see, the DataFrame object is represented by ExternalRDD in the query execution plan, and the physical plan contains a scan operation that is resolved to ExternalRDDScan (ExternalRDDScanExec) during execution.
The same applies to Spark Datasets:

scala> val sampleRDD = sc.parallelize(Seq(1,2,3,4,5))
sampleRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> sampleRDD.toDS.queryExecution.logical
res9: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [input[0, int, false] AS value#23]
+- ExternalRDD [obj#22]

scala> spark.createDataset(sampleRDD).queryExecution.logical
res18: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [input[0, int, false] AS value#39]
+- ExternalRDD [obj#38]

The examples above were run on Spark version 2.4.2.
Reference: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-LogicalPlan-ExternalRDD.html
