Gzip-compressing a HAR file on HDFS using Spark

Posted by mccptt67 on 2021-05-29 in Hadoop

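I have a large amount of data in Hadoop archive (.har) format. Since har does not include any compression, I am trying to gzip it further and store the result in HDFS. The only thing I could get to run without errors is: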

harFile.coalesce(1, shuffle = true)
  .saveAsTextFile("hdfs://namenode/archive/GzipOutput", classOf[org.apache.hadoop.io.compress.GzipCodec])
// `coalesce` to a single partition because Gzip isn't splittable.

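However, this does not give the correct result. A gzip file is generated, but its content is invalid (a single line describing the RDD type, etc.).
Any help would be much appreciated. I am also open to any other approach.
Thanks.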

Answer #1 (5jdjgkvh)

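A Java snippet for creating a compressed version of an existing HDFS file.
It was assembled hastily in a text editor from bits and pieces of a Java application I wrote a while ago, so it is untested; expect some typos and gaps.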

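// JDK stream types used by the copy loop below
import java.io.InputStream;
import java.io.OutputStream;
// HDFS API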
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
// native Hadoop compression libraries
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.io.compress.Lz4Codec;

..............

  // Hadoop "Configuration" (and its derivatives for  HDFS, HBase etc.) constructors try to auto-magically
  //  find their config files by searching CLASSPATH for directories, and searching each dir for hard-coded  
  //  name "core-site.xml", plus "hdfs-site.xml" and/or "hbase-site.xml" etc.
  // WARNING - if these config files are not found, the "Configuration" reverts to hard-coded defaults without
  //  any warning, resulting in bizarre error messages later, so run some explicit checks here
  Configuration cnfHadoop = new Configuration() ;
  String propDefaultFs =cnfHadoop.get("fs.defaultFS") ;
  if (propDefaultFs ==null || ! propDefaultFs.startsWith("hdfs://"))
  { throw new IllegalArgumentException(
                "HDFS configuration is missing - no proper \"core-site.xml\" found, please add\n"
               +"directory /etc/hadoop/conf/ (or custom dir with custom XML conf files) in CLASSPATH"
               ) ;
  }
/*
  // for a Kerberised cluster, either you already have a valid TGT in the default
  //  ticket cache (via "kinit"), or you have to authenticate by code
  UserGroupInformation.setConfiguration(cnfHadoop) ;
  UserGroupInformation.loginUserFromKeytab("user@REALM", "/some/path/to/user.keytab") ;

*/

  FileSystem fsCluster =FileSystem.get(cnfHadoop) ;
  Path source = new Path("/some/hdfs/path/to/XXX.har") ;
  Path target = new Path("/some/hdfs/path/to/XXX.har.gz") ;

  // alternative: "BZip2Codec" for better compression (but higher CPU cost)
  // alternative: "SnappyCodec" or "Lz4Codec" for lower compression (but much lower CPU cost)
  CompressionCodecFactory codecBootstrap = new CompressionCodecFactory(cnfHadoop) ;
  CompressionCodec codecHadoop =codecBootstrap.getCodecByClassName(GzipCodec.class.getName()) ;
  Compressor compressorHadoop =codecHadoop.createCompressor() ;

  byte[] buffer = new byte[16*1024*1024] ;
  int bufUsedCapacity ;
  InputStream  sourceStream =fsCluster.open(source) ;
  OutputStream targetStream =codecHadoop.createOutputStream(fsCluster.create(target, true), compressorHadoop) ;
  while ((bufUsedCapacity =sourceStream.read(buffer)) >0)
  { targetStream.write(buffer, 0, bufUsedCapacity) ; }
  targetStream.close() ;
  sourceStream.close() ;

..............
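
The read/write loop in the snippet above can be tried without a cluster. The following is a minimal sketch, not the answer's code: it swaps Hadoop's `GzipCodec` for the JDK's `java.util.zip.GZIPOutputStream` and uses in-memory streams in place of `fsCluster.open(...)` and `fsCluster.create(...)`. The class name `GzipCopySketch` and the helpers `copy`, `gzip` and `gunzip` are hypothetical names chosen for illustration. It also uses try-with-resources so that the streams are closed even if the copy fails, which the explicit `close()` calls above do not guarantee.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical illustration class; JDK gzip streams stand in for the Hadoop codec.
class GzipCopySketch {

    // Same buffered read/write loop as the HDFS snippet; returns the number of bytes copied.
    static long copy(InputStream in, OutputStream out, int bufferSize) throws IOException {
        byte[] buffer = new byte[bufferSize];
        long total = 0;
        int used;
        while ((used = in.read(buffer)) > 0) {
            out.write(buffer, 0, used);
            total += used;
        }
        return total;
    }

    // Compresses a byte array by streaming it through a gzip output stream.
    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (InputStream in = new ByteArrayInputStream(raw);
             OutputStream out = new GZIPOutputStream(sink)) {
            copy(in, out, 8 * 1024);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The gzip stream is closed here, so the trailer has been written to the sink.
        return sink.toByteArray();
    }

    // Reverses gzip(); used only to check that the compressed output is valid.
    static byte[] gunzip(byte[] compressed) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            copy(in, sink, 8 * 1024);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    public static void main(String[] args) {
        // Build some repetitive sample data so that compression has something to do.
        StringBuilder sample = new StringBuilder();
        for (int i = 0; i < 1000; i++) {
            sample.append("line one\nline two\n");
        }
        byte[] raw = sample.toString().getBytes(StandardCharsets.UTF_8);
        byte[] gz = gzip(raw);
        System.out.println("raw=" + raw.length + " bytes, gzip=" + gz.length + " bytes");
        System.out.println("round trip ok: " + Arrays.equals(raw, gunzip(gz)));
    }
}
```

In the HDFS version, the same `copy` loop would sit between the stream returned by `fsCluster.open(source)` and the one returned by `codecHadoop.createOutputStream(...)`; only the stream constructors change.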
