如何使用spark java解压存储在hdfs中的文件

kx7yvsdv  于 2021-06-01  发布在  Hadoop
关注(0)|答案(3)|浏览(484)
List<String> list= jsc.wholeTextFiles(hdfsPath).keys().collect();
        for (String string : list) {
        System.out.println(string);
        }

在这里我得到所有的zip文件。从这里我无法继续如何提取每个文件和存储到hdfs路径与相同的zipname文件夹

zzwlnbp8

zzwlnbp81#

你可以用下面的方式,但我们只需要在 zipFilesRdd.collect().forEach 在将内容写入hdfs之前。map和flat map给出了此时不可序列化的任务。

public void readWriteZipContents(String zipLoc,String hdfsBasePath){
    JavaSparkContext jsc = new JavaSparkContext(new SparkContext(new SparkConf()));
    JavaPairRDD<String, PortableDataStream> zipFilesRdd = jsc.binaryFiles(zipLoc);
    zipFilesRdd.collect().forEach(file -> {
        ZipInputStream zipStream = new ZipInputStream(file._2.open());
        ZipEntry zipEntry = null;
        Scanner sc = new Scanner(zipStream);
        try {
            while ((zipEntry = zipStream.getNextEntry()) != null) {
                String entryName = zipEntry.getName();
                if (!zipEntry.isDirectory()) {
                    //create the path in hdfs and write its contents
                   Configuration configuration = new Configuration();
                    configuration.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
                    configuration.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
                    FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:8020"), configuration);
                    FSDataOutputStream hdfsfile = fs.create(new Path(hdfsBasePath + "/" + entryName));
                   while(sc.hasNextLine()){
                       hdfsfile.writeBytes(sc.nextLine());
                   }
                   hdfsfile.close();
                   hdfsfile.flush();
                }
                zipStream.closeEntry();
            }
        } catch (IllegalArgumentException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        sc.close();
        //return fileNames.iterator();
    });
}
xv8emn3q

xv8emn3q2#

对于gzip文件,wholetextfiles应该自动压缩所有内容。但是,对于zip文件,我知道的唯一方法是使用二进制文件并手动解压缩数据。

sc
    .binaryFiles(hdfsDir)
    .mapValues(x=> { 
        var result = scala.collection.mutable.ArrayBuffer.empty[String]
        val zis = new ZipInputStream(x.open())
        var entry : ZipEntry = null
        while({entry = zis.getNextEntry();entry} != null) {
            val scanner = new Scanner(zis)
            while (sc.hasNextLine()) {result+=sc.nextLine()} 
        }
        zis.close()
        result
    }

这将为您提供一个(pair)rdd[string,arraybuffer[string]],其中键是hdfs上文件的名称,值是zip文件的解压缩内容(arraybuffer的每个元素一行)。如果给定的zip文件包含多个文件,则所有内容都将聚合。您可以调整代码以满足您的确切需要。例如,flatmapvalues而不是mapvalues将扁平化所有内容(rdd[string,string]),以利用spark的并行性。
还要注意,在while条件中,“{entry=is.getnextentry();在java中,entry}可以替换为(entry=is.getnextentry())。然而,在scala中,做作的结果是单位,因此这将产生一个无限循环。

wlzqhblo

wlzqhblo3#

提出这个用scala编写的解决方案。
使用spark2(版本2.3.0.cloudera2)、scala(版本2.11.8)进行测试

def extractHdfsZipFile(source_zip : String, target_folder : String,
    sparksession : SparkSession) : Boolean = {

    val hdfs_config = sparksession.sparkContext.hadoopConfiguration
    val buffer = new Array[Byte](1024)

    /*
     .collect -> run on driver only, not able to serialize hdfs Configuration
    */
    val zip_files = sparksession.sparkContext.binaryFiles(source_zip).collect.
      foreach{ zip_file: (String, PortableDataStream) =>
        // iterate over zip_files
        val zip_stream : ZipInputStream = new ZipInputStream(zip_file._2.open)
        var zip_entry: ZipEntry = null

        try {
          // iterate over all ZipEntry from ZipInputStream
          while ({zip_entry = zip_stream.getNextEntry; zip_entry != null}) {
            // skip directory
            if (!zip_entry.isDirectory()) {
              println(s"Extract File: ${zip_entry.getName()}, with Size: ${zip_entry.getSize()}")
              // create new hdfs file
              val fs : FileSystem = FileSystem.get(hdfs_config)
              val hdfs_file : FSDataOutputStream = fs.create(new Path(target_folder + "/" + zip_entry.getName()))

              var len : Int = 0
              // write until zip_stream is null
              while({len = zip_stream.read(buffer); len > 0}) {
                hdfs_file.write(buffer, 0, len)
              }
              // close and flush hdfs_file
              hdfs_file.close()
              hdfs_file.flush()
            }
            zip_stream.closeEntry()
          }
          zip_stream.close()
        } catch {
          case zip : ZipException => {
            println(zip.printStackTrace)
            println("Please verify that you do not use compresstype9.")
            // for DEBUG throw exception
            //false
            throw zip
          }
          case e : Exception => {
            println(e.printStackTrace)
            // for DEBUG throw exception
            //false
            throw e
          }
        }
    }
    true
  }

相关问题