我尝试使用以下方法获取Dataframe的分区数:
df.rdd.getNumPartitions.toString
但当我监视spark日志时,我发现它会旋转很多个阶段,这是一个代价高昂的操作。据我所知,dataframe通过元数据向rdd添加了一个结构层。那么,为什么在转换为rdd的过程中要花费这么多时间呢?
kpbwa7wx1#
Dataframe是一个优化的分布式表格集合。因为它保持了一种表格格式(类似于sql表),所以它可以保留元数据,以便在后台执行一些优化。这种优化是由诸如催化剂和钨等辅助项目执行的rdd不包含任何模式,如果需要,您需要提供一个模式。所以rdd没有Dataframe那么优化(catalyst根本不涉及)将Dataframe转换为rdd强制spark循环所有元素,将它们从高度优化的catalyst空间转换为scala空间。从中检查代码 .rdd ```lazy val rdd: RDD[T] = {val objectType = exprEnc.deserializer.dataTyperddQueryExecution.toRdd.mapPartitions { rows =>rows.map(_.get(0, objectType).asInstanceOf[T])}}
.rdd
@transient private lazy val rddQueryExecution: QueryExecution = {val deserialized = CatalystSerde.deserializeTsparkSession.sessionState.executePlan(deserialized)}
因此,首先,它执行计划并将输出作为 `RDD[InternalRow]` 顾名思义,只供内部使用,需要转换成 `RDD[Row]` 然后它在所有行上循环转换它们。如您所见,它不仅仅是删除模式 希望这能回答你的问题。
1条答案
按热度按时间kpbwa7wx1#
Dataframe是一个优化的分布式表格集合。因为它保持了一种表格格式(类似于sql表),所以它可以保留元数据,以便在后台执行一些优化。
这种优化是由诸如催化剂和钨等辅助项目执行的
rdd不包含任何模式,如果需要,您需要提供一个模式。所以rdd没有Dataframe那么优化(catalyst根本不涉及)
将Dataframe转换为rdd强制spark循环所有元素,将它们从高度优化的catalyst空间转换为scala空间。
从中检查代码
.rdd
```lazy val rdd: RDD[T] = {
val objectType = exprEnc.deserializer.dataType
rddQueryExecution.toRdd.mapPartitions { rows =>
rows.map(_.get(0, objectType).asInstanceOf[T])
}
}
@transient private lazy val rddQueryExecution: QueryExecution = {
val deserialized = CatalystSerde.deserializeT
sparkSession.sessionState.executePlan(deserialized)
}