有以下技巧可以修剪Apache Spark Dataframe 谱系,特别是对于迭代计算:
def getCachedDataFrame(df: DataFrame): DataFrame = {
val rdd = df.rdd.cache()
df.sqlContext.createDataFrame(rdd, df.schema)
}
这看起来像是某种纯粹的魔术,但是现在我想知道为什么我们需要在RDD上调用cache()
方法?在这个沿袭修整逻辑中使用缓存的目的是什么?
有以下技巧可以修剪Apache Spark Dataframe 谱系,特别是对于迭代计算:
def getCachedDataFrame(df: DataFrame): DataFrame = {
val rdd = df.rdd.cache()
df.sqlContext.createDataFrame(rdd, df.schema)
}
这看起来像是某种纯粹的魔术,但是现在我想知道为什么我们需要在RDD上调用cache()
方法?在这个沿袭修整逻辑中使用缓存的目的是什么?
1条答案
按热度按时间2hh7jdfx1#
要理解缓存的用途,了解不同类型的RDD操作会有所帮助:转换和操作。来自文档:
RDD支持两种类型的操作:转换,从现有数据集创建新数据集;操作,在数据集上运行计算后将值返回到驱动程序。
还要考虑这一点:
Spark中的所有转换都是惰性的,因为它们不会立即计算结果,而是只记住应用于某个基本数据集的转换(例如文件)。只有当动作需要返回结果给驱动程序时,才会计算转换。这种设计使Spark能够更有效地运行。例如,我们可以认识到,通过Map创建的数据集将在Reduce中使用,并且仅将Reduce的结果而不是较大的Map数据集返回给驱动程序。
所以Spark的转换(比如
map
)都是惰性的,因为这有助于Spark在创建查询计划时更明智地考虑需要进行哪些计算。这与缓存有什么关系?
请看下面的代码:
在这里,我们读入一些文件,应用一些转换,并对同一 Dataframe 应用2个操作:
cleansedDF.write.parquet
和cleansedDF.count
。正如代码中的注解所解释的那样,如果我们像这样运行代码,我们实际上将计算两次这些转换,因为转换是惰性的,只有当操作需要执行它们时,它们才会执行。
如何防止这种重复计算?使用缓存:我们可以告诉Spark保留一些转换的"保存"结果,这样它们就不必被计算多次。这可以是在磁盘/内存/...
有了这些知识,我们的代码可能是这样的:
我调整了这个代码块中的注解,以突出显示与上一个代码块的不同之处。
注意
.persist
也存在,使用.cache
时使用默认的存储级别,使用.persist
时可以指定存储级别,this SO answer对此做了很好的解释。希望这有帮助!