I have a MapReduce program as follows:
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
public class Sample {

    public static class SampleMapper extends MapReduceBase implements
            Mapper<Text, Text, Text, Text> {

        private Text word = new Text();

        @Override
        public void map(Text key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString(), ",");
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(key, word);
            }
        }
    }

    public static class SampleReducer extends MapReduceBase implements
            Reducer<Text, Text, Text, Text> {

        private Text result = new Text();

        @Override
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            StringBuffer aggregation = new StringBuffer();
            while (values.hasNext()) {
                aggregation.append("|" + values.next().toString());
            }
            result.set(aggregation.toString());
            output.collect(key, result);
        }
    }

    public static void main(String args[]) throws IOException {
        JobConf conf = new JobConf(Sample.class);
        conf.setJobName("Sample");
        conf.setMapperClass(SampleMapper.class);
        conf.setReducerClass(SampleReducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setInputFormat(KeyValueTextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
I have built the jar and have been trying to get output, but the output file that gets created is empty.
I am running the job with the following command:
hadoop jar mapreduce.jar Sample /tmp/input tmp/output
mapreduce.jar is the jar I packaged, and my input file looks like this:
1 a,b,c
2 e,f
1 x,y,z
2 g
Expected output:
1 a|b|c|x|y|z
2 e|f|g
2 Answers
moiiocjp1#
My guess is that, since you are using KeyValueTextInputFormat as the input format, it is not finding the separator byte and is therefore using the entire line as the key (with the value being ""). That means the loop in your mapper never iterates, so nothing is written out. In the configuration, use the property named mapreduce.input.keyvaluelinerecordreader.key.value.separator to set the separator byte to " " (a space, to match your input).
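For example, a minimal sketch of that fix against the JobConf in the question's main method (the property name above is the Hadoop 2+ name; Hadoop 1's old API read key.value.separator.in.input.line instead):

    // Tell KeyValueTextInputFormat to split key and value on a space
    // rather than the default tab character.
    conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " ");

With that in place, a line such as "1 a,b,c" reaches the mapper as key "1" and value "a,b,c", so the tokenizer loop actually runs.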
wz3gfoph2#
Try passing a Configuration object to the JobConf; my guess is that your JobConf is not picking up the Hadoop/HDFS configuration.
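A minimal sketch of that suggestion, assuming the cluster's *-site.xml files are on the classpath so that a default Configuration picks them up:

    import org.apache.hadoop.conf.Configuration;

    // Build the Configuration explicitly and hand it to the JobConf,
    // so the job sees the Hadoop/HDFS settings from the classpath.
    Configuration config = new Configuration();
    JobConf conf = new JobConf(config, Sample.class);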