Spark+Scala:以有效的方式合并多个 Dataframe

vlurs2pr  于 2023-08-05  发布在  Scala
关注(0)|答案(1)|浏览(129)

我正在处理数千个NetCDF文件,以创建一个主 Dataframe ,并在其上进行进一步的计算。早些时候,我可以在Spark 2.x中使用SciSpark快速完成这项工作,但在Spark 3.x中似乎不再工作了。所以我现在使用了通用的NetCDF-javareader。我不得不重新实现,如果可能的话,我也想提高性能。
在早期版本中,通过SciSpark,我能够以一种需要太多空间的方式创建主 Dataframe ,因为它将所有输入文件中的所有内容加载到内存中。现在我写了一个更有效的函数,因为它为每个文件创建一个 Dataframe ,只保留最小的内容,而不是所有的数据。这样的基于文件的 Dataframe 被合并以形成主 Dataframe 。然而,在这里我面临着一个问题,因为进程不断崩溃,而且在运行时似乎也占用了更少的CPU。这里需要关注的基本代码是:

def fileListToDataframe(flist: String): DataFrame = {
    val ipFiles = scala.io.Source.fromFile(flist).mkString.split("\n")

    val dataFrames = ipFiles.par.map {x => fileToDF(x)}  // frac. of data is stored per file
    val mergedDataFrame = dataFrames.par.reduce(_ union _)

    return mergedDataFrame
}

字符串
样本误差:

ERROR TaskSchedulerImpl: Lost executor 60 on 122.xx.yy.zz: Command exited with code 0
INFO task 15.0 in stage 7718.0 (TID 132079) failed because while it was being computed, its executor exited for a reason unrelated to the task. Not counting this failure towards the maximum number of failures for the task.
WARN Dispatcher: Message RequestMessage(server:33867, NettyRpcEndpointRef(spark://CoarseGrainedScheduler@server:33867), ReviveOffers) dropped due to sparkEnv is stopped. Could not find CoarseGrainedScheduler.


上面的代码可以处理500个文件,但是当输入10K个文件时,这个过程似乎并不完整。主 Dataframe 只需要几GB的RAM(而系统有2TiB)。有没有办法让上面的计算更好?

b1zrtrql

b1zrtrql1#

我最终通过以下方式解决了这个问题:
1.创建一个列中包含文件名的 Dataframe
1.使用case类转换为数据集,并使用包含文件阅读的方法。
这使我能够得到基于Spark的解决方案;然而,执行器的数量也影响性能。奇怪的是,使用1个executor和所有可用的内核,几乎可以得到100%的CPU使用率(尽管比最佳的executor数量要慢一点)。

相关问题