在我的spark应用程序中,我希望对循环中的Dataframe执行操作,并将结果写入hdfs。
伪代码:
var df = emptyDataframe
for n = 1 to 200000{
someDf=read(n)
df = df.mergeWith(somedf)
}
df.writetohdfs
在上面的例子中,当“mergewith”执行unionall时,我得到了很好的结果。
然而,当我在“mergewith”中执行一个(简单的)join时,作业会变得非常慢(两个执行器各有4个核心,超过1h),而且永远不会完成(作业会自动中止)。
在我的场景中,我使用仅包含~1mb文本数据的文件进行了~50次迭代。
因为合并的顺序在我的例子中很重要,所以我怀疑这是由于dag的生成,导致整个过程在我存储数据的时候运行。
现在我正在尝试在合并的Dataframe上使用.persist,但这似乎进展得相当缓慢。
编辑:
当作业运行时,我注意到(即使我进行了计数和.persist)内存中的Dataframe看起来不像静态Dataframe。它看起来像是一条连接到它所做的所有合并的路径,实际上线性地减慢了工作。
我的假设正确吗 var df
是罪魁祸首吗?
在我看来,问题的细目是:
dfA = empty
dfC = dfA.increment(dfB)
dfD = dfC.increment(dfN)....
当我期望df'acc和d是object时,我会以不同的方式激发事物,而不在乎我是否坚持或重新划分。它看起来像这样:
dfA = empty
dfC = dfA incremented with df B
dfD = ((dfA incremented with df B) incremented with dfN)....
更新2
为了摆脱对df的持续不工作,我可以在将df转换为rdd和rdd并再次转换时“打破”血统。这有一点开销,但可以接受(作业在几分钟内完成,而不是几个小时/从不)我将对持久性进行更多的测试,并以解决方法的形式给出答案。
结果:这似乎只是表面上解决了这些问题。事实上,我回到了原点,得到了一份工作 java.lang.OutOfMemoryError: GC overhead limit exceeded
2条答案
按热度按时间eyh26e7m1#
如果您有这样的代码:
之后df将有400000个分区,这使得下面的操作效率低下(因为每个分区有1个任务)。
在分配Dataframe之前,尝试将分区的数量减少到例如200(使用例如。
df.coalesce(200).write.saveAsTable(....)
)tyky79it2#
下面是我最终使用的。它的性能足以满足我的用例,它可以工作,不需要持久化。
这在很大程度上是一种变通方法,而不是一种修复方法。
这就是我如何使钨丝服从的方法。通过从一个df到一个rdd再到后面,我最终得到的是一个真实的对象,而不是从前面到后面的整个钨生成的过程管道。
在我的代码中,我在写入磁盘之前迭代了几次(50-150次迭代似乎效果最好)。这就是我再次清除bufferarray以重新开始的地方。