我有一个关于使用sparkDataframe的基本问题。
考虑以下伪代码:
val df1 = // Lazy Read from csv and create dataframe
val df2 = // Filter df1 on some condition
val df3 = // Group by on df2 on certain columns
val df4 = // Join df3 with some other df
val subdf1 = // All records from df4 where id < 0
val subdf2 = // All records from df4 where id > 0
* Then some more operations on subdf1 and subdf2 which won't trigger spark evaluation yet*
// Write out subdf1
// Write out subdf2
假设我从主Dataframe开始 df1
(我懒洋洋地从csv中读取),在这个Dataframe上做一些操作(filter,groupby,join),然后根据一个条件(例如,id>0和id<0)分割这个Dataframe。然后我进一步对这些子Dataframe进行操作(让我们命名这些子Dataframe) subdf1, subdf2
)最后写出两个子Dataframe。
请注意 write
函数是触发spark求值的唯一命令,其余函数(filter、groupby、join)会导致延迟求值。
现在当我写出来 subdf1
,我很清楚,延迟求值开始了,所有语句都是从读取csv开始求值的,以创建df1。
当我们开始写的时候,我的问题来了 subdf2
. spark是否理解代码中的分歧 df4
并在命令执行时存储此Dataframe subdf1
遇到了什么?还是从创作的第一行开始 df1
重新评估所有中间Dataframe?如果是这样的话,是一个好主意吗 cache
Dataframe df4
(假设我有足够的记忆力)?
如果有关系的话,我用的是scala spark。任何帮助都将不胜感激。
1条答案
按热度按时间iaqfqrcu1#
不,spark不能从你的代码中推断出来。一切都会重新开始。为了证实这一点,你可以
subdf1.explain()
以及subdf2.explain()
您应该看到,这两个Dataframe都有从最开始的查询计划df1
我读过。所以你是对的,你应该
df4
以避免从df1
,如果你有足够的内存。当然,记住要通过这样做来解除持久性df4.unpersist()
如果你不再需要df4
以便进一步计算。