sc
.binaryFiles(hdfsDir)
.mapValues(x=> {
var result = scala.collection.mutable.ArrayBuffer.empty[String]
val zis = new ZipInputStream(x.open())
var entry : ZipEntry = null
while({entry = zis.getNextEntry();entry} != null) {
val scanner = new Scanner(zis)
while (sc.hasNextLine()) {result+=sc.nextLine()}
}
zis.close()
result
}
def extractHdfsZipFile(source_zip : String, target_folder : String,
sparksession : SparkSession) : Boolean = {
val hdfs_config = sparksession.sparkContext.hadoopConfiguration
val buffer = new Array[Byte](1024)
/*
.collect -> run on driver only, not able to serialize hdfs Configuration
*/
val zip_files = sparksession.sparkContext.binaryFiles(source_zip).collect.
foreach{ zip_file: (String, PortableDataStream) =>
// iterate over zip_files
val zip_stream : ZipInputStream = new ZipInputStream(zip_file._2.open)
var zip_entry: ZipEntry = null
try {
// iterate over all ZipEntry from ZipInputStream
while ({zip_entry = zip_stream.getNextEntry; zip_entry != null}) {
// skip directory
if (!zip_entry.isDirectory()) {
println(s"Extract File: ${zip_entry.getName()}, with Size: ${zip_entry.getSize()}")
// create new hdfs file
val fs : FileSystem = FileSystem.get(hdfs_config)
val hdfs_file : FSDataOutputStream = fs.create(new Path(target_folder + "/" + zip_entry.getName()))
var len : Int = 0
// write until zip_stream is null
while({len = zip_stream.read(buffer); len > 0}) {
hdfs_file.write(buffer, 0, len)
}
// close and flush hdfs_file
hdfs_file.close()
hdfs_file.flush()
}
zip_stream.closeEntry()
}
zip_stream.close()
} catch {
case zip : ZipException => {
println(zip.printStackTrace)
println("Please verify that you do not use compresstype9.")
// for DEBUG throw exception
//false
throw zip
}
case e : Exception => {
println(e.printStackTrace)
// for DEBUG throw exception
//false
throw e
}
}
}
true
}
3条答案
按热度按时间zzwlnbp81#
你可以用下面的方式,但我们只需要在
zipFilesRdd.collect().forEach
在将内容写入hdfs之前。map和flat map给出了此时不可序列化的任务。xv8emn3q2#
对于gzip文件,wholetextfiles应该自动压缩所有内容。但是,对于zip文件,我知道的唯一方法是使用二进制文件并手动解压缩数据。
这将为您提供一个(pair)rdd[string,arraybuffer[string]],其中键是hdfs上文件的名称,值是zip文件的解压缩内容(arraybuffer的每个元素一行)。如果给定的zip文件包含多个文件,则所有内容都将聚合。您可以调整代码以满足您的确切需要。例如,flatmapvalues而不是mapvalues将扁平化所有内容(rdd[string,string]),以利用spark的并行性。
还要注意,在while条件中,“{entry=is.getnextentry();在java中,entry}可以替换为(entry=is.getnextentry())。然而,在scala中,做作的结果是单位,因此这将产生一个无限循环。
wlzqhblo3#
提出这个用scala编写的解决方案。
使用spark2(版本2.3.0.cloudera2)、scala(版本2.11.8)进行测试