将值写入文件而不移动到reducer

zwghvu4y  于 2021-06-04  发布在  Hadoop
关注(0)|答案(3)|浏览(321)

我输入了这样的记录,a | 1 | y,b | 0 | n,c | 1 | n,d | 2 | y,e | 1 | y
现在,在mapper中,我必须检查第三列的值。如果是“y”,则该记录必须直接写入输出文件,而不将该记录移动到reducer,否则,即“n”值记录必须移动到reducer进行进一步处理。。
所以,a | 1 | y,d | 2 | y,e | 1 | y不应该转到reducer,但是b | 0 | n,c | 1 | n应该转到reducer,然后输出文件。
我该怎么做??

pgky5nke

pgky5nke1#

看看这样行不行,

public class Xxxx {

    public static class MyMapper extends
            Mapper<LongWritable, Text, LongWritable, Text> {        

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {               

            FileSystem fs = FileSystem.get(context.getConfiguration());
            Random r = new Random();                
            FileSplit split = (FileSplit)context.getInputSplit();
            String fileName = split.getPath().getName();                
            FSDataOutputStream out = fs.create(new Path(fileName + "-m-" + r.nextInt()));                               
            String parts[];
            String line = value.toString();
            String[] splits = line.split(",");
            for(String s : splits) {
                parts = s.split("\\|");
                if(parts[2].equals("Y")) {                  
                    out.writeBytes(line);
                }else {
                    context.write(key, value);
                }
            }
            out.close();
            fs.close();
        }       
    }

    public static class MyReducer extends
            Reducer<LongWritable, Text, LongWritable, Text> {
        public void reduce(LongWritable key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            for(Text t : values) {
            context.write(key, t);
            }
        }
    }

    /**
     * @param args
     * @throws IOException 
     * @throws InterruptedException 
     * @throws ClassNotFoundException 
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // TODO Auto-generated method stub

        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");
        conf.set("mapred.job.tracker", "localhost:9001");
        Job job = new Job(conf, "Xxxx");
        job.setJarByClass(Xxxx.class);
        Path outPath = new Path("/output_path");
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        FileInputFormat.addInputPath(job, new Path("/input.txt"));
        FileOutputFormat.setOutputPath(job, outPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
zpqajqem

zpqajqem2#

在map函数中,您将逐行获得输入。使用|作为分隔符将其拆分(通过使用 String.split() 方法是准确的)它会像这样

String[] line = value.toString().split('|');

通过访问此数组的第三个元素 line[2] 然后,使用一个简单的 if else 语句,发出具有n值的输出以供进一步处理。

ylamdve6

ylamdve63#

您可能可以使用multipleoutputs-单击此处将“y”和“n”类型的记录从Map器中分离到两个不同的文件。
接下来,为两个新生成的“y”和“n”类型的数据集运行saparate作业。对于“y”类型,将reducer的数量设置为0,这样就不用reducer了。对于“n”型,按你想要的方式使用异径管。
希望这有帮助。

相关问题