我还没有找到一个明确的答案这个问题,即使有多个类似的问题,所以。
我没有填写下面代码的所有细节,因为实际的转换对于我的问题并不重要。
// Adding _corrupt_record to have records that are not valid json
val inputDf = spark.read.schema(someSchema.add("_corrupt_record", StringType)).json(path)
/**
* The following lazy-persists the DF and does not return a new DF. Since
* Spark>=2.3 the queries from raw JSON/CSV files are disallowed when the
* referenced columns only include the internal corrupt record column
* (named _corrupt_record by default). Caching is the workaround.
*/
inputDf.persist
val uncorruptedDf = inputDf.filter($"_corrupt_record".isNull)
val corruptedDf = inputDf.filter($"_corrupt_record".isNotNull)
// Doing a count on both derived DFs - corruptedDf will also be output for further investigation
log.info("Not corrupted records: " + uncorruptedDf.count)
log.info("Corrupted records: " + corruptedDf.count)
corruptedDf.write.json(corruptedOutputPath)
// Not corrupted data will be used for some complicated transformations
val finalDf = uncorruptedDf.grouby(...).agg(...)
log.info("Finally chosen records: " + finalDf.count)
finalDf.write.json(outputPath)
如您所见,我标记了输入Dataframe inputDf
为了坚持(请看这里的原因),但从来没有做过 count
在上面。然后我导出了两个Dataframe,对这两个Dataframe我都做了一个 count
.
问题1:我什么时候做 uncorruptedDf.count
,它对父Dataframe做了什么 inputdf
? 它是否会触发整个系统的缓存 inputDf
,对应于 uncorruptedDf.count
,还是什么都没有?rdd文件说明:
当您持久化rdd时,每个节点将其计算的任何分区存储在内存中,并在该数据集(或从该数据集派生的数据集)上的其他操作中重用这些分区。
问题2:在这一点上(在这两个问题之前)有意义吗 count
)至 persist
派生的Dataframe corruptedDf
以及 uncorruptedDf
不持久 inputDf
? 因为在每个派生的Dataframe上有两个动作,所以我想说是的,但我不确定。如果是的话。。下面的父df在什么位置取消持久化(a) 、(b)或(c)?
uncorruptedDf.persist
corruptedDf.persist
// (A) I don't think I should inputDf.unpersist here, since derived DFs are not yet persisted
log.info("Not corrupted records: " + uncorruptedDf.count)
log.info("Corrupted records: " + corruptedDf.count)
// (B) This seems a reasonable place, to free some memory
val finalDf = uncorruptedDf.grouby(...).agg(...)
log.info("Finally chosen records: " + finalDf.count)
finalDf.write.json(outputPath)
// (C) Is there any value from unpersisting here?
问题3:与前一个问题相同,但适用于 finalDf
与 corruptedDf
. 可以看出,我在屏幕上执行了两个动作 finalDf
: count
以及 write
.
提前谢谢!
1条答案
按热度按时间nr9pn0ug1#
对于问题1:是的,当调用第一个未损坏的count.count()时,它将持久化inputdf,但不会持久化对inputdf所做的任何转换。下次计数时,它不会从json文件中读取数据,而是从缓存的分区中读取数据。
对于问题2:我认为你不应该坚持输入,因为你什么也得不到。坚持腐败和未腐败的是有意义的,因为你正在执行多个行动。您只是在inputdf上执行转换以过滤损坏和未损坏的记录,spark非常聪明,可以在物理规划阶段将其作为一个步骤进行组合。因此,您不应该坚持inputdf,这样您就不必担心取消持久化。
对于问题3:您不应该持久化最终的Dataframe,因为您只对它执行一个操作,即将它作为json文件写入物理路径。
ps:不要尝试缓存/持久化每个Dataframe,因为缓存本身会影响性能,并且必须根据指定的存储级别执行额外的工作以将数据保留在内存中或保存到磁盘。如果转换较少且不复杂,则最好避免缓存。您可以在dataframe上使用explain命令来查看物理和逻辑计划。