我输入了这样的记录,a | 1 | y,b | 0 | n,c | 1 | n,d | 2 | y,e | 1 | y现在,在mapper中,我必须检查第三列的值。如果是“y”,则该记录必须直接写入输出文件,而不将该记录移动到reducer,否则,即“n”值记录必须移动到reducer进行进一步处理。。所以,a | 1 | y,d | 2 | y,e | 1 | y不应该转到reducer,但是b | 0 | n,c | 1 | n应该转到reducer,然后输出文件。我该怎么做??
pgky5nke1#
看看这样行不行,
public class Xxxx { public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { FileSystem fs = FileSystem.get(context.getConfiguration()); Random r = new Random(); FileSplit split = (FileSplit)context.getInputSplit(); String fileName = split.getPath().getName(); FSDataOutputStream out = fs.create(new Path(fileName + "-m-" + r.nextInt())); String parts[]; String line = value.toString(); String[] splits = line.split(","); for(String s : splits) { parts = s.split("\\|"); if(parts[2].equals("Y")) { out.writeBytes(line); }else { context.write(key, value); } } out.close(); fs.close(); } } public static class MyReducer extends Reducer<LongWritable, Text, LongWritable, Text> { public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for(Text t : values) { context.write(key, t); } } } /** * @param args * @throws IOException * @throws InterruptedException * @throws ClassNotFoundException */ public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // TODO Auto-generated method stub Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://localhost:9000"); conf.set("mapred.job.tracker", "localhost:9001"); Job job = new Job(conf, "Xxxx"); job.setJarByClass(Xxxx.class); Path outPath = new Path("/output_path"); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); FileInputFormat.addInputPath(job, new Path("/input.txt")); FileOutputFormat.setOutputPath(job, outPath); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
zpqajqem2#
在map函数中,您将逐行获得输入。使用|作为分隔符将其拆分(通过使用 String.split() 方法是准确的)它会像这样
String.split()
String[] line = value.toString().split('|');
通过访问此数组的第三个元素 line[2] 然后,使用一个简单的 if else 语句,发出具有n值的输出以供进一步处理。
line[2]
if else
ylamdve63#
您可能可以使用multipleoutputs-单击此处将“y”和“n”类型的记录从Map器中分离到两个不同的文件。接下来,为两个新生成的“y”和“n”类型的数据集运行saparate作业。对于“y”类型,将reducer的数量设置为0,这样就不用reducer了。对于“n”型,按你想要的方式使用异径管。希望这有帮助。
3条答案
按热度按时间pgky5nke1#
看看这样行不行,
zpqajqem2#
在map函数中,您将逐行获得输入。使用|作为分隔符将其拆分(通过使用
String.split()
方法是准确的)它会像这样通过访问此数组的第三个元素
line[2]
然后,使用一个简单的if else
语句,发出具有n值的输出以供进一步处理。ylamdve63#
您可能可以使用multipleoutputs-单击此处将“y”和“n”类型的记录从Map器中分离到两个不同的文件。
接下来,为两个新生成的“y”和“n”类型的数据集运行saparate作业。对于“y”类型,将reducer的数量设置为0,这样就不用reducer了。对于“n”型,按你想要的方式使用异径管。
希望这有帮助。