for循环在emr中不断重启(pyspark)

q43xntqr  于 2021-05-17  发布在  Spark
关注(0)|答案(1)|浏览(559)

我有一个嵌套for循环,它在内部循环中对一个Dataframe执行10次操作,并在完成内部循环后将得到的10个Dataframe连接到一个Dataframe中。
update:i use 一种字典,用于创建Dataframe列表以存储每个操作,然后在内部循环结束时合并它们。
然后用outloop的迭代次数将其写入parquet文件。outerloop有6个迭代,因此应该产生6个Parquet文件。
它是这样的:

train=0
for i in range(0,6):
    train=train+30
    #For loop to aggregate input and create 10 output dataframes
    dfnames={}
    for j in range(0,10):
        ident="_"+str(j)  
        #Load dataframe of around 1M rows
        df=spark.read.parquet("s3://path")
        dfnames['df'+ident]= #Perform aggregations and operations
    #Combine the 10 datframes into a single df
    df_out=df_1.uniionByName(d_2).unionByName(df_3)...unionByName(df_10)
    #Write to output parquet file
    df_out.write.mode('overwrite').parquet("s3://path/" + str(train) +".parquet"

在完成外循环的第三次迭代之前,它似乎工作得很好。然后出于某种原因,它用另一个尝试id重新启动循环。因此我得到了前3个文件,但它没有进入第4次迭代,而是重新启动,重新给出第一个文件。我没有得到任何失败的阶段或工作。
我试过用虚拟变量和print语句单独运行for循环(不加载大Dataframe等),它们可以很好地完成。我认为这与循环后内存被刷新的方式有关。
以下是我的emr spark运行条件:我在一个emr集群上运行它,该集群有5个执行器、5个驱动节点和10个示例,总共有50个内核。spark执行器和驱动程序内存各为45g,总计约583g。典型的随机读是250g,随机写是331g。
一些相关的spark环境变量如下所示:

在循环或内存管理方面,我有没有做错什么?任何洞察都将不胜感激!

hi3rlvi2

hi3rlvi21#

你怎么得到你的df1,df2。。。在这条线之前? #Combine the 10 datframes into a single df df_out=df1.uniionByName(d2).unionByName(df3)...unionByName(df10) 我的猜测是,你的Dataframe计划越来越大,这可能会引起问题。
我建议在内部循环中创建一个Dataframe列表并使用 reduce 方法来合并它们。
像下面这样

from functools import reduce
from pyspark.sql import DataFrame
df_list = []
for j in range(0,10):  
        #Load dataframe of around 1M rows
        df = spark.read.parquet("s3://path")
        transformed_df = #do your transforms
        df_list.append(transformed_df)

final_df = reduce(DataFrame.unionByName, df_list)

相关问题