DAG中的ExternalRDDScan是什么意思?整个互联网对此没有任何解释。
e5nqia271#
根据源代码,ExternalRDDScan是将任意对象的现有RDD转换为InternalRow s的数据集的表示,即创建DataFrame。让我们验证我们的理解是否正确:
ExternalRDDScan
InternalRow
DataFrame
scala> import spark.implicits._ import spark.implicits._ scala> val rdd = sc.parallelize(Array(1, 2, 3, 4, 5)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26 scala> rdd.toDF().explain() == Physical Plan == *(1) SerializeFromObject [input[0, int, false] AS value#2] +- Scan ExternalRDDScan[obj#1]
gudnpqoy2#
ExternalRDD是查询执行计划中DataFrame/Dataset(并非所有情况下)的逻辑表示,即在由spark创建的DAG中。创建外部RDD1.当您从RDD创建DataFrame时(* 即使用DataFrame(),toDF())1.当您从RDD创建数据集时( 即使用RDD数据集(),toDS()*)在运行时,当ExternalRDD要加载到内存中时,会执行一个扫描操作,该操作由ExternalRDDScan表示(在内部,扫描策略被解析为ExternalRDDScanExec)。请看下面的示例:
scala> val sampleRDD = sc.parallelize(Seq(1,2,3,4,5)) sampleRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24 scala> sampleRDD.toDF.queryExecution res0: org.apache.spark.sql.execution.QueryExecution = == Parsed Logical Plan == SerializeFromObject [input[0, int, false] AS value#2] +- ExternalRDD [obj#1] == Analyzed Logical Plan == value: int SerializeFromObject [input[0, int, false] AS value#2] +- ExternalRDD [obj#1] == Optimized Logical Plan == SerializeFromObject [input[0, int, false] AS value#2] +- ExternalRDD [obj#1] == Physical Plan == *(1) SerializeFromObject [input[0, int, false] AS value#2] +- Scan[obj#1]
可以看到,在查询执行计划中,DataFrame对象由ExternalRDD表示,物理计划包含一个 * 扫描操作,该操作在执行期间解析为ExternalRDDScan(ExternalRDDScanExec)*。这同样适用于spark数据集。
scala> val sampleRDD = sc.parallelize(Seq(1,2,3,4,5)) sampleRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24 scala> sampleRDD.toDS.queryExecution.logical res9: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = SerializeFromObject [input[0, int, false] AS value#23] +- ExternalRDD [obj#22] scala> spark.createDataset(sampleRDD).queryExecution.logical res18: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = SerializeFromObject [input[0, int, false] AS value#39] +- ExternalRDD [obj#38]
上面的例子是在spark版本2.4.2中运行的参考:https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-LogicalPlan-ExternalRDD.html
2条答案
按热度按时间e5nqia271#
根据源代码,
ExternalRDDScan
是将任意对象的现有RDD转换为InternalRow
s的数据集的表示,即创建DataFrame
。让我们验证我们的理解是否正确:gudnpqoy2#
ExternalRDD是查询执行计划中DataFrame/Dataset(并非所有情况下)的逻辑表示,即在由spark创建的DAG中。
创建外部RDD
1.当您从RDD创建DataFrame时(* 即使用DataFrame(),toDF())
1.当您从RDD创建数据集时( 即使用RDD数据集(),toDS()*)
在运行时,当ExternalRDD要加载到内存中时,会执行一个扫描操作,该操作由ExternalRDDScan表示(在内部,扫描策略被解析为ExternalRDDScanExec)。请看下面的示例:
可以看到,在查询执行计划中,DataFrame对象由ExternalRDD表示,物理计划包含一个 * 扫描操作,该操作在执行期间解析为ExternalRDDScan(ExternalRDDScanExec)*。
这同样适用于spark数据集。
上面的例子是在spark版本2.4.2中运行的
参考:https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-LogicalPlan-ExternalRDD.html