我的pyspark代码中有一个for循环。当我在大约5个循环上测试代码时,它工作得很好。但是当我在我的核心数据集上运行它时,导致160个循环,我的pyspark作业(在emr集群上提交)失败了。它第一次尝试第二次就失败了。
以下是spark history server中运行的作业的屏幕截图:
最初的工作 Attempt ID 1
在下午4:13运行,4小时后第二次尝试 Attempt ID 2
完成之后就失败了。当我打开作业时,我没有看到任何失败的任务或阶段。我猜这是因为for循环的大小越来越大。
下面是输出的stderr日志:它失败,状态为1
这是我的伪代码:
# Load Dataframe
df=spark.read.parquet("s3://path")
df=df.persist(StorageLevel.MEMORY_AND_DISK) # I will be using this df in the for loop
flist=list(df.select('key').distinct().toPandas()['key'])
output=[]
for i in flist:
df2=df.filter(col('key)==i))
Perform operations on df2 by each key that result in a dataframe df3
output.append(df3)
final_output = reduce(DataFrame.unionByName, output)
我认为 output
Dataframe的大小不断增加,作业最终会失败。我正在运行9个工作节点和8个vcore,每个节点有50gb的内存。
有没有办法写出 output
Dataframe到一个检查点在一组循环数之后,清除内存,然后从spark中停止的地方继续循环?
编辑:我的预期输出如下:
key mean prediction
3172742 0.0448 1
3172742 0.0419 1
3172742 0.0482 1
3172742 0.0471 1
3672767 0.0622 2
3672767 0.0551 2
3672767 0.0406 1
我可以使用groupby函数,因为我正在执行kmeans集群,它不允许groupby。所以我必须迭代每个键来执行kmeans集群。
暂无答案!
目前还没有任何答案,快来回答吧!