Caching and loops in (Py)Spark

mefy6pfw · posted 2021-05-16 in Spark

I know that "for" and "while" loops are generally to be avoided when working with Spark. My question is about optimizing a "while" loop, but if I'm missing a solution that makes it unnecessary, I'm all ears.
I'm not sure I can demonstrate the problem with toy data (the processing is long and gets more involved as the loop progresses), but here is some pseudocode:


### I have a function - called 'enumerator' - which involves several joins and window functions.

from pyspark.sql.functions import col

# I run this function on my base dataset, df0, and return df1

df1 = enumerator(df0, param1 = apple, param2 = banana)

# Check for some condition in df1, then count number of rows in the result

counter = df1 \
.filter(col('X') == some_condition) \
.count()

# If there are rows meeting this condition, start a while loop

while counter > 0:
  print('Starting with counter: ', str(counter))

  # Run the enumerator function on df1 again
  df2 = enumerator(df1, param1 = apple, param2 = banana)

  # Check for the condition again, then continue the while loop if necessary
  counter = df2 \
  .filter(col('X') == some_condition) \
  .count()

  df1 = df2

# After the while loop finishes, I take the last resulting dataframe and I will do several more operations and analyses downstream

final_df = df2

An important aspect of the enumerator function is that it "looks back" over a sequence within a window, so it may need to run several times before all the necessary corrections have been made.
In my heart I know this is ugly, but the window/rank/order analysis inside the function is essential. My understanding is that the underlying Spark query plan gets more and more complicated as the loop continues. What best practices should I adopt in this situation? Should I cache at any point, either before the while loop starts or inside the loop itself?
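A minimal sketch of what I mean, with a dummy enumerator standing in for my real joins/window logic, where explain() shows the plan growing each iteration:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.range(10).withColumn('X', F.lit(0))

def dummy_enumerator(d):
    # stand-in for the real joins / window functions
    return d.withColumn('X', F.col('X') + 1)

for i in range(3):
    df = dummy_enumerator(df)
    df.explain()   # the printed plan gets longer with every iteration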


slwdgvem · 1#

You should definitely cache/persist the DataFrames, otherwise every iteration of the while loop starts again from df0. You may also want to unpersist the DataFrames you are done with, to free up disk/memory.
Another point to optimize is not to do a count but to use a cheaper operation such as df.take(1): if it returns no rows, counter == 0.

from pyspark.sql.functions import col

df1 = enumerator(df0, param1 = apple, param2 = banana)
df1.cache()

# Check whether any row in df1 meets the condition (take(1) is cheaper than a full count)

counter = len(df1.filter(col('X') == some_condition).take(1))

while counter > 0:
  print('Starting with counter: ', str(counter))

  df2 = enumerator(df1, param1 = apple, param2 = banana)
  df2.cache()

  counter = len(df2.filter(col('X') == some_condition).take(1))
  df1.unpersist()    # unpersist df1 as it will be overwritten

  df1 = df2

final_df = df2
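Note that final_df still points at the cached df2, which is what you want for the downstream operations; once those analyses are finished you can release the storage the same way:

# after the downstream operations on final_df are done
final_df.unpersist()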
