我有很多压缩文件(以gbs为单位)在zip格式,并想写Map只解压他们的工作。我的mapper类看起来像
import java.util.zip.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.OutputCollector;
import java.io.*;
public class DecompressMapper extends Mapper <LongWritable, Text, LongWritable, Text>
{
private static final int BUFFER_SIZE = 4096;
public void map(LongWritable key, Text value, OutputCollector<LongWritable, Text> output, Context context) throws IOException
{
FileSplit fileSplit = (FileSplit)context.getInputSplit();
String fileName = fileSplit.getPath().getName();
this.unzip(fileName, new File(fileName).getParent() + File.separator + "/test_poc");
}
public void unzip(String zipFilePath, String destDirectory) throws IOException {
File destDir = new File(destDirectory);
if (!destDir.exists()) {
destDir.mkdir();
}
ZipInputStream zipIn = new ZipInputStream(new FileInputStream(zipFilePath));
ZipEntry entry = zipIn.getNextEntry();
// iterates over entries in the zip file
while (entry != null) {
String filePath = destDirectory + File.separator + entry.getName();
if (!entry.isDirectory()) {
// if the entry is a file, extracts it
extractFile(zipIn, filePath);
} else {
// if the entry is a directory, make the directory
File dir = new File(filePath);
dir.mkdir();
}
zipIn.closeEntry();
entry = zipIn.getNextEntry();
}
zipIn.close();
}
private void extractFile(ZipInputStream zipIn, String filePath) throws IOException {
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(filePath));
byte[] bytesIn = new byte[BUFFER_SIZE];
int read = 0;
while ((read = zipIn.read(bytesIn)) != -1) {
bos.write(bytesIn, 0, read);
}
bos.close();
}
}
还有我的驾驶课
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DecompressJob extends Configured implements Tool{
public static void main(String[] args) throws Exception
{
int res = ToolRunner.run(new Configuration(), new DecompressJob(),args);
System.exit(res);
}
public int run(String[] args) throws Exception
{
Job conf = Job.getInstance(getConf());
conf.setJobName("MapperOnly");
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(DecompressMapper.class);
conf.setNumReduceTasks(0);
Path inp = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.addInputPath(conf, inp);
FileOutputFormat.setOutputPath(conf, out);
return conf.waitForCompletion(true) ? 0: 1;
}
}
看来我的Map绘制类工作不好。我没有得到所需目录中的解压缩文件。感谢您的帮助。谢谢。。。
2条答案
按热度按时间6gpjuf901#
好吧,在hadoop文件系统中没有具体的解压方法,但是经过长时间的研究,我已经找到了如何在hadoop文件系统中直接解压的方法。前提是你需要在某个位置复制zip文件,然后运行mapreduce作业。很明显,hadoop不理解zipfile输入格式,因此我们需要定制Map器和reducer,以便控制Map器发出和reducer消耗的内容。请注意,这个mapreduce将在单个Map器上运行,因为在自定义hadoop提供的record reader类时,我们禁用了split方法,即使其为false。因此mapreduce将文件名作为键,未压缩文件的内容作为值。当reducer使用它时,我将output outputkey设置为null,这样reducer只保留解压的内容,并且reducer的数量设置为1,这样所有转储都在一个部件文件中。
我们都知道hadoop不能单独处理zip文件,但是java可以借助它自己的zipfile类来处理,zipfile类可以通过zipinputstrem读取zip文件内容,通过zipentry读取zip条目,所以我们编写了一个自定义的zipinputformat类来扩展fileinputformat。
请注意,recordreader类返回zipfileRecordReader,这是我们刚才讨论的hadoop recordreader类的自定义版本
为了方便起见,我在源代码中给出了一些注解,这样您就可以很容易地理解如何使用缓冲区内存读写文件。现在让我们为上述类编写Map器类
让我们快速编写相同的减速机
让我们为mapper和reducer快速配置作业
请注意,在作业类中,我们已将inputformatclass配置为zipfileinputformat类,outputformatclass配置为textoutputformat类。
mavenize项目,让依赖项保持原样来运行代码。导出jar文件并将其部署到hadoop集群上。在cdh5.5Yarn上测试和部署。pom文件的内容如下
np8igboo2#
上面的代码没有什么问题
我正在使用mr1api和mr2api。千万别那么做。
使用java io函数。hadoop无法识别其文件系统中的javaio函数。
路径生成不正确。
我们在编写map reduce程序时需要小心,因为hadoop使用完全不同的文件系统,我们在编写代码时需要考虑到这一点,并且永远不要混合使用mr1和mr2api。