我有很多数据要分块取出,比如说3块,而不是一次全部缓存在内存中。但是,我想在以后的同一时间保存它(动作)。
这是当前的简化策略:
for query in [query1,query2,query3]:
df = spark.sql(query)
df.cache()
df1 = df.filter('a')
df2 = df.filter('b')
final_output_1 = final_output_1.join(df1)
final_output_2 = final_output_2.join(df2)
df.unpersist()
final_output_1.write.saveAsTable()
final_output_2.write.saveAsTable()
所以第一个问题:你愿意吗 unpersist()
因为还没有行动,所以不能在这里工作 df
?
第二个问题:你是怎么做的 df.cache()
在这里工作,当我重复使用 df
for循环中的变量?我知道它是不可变的,所以它可以复制,但是 unpersist()
真的清除了记忆?
1条答案
按热度按时间anhgbhbe1#
当你想一次又一次地重复使用一个Dataframe时,spark中会用到缓存,
例如:Map表
一旦缓存了df,就需要一个动作操作来物理地将数据移动到内存中,因为spark是基于延迟执行的。
对你来说
不会按预期工作,因为在此之后您没有执行操作。
要使缓存工作,您需要运行df.count()或df.show()或任何其他操作,以便将数据移动到内存中,否则您的数据将不会移动到内存中,您将得不到任何好处。因此df.unpersist()也是多余的。
第一个问题:
不,您的df.cache()和df.unpersist()将不起作用,因为没有缓存任何数据,因此它们对unpersist没有任何意义。
第二个问题:
是的,您可以使用相同的变量名,如果执行了操作,数据将被缓存,并且在操作之后df.unpersist()将取消每个循环中的数据持久化。因此上一个df与下一个循环中的下一个df没有连接。正如您所说的,它们是不可变的,并且由于您将新查询分配给每个循环中的同一个变量,因此它将充当一个新的df(与以前的df无关)。
基于你的代码,我认为你不需要做缓存,因为你只执行一个操作。
请参阅何时缓存Dataframe?如果我缓存一个sparkDataframe,然后覆盖引用,原始Dataframe还会被缓存吗?