Hadoop: how to run 2 mappers and 2 reducers

kkbh8khc · asked 2021-06-02 in Hadoop

I am trying to develop a Hadoop application. I want to run 2 mappers and 2 reducers from my main method, but I keep getting a cast exception, which leads me to ask: how can this be done?
Mapper 1:

import java.io.IOException;
import java.util.logging.Logger;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

@SuppressWarnings("javadoc")
public class IntervallMapper1 extends Mapper<LongWritable, Text, Text, LongWritable> {
    private static Logger logger = Logger.getLogger(IntervallMapper1.class.getName());

    private static Category category;
    private static Value value;

    private String[] values = new String[4];
    private final static LongWritable one = new LongWritable(1);

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        if (!category.valueIsMissing(value.toString())) { // air pressure and wind force present...
            values = IntervallMapper1.value.getValues(value.toString()); // parse before logging
            logger.info("Key: " + values[0] + values[1]);
            context.write(new Text(values[0] + values[1]), one); // station + date as key, value = 1
        }
    }
}

Reducer 1:

import java.io.IOException;
import java.util.logging.Logger;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

@SuppressWarnings("javadoc")
public class IntervallReducer1 extends Reducer<Text, LongWritable, Text, LongWritable> {
    private static Logger logger = Logger.getLogger(IntervallReducer1.class.getName());

    private String key = null;
    private static LongWritable result = new LongWritable();
    private long sum;

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {

        for (LongWritable value : values) {
            if (this.key == null) { // first pass
                logger.info("First pass");
                System.out.println("---> " + value.get());
                sum = value.get();
                this.key = key.toString().substring(0, 10);
            } else if (key.toString().contains(this.key)) { // TODO: key.toString().substring(0, 10)
                logger.info("Key already present");
                System.out.println("---> " + sum);
                sum += value.get();
            } else { // key not yet present: flush the previous group
                logger.info("Key not present");
                result.set(sum);
                logger.info("Value: " + sum);
                context.write(new Text(this.key), result);
                this.key = key.toString().substring(0, 10);
                sum = value.get();
            }
        }
    }
}

Mapper 2 (posted identical to Mapper 1):

@SuppressWarnings("javadoc")
public class IntervallMapper1 extends Mapper<LongWritable, Text, Text, LongWritable> {
    private static Logger logger = Logger.getLogger(IntervallMapper1.class.getName());

    private static Category category;
    private static Value value;

    private String[] values = new String[4];
    private final static LongWritable one = new LongWritable(1);

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        if (!category.valueIsMissing(value.toString())) { // air pressure and wind force present...
            values = IntervallMapper1.value.getValues(value.toString()); // parse before logging
            logger.info("Key: " + values[0] + values[1]);
            context.write(new Text(values[0] + values[1]), one); // station + date as key, value = 1
        }
    }
}

Main method:

@SuppressWarnings("javadoc")
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Job job = Job.getInstance(new Configuration());

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);

    job.setMapperClass(IntervallMapper1.class);
//  job.setCombinerClass(IntervallReducer1.class);
    job.setReducerClass(IntervallReducer1.class);
    job.setMapperClass(IntervallMapper2.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setJarByClass(IntervallStart.class);

    job.waitForCompletion(true);
}

Error:

Error: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text
    at ncdcW03.IntervallMapper2.map(IntervallMapper2.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
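For context: a single Hadoop `Job` holds exactly one mapper and one reducer, so the second `job.setMapperClass(IntervallMapper2.class)` call silently replaces `IntervallMapper1`, and `IntervallMapper2` then receives the raw `(LongWritable, Text)` input records rather than the reducer's output, which matches the `ClassCastException` above. A common way to get mapper1 → reducer1 → mapper2 → reducer2 is two sequential jobs wired through an intermediate directory. The sketch below is a minimal illustration, not the asker's code: `IntervallReducer2` and the `_tmp` intermediate path are assumptions, and it presumes `IntervallMapper2` is declared as `Mapper<Text, Text, ...>` to match `KeyValueTextInputFormat`.

```java
Configuration conf = new Configuration();
Path intermediate = new Path(args[1] + "_tmp"); // hypothetical temp dir between the two jobs

// Job 1: IntervallMapper1 -> IntervallReducer1, writes Text \t LongWritable lines
Job job1 = Job.getInstance(conf, "interval pass 1");
job1.setJarByClass(IntervallStart.class);
job1.setMapperClass(IntervallMapper1.class);
job1.setReducerClass(IntervallReducer1.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job1, new Path(args[0]));
FileOutputFormat.setOutputPath(job1, intermediate);
if (!job1.waitForCompletion(true)) System.exit(1);

// Job 2: reads job 1's output; KeyValueTextInputFormat splits each line at the
// tab and hands the mapper (Text, Text) pairs -- not (LongWritable, Text).
Job job2 = Job.getInstance(conf, "interval pass 2");
job2.setJarByClass(IntervallStart.class);
job2.setInputFormatClass(KeyValueTextInputFormat.class);
job2.setMapperClass(IntervallMapper2.class);      // must extend Mapper<Text, Text, ...>
job2.setReducerClass(IntervallReducer2.class);    // assumed second reducer
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job2, intermediate);
FileOutputFormat.setOutputPath(job2, new Path(args[1]));
System.exit(job2.waitForCompletion(true) ? 0 : 1);
```

Alternatively, `ChainMapper`/`ChainReducer` can run several mappers around a single reducer within one job, but a map → reduce → map → reduce pipeline still requires two jobs.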
