for循环导致Dataframe大小增加和作业失败

klh5stk1  于 2021-07-12  发布在  Spark
关注(0)|答案(0)|浏览(181)

我的pyspark代码中有一个for循环。当我在大约5个循环上测试代码时,它工作得很好。但是当我在我的核心数据集上运行它时,导致160个循环,我的pyspark作业(在emr集群上提交)失败了。它第一次尝试第二次就失败了。
以下是spark history server中运行的作业的屏幕截图:

最初的工作 Attempt ID 1 在下午4:13运行,4小时后第二次尝试 Attempt ID 2 完成之后就失败了。当我打开作业时,我没有看到任何失败的任务或阶段。我猜这是因为for循环的大小越来越大。
下面是输出的stderr日志:它失败,状态为1

这是我的伪代码:


# Load Dataframe

df=spark.read.parquet("s3://path")
df=df.persist(StorageLevel.MEMORY_AND_DISK) # I will be using this df in the for loop
flist=list(df.select('key').distinct().toPandas()['key'])
output=[]

for i in flist:
    df2=df.filter(col('key)==i))
    Perform operations on df2 by each key that result in a dataframe df3
    output.append(df3)

final_output = reduce(DataFrame.unionByName, output)

我认为 output Dataframe的大小不断增加,作业最终会失败。我正在运行9个工作节点和8个vcore,每个节点有50gb的内存。
有没有办法写出 output Dataframe到一个检查点在一组循环数之后,清除内存,然后从spark中停止的地方继续循环?
编辑:我的预期输出如下:

key        mean   prediction
3172742   0.0448    1
3172742   0.0419    1
3172742   0.0482    1
3172742   0.0471    1
3672767   0.0622    2
3672767   0.0551    2
3672767   0.0406    1

我可以使用groupby函数,因为我正在执行kmeans集群,它不允许groupby。所以我必须迭代每个键来执行kmeans集群。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题