MapReduce job: mapper not invoked when the reducer is called

b1zrtrql · posted 2021-06-03 · in Hadoop

I have four classes: MapperOne, ReducerOne, MapperTwo, ReducerTwo. I want a chain: MapperOne --> ReducerOne --> output file generated, fed as input to MapperTwo --> ReducerTwo --> final output file.

My driver class code:

public class StockDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        System.out.println(" Driver invoked------");
        Configuration config = new Configuration();
        config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " ");
        config.set("mapred.textoutputformat.separator", " --> ");

        String inputPath = "In\\NYSE_daily_prices_Q_less.csv";

        String outpath = "C:\\Users\\Outputs\\run1";
        String outpath2 = "C:\\Users\\Outputs\\run2";

        // First job: StockMapperOne --> StockReducerOne
        Job job1 = new Job(config, "Stock Analysis: Creating key values");
        job1.setInputFormatClass(TextInputFormat.class);
        job1.setOutputFormatClass(TextOutputFormat.class);

        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(StockDetailsTuple.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);

        job1.setMapperClass(StockMapperOne.class);
        job1.setReducerClass(StockReducerOne.class);

        FileInputFormat.setInputPaths(job1, new Path(inputPath));
        FileOutputFormat.setOutputPath(job1, new Path(outpath));

        // Second MapReduce job to do the calculations
        Job job2 = new Job(config, "Stock Analysis: Calculating Covariance");
        job2.setInputFormatClass(TextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        job2.setMapOutputKeyClass(LongWritable.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        job2.setMapperClass(StockMapperTwo.class);
        job2.setReducerClass(StockReducerTwo.class);

        String outpath3 = outpath + "\\part-r-00000";
        System.out.println("OUT PATH 3: " + outpath3);
        FileInputFormat.setInputPaths(job2, new Path(outpath3));
        FileOutputFormat.setOutputPath(job2, new Path(outpath2));

        // Run job2 only after job1 has completed successfully
        if (job1.waitForCompletion(true)) {
            System.out.println(job2.waitForCompletion(true));
        }
    }
}

My MapperOne and ReducerOne execute correctly and their output file is stored at the correct path. But when the second job executes, only the reducer is invoked. Below are my MapperTwo and ReducerTwo code.

MapperTwo:

public class StockMapperTwo extends Mapper<LongWritable, Text, LongWritable, Text> {

    public void map(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        System.out.println("------ MAPPER 2 CALLED-----");

        for (Text val : values) {
            System.out.println("KEY: " + key.toString() + "   VALUE: " + val.toString());
            //context.write(new Text("mapper2"), new Text("hi"));
            context.write(new LongWritable(2), new Text("hi"));
        }
    }
}

ReducerTwo:

public class StockReducerTwo extends Reducer<LongWritable, Text, Text, Text> {

    public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        System.out.println(" REDUCER 2 INVOKED");

        context.write(new Text("hello"), new Text("hi"));
    }
}

My doubts about this configuration are:

Why is the mapper skipped even though it is set via job2.setMapperClass(StockMapperTwo.class)?
If I do not set job2.setMapOutputKeyClass(LongWritable.class); and job2.setMapOutputValueClass(Text.class);, even the reducer is not invoked, and the following error comes up:
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:870)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:573)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
How is this happening? Please help. I am not able to get the mapper and reducer invoked correctly. Please guide me.
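The second behaviour can be reproduced without Hadoop. MapTask's output buffer checks the runtime class of every key the mapper emits against the configured map-output key class; when setMapOutputKeyClass() is not called, that class falls back to the job's output key class (Text here), while the default identity mapper passes through TextInputFormat's LongWritable byte-offset keys. A minimal sketch of that check, where MiniCollector and the use of String/Long as stand-ins for Text/LongWritable are illustrative, not Hadoop's actual API:

```java
import java.util.ArrayList;
import java.util.List;

// Hadoop-free sketch of the runtime check behind "Type mismatch in key from
// map": the collector compares the class of each emitted key against the
// configured map-output key class and rejects anything else.
class MiniCollector {
    private final Class<?> expectedKeyClass;
    private final List<Object[]> buffer = new ArrayList<>();

    MiniCollector(Class<?> expectedKeyClass) {
        this.expectedKeyClass = expectedKeyClass;
    }

    // Mirrors the behaviour of MapOutputBuffer.collect(): wrong key class -> error.
    void collect(Object key, Object value) {
        if (!expectedKeyClass.isInstance(key)) {
            throw new IllegalStateException("Type mismatch in key from map: expected "
                    + expectedKeyClass.getName() + ", received " + key.getClass().getName());
        }
        buffer.add(new Object[] { key, value });
    }
}

public class TypeMismatchDemo {
    public static void main(String[] args) {
        // Without setMapOutputKeyClass(), the expected map-output key class
        // falls back to the job's output key class -- Text in the driver above.
        MiniCollector collector = new MiniCollector(String.class); // stand-in for Text

        try {
            // The identity mapper passes through the input's LongWritable
            // byte-offset key -- stand-in here: a boxed Long.
            collector.collect(42L, "QTM,2010-02-08,...");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
            // prints: Type mismatch in key from map: expected java.lang.String, received java.lang.Long
        }
    }
}
```

Setting the map-output classes explicitly, as the question's driver does, silences this check but only masks the real problem: the intended mapper is not running at all.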

dfddblmv (answer #1):

Sorry for posting this question. I had not noticed that my map signature was wrong.
Instead of this:

public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {

I had written it as:

public void map(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException{

It took me a long time to notice this mistake. I don't know why no proper error was shown to flag it. Anyway, it is resolved now.
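The reason no error appeared can be shown with a small, Hadoop-free sketch: Mapper.run() always calls map(KEYIN, VALUEIN, Context), and a subclass method with a different parameter list (Iterable&lt;Text&gt; instead of Text) is an overload, not an override, so the inherited identity map() runs silently. The class names below (MiniMapper, WrongSignatureMapper) are illustrative, not Hadoop's API:

```java
import java.util.ArrayList;
import java.util.List;

class MiniMapper<K, V> {
    // Stand-in for Hadoop's default (identity) Mapper.map(): pass input through.
    public void map(K key, V value, List<String> context) {
        context.add("identity:" + key + "=" + value);
    }

    // Stand-in for Mapper.run(): always dispatches to map(K, V, context).
    public void run(K key, V value, List<String> context) {
        map(key, value, context);
    }
}

class WrongSignatureMapper extends MiniMapper<Long, String> {
    // Iterable<String> != String, so this is an overload of -- not an
    // override of -- the inherited map(). Adding @Override here would
    // make the compiler reject it and expose the bug immediately.
    public void map(Long key, Iterable<String> values, List<String> context) {
        context.add("custom mapper ran");
    }
}

public class OverrideDemo {
    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        new WrongSignatureMapper().run(1L, "QTM", out);
        // The inherited identity map() ran, not the subclass method.
        System.out.println(out.get(0)); // prints "identity:1=QTM"
    }
}
```

This is why annotating every intended override with @Override is worthwhile: the mismatched signature would have been a compile-time error instead of a silent identity pass-through that only surfaces later as a type-mismatch IOException.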
