我目前有一个.txt.gz
文件列表,这些文件本质上是.tsv
(我无法控制源文件格式,它们都> 10 GB),我基本上需要以parquet格式读入、解压缩并写出到目标文件路径,以便稍后处理。
每个文件的性质都不一样,不能合并,所以不能传递完整路径。
现在,我有一个简单的UDF,它对任何文件名做一些预处理,读入它,然后尝试将它写回目录-我将这个UDF和一个文件名列表传递给.map
函数,希望它能分布在执行器上......一个简单的例子是,如果我有4个文件和4个执行器,每个执行器都有3个内核,那么每个执行器都会得到自己的文件,它会解压缩/读入该文件(理想情况下为3个内核中的每个内核解压到3个分区上),然后将其写回parquet格式
// Find a file, unzip it, partition and write to parquet with no filename extension
def unzip_and_write_delta(fileName:String, outFileDir:String) : Int = {
val inFileDir:String = if (domino_source.contains(fileName)) mounted_source_filepath1 else mounted_source_filepath2
val inFullPath:String = inFileDir.concat(fileName)
val fileOutputName = fileName.replace(".txt.gz", "") //.replace(".rar", "")
val outFullPath:String = outFileDir.concat(fileOutputName)
val returned = spark.read.option("sep", "\t").option("header", "true").option("numPartitions", 3).csv(inFullPath).write.parquet(outFullPath)
// val returned = sc.textFile(inFullPath).map(_.split("\t")).write.parquet(outFullPath)
return 0
}
然后将此函数馈送到map函数:
val out_val = full_filelist.map(
f => unzip_and_write_delta(fileName=f, outFileDir=mounted_target_filepath)
)
sc.textFile命令昨天似乎还能工作,但结果充满了糟糕的编码,所以我今天将其切换到spark.read.option
,现在它们似乎都没有并行做任何事情......每个文件都被顺序加载到单个执行器上的单个分区上,整个文件被读取,然后被写入,最终绕过了任何效率增益。
Executor Panel
我已经尝试了spark.read和sc.textFile命令的多种组合,包括有和没有repartition
命令,都设置了numPartitions
选项,有和没有list.map()
函数,似乎没有任何东西可以并行工作
至少我希望每个执行器都能处理它自己的文件,并且作为一个额外的好处,能够跨内核读取和处理(无法找到csv的很多东西,但似乎是textFile的选项......尽管编码工作得不好)
1条答案
按热度按时间yzxexxkh1#
通过将
.par
添加到map函数中,至少可以并行运行这导致了这个执行者小组:Executor Panel