我正在尝试运行一个mapreduce程序,使用作业的输出文件作为第二个作业的输入文件。我有当前代码:
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(BookAnalyzer.class);
job.setJobName("N-Gram Extraction");
FileSystem fs = FileSystem.get(conf);
FileStatus[] status_list = fs.listStatus(new Path(args[0]));
if (status_list != null) {
for (FileStatus status : status_list) {
FileInputFormat.addInputPath(job, status.getPath());
}
}
Path nGramOutput = new Path(args[1]);
FileOutputFormat.setOutputPath(job, nGramOutput);
job.setMapperClass(BookNGramMapper.class);
job.setReducerClass(BookNGramReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
if(job.waitForCompletion(true)) {
Configuration conf2 = new Configuration();
Job job2 = Job.getInstance(conf2);
job2.setJarByClass(BookAnalyzer.class);
job2.setJobName("Term-frequency");
FileSystem fs2 = FileSystem.get(conf2);
FileStatus[] status_list2 = fs2.listStatus(nGramOutput);
if (status_list2 != null) {
for (FileStatus status : status_list2) {
FileInputFormat.addInputPath(job2, status.getPath());
}
}
FileOutputFormat.setOutputPath(job2, new Path(args[2]));
job2.setMapperClass(TermFreqMapper.class);
job2.setReducerClass(TermFreqReducer.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(IntWritable.class);
System.exit(job2.waitForCompletion(true) ? 0 : 1);
}
我得到一个错误,说输入路径(ngramoutput)不存在,但是如果我的第一个作业执行正确,那么应该创建args[1]中的文件。
所以,
args[0]=初始文件
args[1]=第一个作业的输出文件,第二个作业的输入文件
args[2]=第二个作业的输出文件
任何建议都太好了!
谢谢!
2条答案
按热度按时间okxuctiv1#
应该设置作业之间的依赖关系。
如果我记得api:
job2.adddependingjobs(job1);
rta7y2nd2#
这是做链接工作的一种方法。
试试这个
使用的路径是
所以运行作业的命令是
你不必给出三个论点