java - In a MapReduce program, the driver does not call the reducer

yacmzcpb  posted on 2021-06-02 in Hadoop

I wrote this program following the MapReduce programming model. The driver code is shown below. My driver class:

public class MRDriver extends Configured implements Tool
{
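    @Override
    public int run(String[] strings) throws Exception {
        if(strings.length != 2)
        {
            // Exactly two arguments are expected: one input path and one output path.
            System.err.println("usage : <inputlocation> <outputlocation>");
            return 2; // non-zero status signals the usage error to the caller
        }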
        Job job = new Job(getConf(), "multiple files");
        job.setJarByClass(MRDriver.class);
        job.setMapperClass(MRMapper.class);        
        job.setReducerClass(MRReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(strings[0]));
        FileOutputFormat.setOutputPath(job, new Path(strings[1]));

        return job.waitForCompletion(true) ? 0 : 1;

        //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
    }
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        System.exit(ToolRunner.run(conf, new MRDriver(), args));
    } 
}

My mapper class:

class MRMapper extends Mapper<LongWritable, Text, Text, Text>
{
    @Override
    public void map(LongWritable key, Text value, Context context)
    {
        try 
        {          
            StringTokenizer iterator;
            String idsimval = null;
            iterator = new StringTokenizer(value.toString(), "\t");
            String id = iterator.nextToken();
            String sentival = iterator.nextToken();
            if(iterator.hasMoreTokens())
                idsimval = iterator.nextToken();
            context.write(new Text("unique"), new Text(id + "_" + sentival + "_" + idsimval));

        } catch (IOException | InterruptedException e) 
        {
            System.out.println(e);
        }
    }
}

My reducer class:

class MRReducer extends Reducer<Text, Text, Text, Text> {

    String[] records;
    HashMap<Long, String> sentiMap = new HashMap<>();
    HashMap<Long, String> cosiMap = new HashMap<>();
    private String leftIdStr;
    private ArrayList<String> rightIDList, rightSimValList, matchingSimValList, matchingIDList;
    private double leftVal;
    private double rightVal;
    private double currDiff;
    private double prevDiff;
    private int finalIndex;
    Context newContext;
    private int i;

    public void reducer(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
        for (Text string : value) {
            records = string.toString().split("_");
            sentiMap.put(Long.parseLong(records[0]), records[1]);
            if (records[2] != null) {
                cosiMap.put(Long.parseLong(records[0]), records[2]);
            }
            if(++i == 2588)
            {
                newContext = context;
                newfun();
            }
            context.write(new Text("hello"), new Text("hii"));

        }
        context.write(new Text("hello"), new Text("hii"));
    }
        void newfun() throws IOException, InterruptedException
        {
            for (HashMap.Entry<Long, String> firstEntry : cosiMap.entrySet()) {
                try {
                    leftIdStr = firstEntry.getKey().toString();
                    rightIDList = new ArrayList<>();
                    rightSimValList = new ArrayList<>();
                    matchingSimValList = new ArrayList<>();
                    matchingIDList = new ArrayList<>();
                    for (String strTmp : firstEntry.getValue().split(" ")) {
                        rightIDList.add(strTmp.substring(0, 18));
                        rightSimValList.add(strTmp.substring(19));
                    }
                    String tmp = sentiMap.get(Long.parseLong(leftIdStr));
                    if ("NULL".equals(tmp)) {
                        leftVal = Double.parseDouble("0");
                    } else {
                        leftVal = Double.parseDouble(tmp);
                    }
                    tmp = sentiMap.get(Long.parseLong(rightIDList.get(0)));
                    if ("NULL".equals(tmp)) {
                        rightVal = Double.parseDouble("0");
                    } else {
                        rightVal = Double.parseDouble(tmp);
                    }
                    prevDiff = Math.abs(leftVal - rightVal);
                    int oldIndex = 0;
                    for (String s : rightIDList) {
                        try {
                            oldIndex++;
                            tmp = sentiMap.get(Long.parseLong(s));
                            if ("NULL".equals(tmp)) {
                                rightVal = Double.parseDouble("0");
                            } else {
                                rightVal = Double.parseDouble(tmp);
                            }
                            currDiff = Math.abs(leftVal - rightVal);
                            if (prevDiff > currDiff) {
                                prevDiff = currDiff;
                            }
                        } catch (Exception e) {
                        }

                    }
                    oldIndex = 0;
                    for (String s : rightIDList) {
                        tmp = sentiMap.get(Long.parseLong(s));
                        if ("NULL".equals(tmp)) {
                            rightVal = Double.parseDouble("0");
                        } else {
                            rightVal = Double.parseDouble(tmp);
                        }
                        currDiff = Math.abs(leftVal - rightVal);
                        if (Objects.equals(prevDiff, currDiff)) {
                            matchingSimValList.add(rightSimValList.get(oldIndex));
                            matchingIDList.add(rightIDList.get(oldIndex));
                        }
                        oldIndex++;
                    }
                    finalIndex = rightSimValList.indexOf(Collections.max(matchingSimValList));
                    newContext.write(new Text(leftIdStr), new Text(" " + rightIDList.get(finalIndex) + ":" + rightSimValList.get(finalIndex)));
                } catch (NumberFormatException nfe) {

                }

            }
        }
}

What is the problem? Is it in the MapReduce program or in the Hadoop system configuration? Whenever I run this program, it only writes the mapper output to HDFS.

xpcnnkqh #1

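In your reducer class you must override the reduce method. You are declaring a method named reducer instead, which is not correct: it does not override anything, so the framework never calls it and falls back to the default reduce implementation, which writes each input key/value pair through unchanged. That is why the output looks like the mapper output.
Try changing the method signature in the reducer class to: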

    @Override
    public void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
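To see why the misnamed method is silently ignored, here is a minimal plain-Java sketch that runs without Hadoop. `MiniReducer`, `BrokenReducer`, `FixedReducer` and `OverrideDemo` are hypothetical stand-ins invented for this illustration, not Hadoop classes; `MiniReducer.run` only imitates the way the framework calls `reduce`, under the assumption that the default `reduce` passes values through unchanged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Hadoop's Reducer: the "framework" only ever calls reduce().
class MiniReducer {
    // Default behaviour: pass every value through unchanged (an identity reduce).
    protected void reduce(String key, List<String> values, List<String> out) {
        for (String v : values) {
            out.add(key + "\t" + v);
        }
    }

    // Plays the role of the framework: it calls reduce(), never any other method.
    final List<String> run(String key, List<String> values) {
        List<String> out = new ArrayList<>();
        reduce(key, values, out);
        return out;
    }
}

// Misnamed method: this declares a new method, not an override, so run() never reaches it.
class BrokenReducer extends MiniReducer {
    public void reducer(String key, List<String> values, List<String> out) {
        out.add("hello\thii");
    }
}

// Correctly named and annotated: the compiler checks that this really overrides reduce().
class FixedReducer extends MiniReducer {
    @Override
    protected void reduce(String key, List<String> values, List<String> out) {
        out.add("hello\thii");
    }
}

class OverrideDemo {
    public static void main(String[] args) {
        List<String> values = Arrays.asList("1_0.5_null", "2_NULL_null");
        // The broken reducer emits the mapper-style records untouched.
        System.out.println(new BrokenReducer().run("unique", values));
        // The fixed reducer runs its own logic.
        System.out.println(new FixedReducer().run("unique", values));
    }
}
```

Putting `@Override` on the original `reducer` method would have turned the typo into a compile-time error, because the annotation requires the method to override one from a supertype.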
