我知道在使用spark时,通常要避免“for”和“while”循环。我的问题是关于优化“while”循环,但是如果我遗漏了一个使其不必要的解决方案,我会洗耳恭听。
我不确定我能否用toy数据演示这个问题(处理时间很长,随着循环的进行而复杂化),但下面是一些伪代码:
### I have a function - called 'enumerator' - which involves several joins and window functions.
# I run this function on my base dataset, df0, and return df1
df1 = enumerator(df0, param1 = apple, param2 = banana)
# Check for some condition in df1, then count number of rows in the result
counter = df1 \
.filter(col('X') == some_condition) \
.count()
# If there are rows meeting this condition, start a while loop
while counter > 0:
print('Starting with counter: ', str(counter))
# Run the enumerator function on df1 again
df2 = enumerator(df1, param1= apple, param2 = banana)
# Check for the condition again, then continue the while loop if necessary
counter = df2 \
.filter(col('X') == some_condition) \
.count()
df1 = df2
# After the while loop finishes, I take the last resulting dataframe and I will do several more operations and analyses downstream
final_df = df2
枚举器函数的一个重要方面是“回顾”窗口中的序列,因此可能需要运行几次才能进行所有必要的更正。
在我心里,我知道这很难看,但是函数中的窗口/排名/顺序分析是至关重要的。我的理解是,随着循环的继续,底层的spark查询计划变得越来越复杂。在这种情况下,我应该采取哪些最佳做法?我应该在任何一点缓存-在while循环开始之前,还是在循环本身中?
1条答案
按热度按时间slwdgvem1#
您肯定应该缓存/持久化Dataframe,否则
while
循环将从零开始df0
. 此外,您可能希望取消对已用Dataframe的持久化,以释放磁盘/内存空间。要优化的另一点是不要做
count
,但使用更便宜的操作,如df.take(1)
. 如果没有结果counter == 0
.