我有一个嵌套for循环,它在内部循环中对一个Dataframe执行10次操作,并在完成内部循环后将得到的10个Dataframe连接到一个Dataframe中。
update:i use 一种字典,用于创建Dataframe列表以存储每个操作,然后在内部循环结束时合并它们。
然后用outloop的迭代次数将其写入parquet文件。outerloop有6个迭代,因此应该产生6个Parquet文件。
它是这样的:
train=0
for i in range(0,6):
train=train+30
#For loop to aggregate input and create 10 output dataframes
dfnames={}
for j in range(0,10):
ident="_"+str(j)
#Load dataframe of around 1M rows
df=spark.read.parquet("s3://path")
dfnames['df'+ident]= #Perform aggregations and operations
#Combine the 10 datframes into a single df
df_out=df_1.uniionByName(d_2).unionByName(df_3)...unionByName(df_10)
#Write to output parquet file
df_out.write.mode('overwrite').parquet("s3://path/" + str(train) +".parquet"
在完成外循环的第三次迭代之前,它似乎工作得很好。然后出于某种原因,它用另一个尝试id重新启动循环。因此我得到了前3个文件,但它没有进入第4次迭代,而是重新启动,重新给出第一个文件。我没有得到任何失败的阶段或工作。
我试过用虚拟变量和print语句单独运行for循环(不加载大Dataframe等),它们可以很好地完成。我认为这与循环后内存被刷新的方式有关。
以下是我的emr spark运行条件:我在一个emr集群上运行它,该集群有5个执行器、5个驱动节点和10个示例,总共有50个内核。spark执行器和驱动程序内存各为45g,总计约583g。典型的随机读是250g,随机写是331g。
一些相关的spark环境变量如下所示:
在循环或内存管理方面,我有没有做错什么?任何洞察都将不胜感激!
1条答案
按热度按时间hi3rlvi21#
你怎么得到你的df1,df2。。。在这条线之前?
#Combine the 10 datframes into a single df df_out=df1.uniionByName(d2).unionByName(df3)...unionByName(df10)
我的猜测是,你的Dataframe计划越来越大,这可能会引起问题。我建议在内部循环中创建一个Dataframe列表并使用
reduce
方法来合并它们。像下面这样