Decompress all gzip files in a Hadoop HDFS directory

hwamh0ep · posted 2021-06-03 in Hadoop

On my HDFS I have a bunch of gzip files that I want to decompress back to their normal, uncompressed format. Is there an API for this, or how could I write a function to do it?
I don't want to use any command-line tools; instead, I want to accomplish this by writing Java code.


igsr9ssn 1#

You need a `CompressionCodec` to decompress the file. The implementation for gzip is `GzipCodec`. You get a `CompressionInputStream` through the codec and write the result out with plain IO. For example, say you have a file `file.gz`:

```java
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

// path of the file
String uri = "/uri/to/file.gz";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path inputPath = new Path(uri);

CompressionCodecFactory factory = new CompressionCodecFactory(conf);
// the correct codec is discovered from the extension of the file
CompressionCodec codec = factory.getCodec(inputPath);

if (codec == null) {
    System.err.println("No codec found for " + uri);
    System.exit(1);
}

// remove the .gz extension
String outputUri =
    CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());

InputStream is = codec.createInputStream(fs.open(inputPath));
OutputStream out = fs.create(new Path(outputUri));
IOUtils.copyBytes(is, out, conf);

// close the streams
IOUtils.closeStream(is);
IOUtils.closeStream(out);
```
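Note that `CompressionCodecFactory.removeSuffix` just strips the codec's default extension, so the decompressed copy is written next to the original file, and `fs.create` will overwrite an existing file at that path by default.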

Update
If you need to get all the files in a directory, fetch their `FileStatus` entries like

```java
FileSystem fs = FileSystem.get(new Configuration());
FileStatus[] statuses = fs.listStatus(new Path("hdfs/path/to/dir"));
```

Then loop over them:

```java
for (FileStatus status : statuses) {
    CompressionCodec codec = factory.getCodec(status.getPath());
    ...
    InputStream is = codec.createInputStream(fs.open(status.getPath()));
    ...
}
```
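Putting the two pieces together, a minimal self-contained sketch could look like the following (the class name `DecompressGzipDir`, the `args[0]` input-directory argument, and the skipping of files with no matching codec are my own illustrative additions, not from the answer above):

```java
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class DecompressGzipDir {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);

        // list every file in the directory passed as the first argument
        FileStatus[] statuses = fs.listStatus(new Path(args[0]));
        for (FileStatus status : statuses) {
            Path inputPath = status.getPath();
            CompressionCodec codec = factory.getCodec(inputPath);
            if (codec == null) {
                // no codec matches this extension (not a .gz file) -- skip it
                continue;
            }
            // strip the .gz suffix to get the output path
            String outputUri = CompressionCodecFactory.removeSuffix(
                inputPath.toString(), codec.getDefaultExtension());
            InputStream is = codec.createInputStream(fs.open(inputPath));
            OutputStream out = fs.create(new Path(outputUri));
            try {
                IOUtils.copyBytes(is, out, conf);
            } finally {
                IOUtils.closeStream(is);
                IOUtils.closeStream(out);
            }
        }
    }
}
```

You would run it with the HDFS directory as the only argument; every gzip file in it gets an uncompressed sibling with the suffix stripped.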


r8xiu3jd 2#

I use an identity-map Hadoop job (written with Scalding) to change compression, split size, and the like:

```scala
class IdentityMap(args: Args) extends ConfiguredJob(args) {
  CombineFileMultipleTextLine(args.list("in"): _*).read.mapTo[String, String]('line -> 'line)(identity)
    .write(if (args.boolean("compress")) TsvCompressed(args("out")) else TextLine(args("out")))
}
```

And the generic configuration abstract class:

```scala
abstract class ConfiguredJob(args: Args) extends Job(args) {
  override def config(implicit mode: Mode): Map[AnyRef, AnyRef] = {
    val Megabyte = 1024 * 1024
    val conf = super.config(mode)
    val splitSizeMax = args.getOrElse("splitSizeMax", "1024").toInt * Megabyte
    val splitSizeMin = args.getOrElse("splitSizeMin", "512").toInt * Megabyte
    val jobPriority = args.getOrElse("jobPriority", "NORMAL")
    val maxHeap = args.getOrElse("maxHeap", "512m")
    conf ++ Map("mapred.child.java.opts" -> ("-Xmx" + maxHeap),
      "mapred.output.compress" -> (if (args.boolean("compress")) "true" else "false"),
      "mapred.min.split.size" -> splitSizeMin.toString,
      "mapred.max.split.size" -> splitSizeMax.toString,
      // "mapred.output.compression.codec" -> args.getOrElse("codec", "org.apache.hadoop.io.compress.BZip2Codec"), // Does not work, has to be -D flag
      "mapred.job.priority" -> jobPriority)
  }
}
```
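As the commented-out line notes, setting the output codec through this config map did not work; it has to be passed as a `-D` flag on the command line instead, e.g. `-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec` (those are the old `mapred.*` property names used above; newer Hadoop versions know the same setting as `mapreduce.output.fileoutputformat.compress.codec`).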
