递归Dataframe操作

7jmck4yq  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(403)

在我的spark应用程序中,我希望对循环中的Dataframe执行操作,并将结果写入hdfs。
伪代码:

var df = emptyDataframe
for n = 1 to 200000{
  someDf=read(n)
  df = df.mergeWith(somedf)
}
df.writetohdfs

在上面的例子中,当“mergewith”执行unionall时,我得到了很好的结果。
然而,当我在“mergewith”中执行一个(简单的)join时,作业会变得非常慢(两个执行器各有4个核心,超过1h),而且永远不会完成(作业会自动中止)。
在我的场景中,我使用仅包含~1mb文本数据的文件进行了~50次迭代。
因为合并的顺序在我的例子中很重要,所以我怀疑这是由于dag的生成,导致整个过程在我存储数据的时候运行。
现在我正在尝试在合并的Dataframe上使用.persist,但这似乎进展得相当缓慢。
编辑:
当作业运行时,我注意到(即使我进行了计数和.persist)内存中的Dataframe看起来不像静态Dataframe。它看起来像是一条连接到它所做的所有合并的路径,实际上线性地减慢了工作。
我的假设正确吗 var df 是罪魁祸首吗?

在我看来,问题的细目是:

dfA = empty
dfC = dfA.increment(dfB)
dfD = dfC.increment(dfN)....

当我期望df'acc和d是object时,我会以不同的方式激发事物,而不在乎我是否坚持或重新划分。它看起来像这样:

dfA = empty
dfC = dfA incremented with df B
dfD = ((dfA incremented with df B) incremented with dfN)....

更新2
为了摆脱对df的持续不工作,我可以在将df转换为rdd和rdd并再次转换时“打破”血统。这有一点开销,但可以接受(作业在几分钟内完成,而不是几个小时/从不)我将对持久性进行更多的测试,并以解决方法的形式给出答案。
结果:这似乎只是表面上解决了这些问题。事实上,我回到了原点,得到了一份工作 java.lang.OutOfMemoryError: GC overhead limit exceeded

eyh26e7m

eyh26e7m1#

如果您有这样的代码:

var df = sc.parallelize(Seq(1)).toDF()

for(i<- 1 to 200000) {
  val df_add = sc.parallelize(Seq(i)).toDF()
  df = df.unionAll(df_add)
}

之后df将有400000个分区,这使得下面的操作效率低下(因为每个分区有1个任务)。
在分配Dataframe之前,尝试将分区的数量减少到例如200(使用例如。 df.coalesce(200).write.saveAsTable(....) )

tyky79it

tyky79it2#

下面是我最终使用的。它的性能足以满足我的用例,它可以工作,不需要持久化。
这在很大程度上是一种变通方法,而不是一种修复方法。

val mutableBufferArray = ArrayBuffer[DataFrame]()
mutableBufferArray.append(hiveContext.emptyDataframe())

for loop {

              val interm = mergeDataFrame(df, mutableBufferArray.last)
              val intermSchema = interm.schema
              val intermRDD = interm.rdd.repartition(8)

              mutableBufferArray.append(hiveContext.createDataFrame(intermRDD, intermSchema))
              mutableBufferArray.remove(0)

}

这就是我如何使钨丝服从的方法。通过从一个df到一个rdd再到后面,我最终得到的是一个真实的对象,而不是从前面到后面的整个钨生成的过程管道。
在我的代码中,我在写入磁盘之前迭代了几次(50-150次迭代似乎效果最好)。这就是我再次清除bufferarray以重新开始的地方。

相关问题