Why does a TupleWritable become empty when passed to the reducer?

js81xvg6  posted on 2021-06-01  in Hadoop

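I have a map(Object key, Text value, Context context) that puts a TupleWritable into the context with context.write(). In reduce(Text key, Iterable<TupleWritable> values, Context context) I read the TupleWritable back, but it is empty. My code is below. This confuses me, and any help would be appreciated.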

package boc.competition.team1;

import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.join.TupleWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class App 
{
    public static class SCSTransMap extends Mapper<Object,Text,Text,TupleWritable>{
        private Text name = new Text();

        @Override
        public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
            IntWritable i = new IntWritable(1);
            TupleWritable result = new TupleWritable(new IntWritable[] { i, new IntWritable(3)});
            System.out.println(result.get(0)+"====="+result.get(1));
            // Here the expected values are printed: 1=====3
            context.write(name, result);
        }
    }
    public static class reducer extends Reducer<Text,TupleWritable,Text,Text>{
        @Override
        public void reduce(Text key,Iterable<TupleWritable> values,Context context) throws IOException,InterruptedException{

            for(TupleWritable tuple:values) {
                System.out.println(tuple.get(0)+"====="+tuple.get(1));
                // But here it prints 0=====0
            }

        }
    }

    public static void main( String[] args ) throws Exception
    {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf,"team1Job");
        job.setJarByClass(App.class);
        job.setReducerClass(reducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TupleWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        MultipleInputs.addInputPath(job, new Path("C:\\Program Files\\PuTTY\\data\\scs\\Scs_Journal.csv"), TextInputFormat.class,SCSTransMap.class);
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

        System.exit(job.waitForCompletion(true)?0:1);
    }
}

Answer #1 (bgibtngc)

I used a user-defined Writable class instead of TupleWritable to pass the values from map to reduce. The user-defined Writable:

package boc.competition.team1;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;

public class IntPairWritable implements Writable {
        private IntWritable value1;
        private IntWritable value2;

        public IntPairWritable() {
            value1 = new IntWritable();
            value2 = new IntWritable();
        }

        public IntPairWritable(int value1, int value2) {
            this.value1 = new IntWritable(value1);
            this.value2 = new IntWritable(value2);
        }

        public int getInt1() {
            return value1.get();
        }

        public int getInt2() {
            return value2.get();
        }

        @Override
        public String toString() {
            return value1.toString()+" "+value2.toString();
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            value1.readFields(in);
            value2.readFields(in);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            value1.write(out);
            value2.write(out);
        }
}
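
The serialization logic of a pair type like this can be checked without a cluster. The following is only a minimal sketch using the JDK alone: IntPair is a hypothetical stand-in that mirrors the write/readFields pair of IntPairWritable above with plain ints, and roundTrip is an illustrative helper, not a Hadoop API. It relies on the usual Writable pattern, in which an empty instance is created first and then filled by readFields, so the no-argument constructor matters and the fields must be read in the order they were written.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class IntPairRoundTrip {

    // Hypothetical Hadoop-free stand-in for IntPairWritable.
    static final class IntPair {
        private int first;
        private int second;

        // An empty instance is created first, then filled by readFields.
        IntPair() { }

        IntPair(int first, int second) {
            this.first = first;
            this.second = second;
        }

        int getFirst() { return first; }
        int getSecond() { return second; }

        // Both fields are always written, in a fixed order.
        void write(DataOutput out) throws IOException {
            out.writeInt(first);
            out.writeInt(second);
        }

        // Fields are read back in the same order they were written.
        void readFields(DataInput in) throws IOException {
            first = in.readInt();
            second = in.readInt();
        }
    }

    // Serializes a pair to bytes and rebuilds a fresh copy from them.
    static IntPair roundTrip(IntPair original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));
            IntPair copy = new IntPair();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        IntPair copy = roundTrip(new IntPair(1, 3));
        System.out.println(copy.getFirst() + "=====" + copy.getSecond()); // 1=====3
    }
}
```

Because every field is written unconditionally, nothing can be silently dropped between the mapper and the reducer, which is the property the question's TupleWritable value lacked.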

Answer #2 (brvekthn)

According to the TupleWritable.java source file:


 * This is *not* a general-purpose tuple type. In almost all cases, users are
 * encouraged to implement their own serializable types, which can perform
 * better validation and provide more efficient encodings than this class is
 * capable. TupleWritable relies on the join framework for type safety and
 * assumes its instances will rarely be persisted, assumptions not only
 * incompatible with, but contrary to the general case.

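See also the answer from Chris Douglas-3:
You need access to TupleWritable::setWritten(int). If you want to use
TupleWritable outside the join package, then you need to make this
(and probably related methods, like clearWritten(int)) public and
recompile.
So it seems safe to say that TupleWritable is not a class intended for general use in MapReduce jobs.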
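
The effect of the "written" flags can be reproduced in a few lines of plain Java. What follows is a simplified model, not Hadoop's actual code: MiniTuple, its int-only slots, and roundTrip are hypothetical names chosen for illustration. The assumption, based on the quoted answer, is that a slot is serialized only when its flag has been set, and that code outside the join package has no way to set it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.BitSet;

// Simplified stand-in for TupleWritable; NOT Hadoop's actual implementation.
final class WrittenBitsDemo {

    // Hypothetical miniature tuple: int slots plus one "written" bit per slot.
    static final class MiniTuple {
        final int[] values;
        final BitSet written;

        MiniTuple(int... values) {
            this.values = values;
            this.written = new BitSet(values.length); // all bits start cleared
        }

        void setWritten(int i) {
            written.set(i);
        }

        // Only slots whose bit is set are serialized.
        void write(DataOutputStream out) throws IOException {
            out.writeInt(values.length);
            for (int i = 0; i < values.length; i++) {
                out.writeBoolean(written.get(i));
            }
            for (int i = 0; i < values.length; i++) {
                if (written.get(i)) {
                    out.writeInt(values[i]);
                }
            }
        }

        // Slots without a set bit keep their default value of 0.
        static MiniTuple read(DataInputStream in) throws IOException {
            MiniTuple tuple = new MiniTuple(new int[in.readInt()]);
            for (int i = 0; i < tuple.values.length; i++) {
                if (in.readBoolean()) {
                    tuple.setWritten(i);
                }
            }
            for (int i = 0; i < tuple.values.length; i++) {
                if (tuple.written.get(i)) {
                    tuple.values[i] = in.readInt();
                }
            }
            return tuple;
        }
    }

    // Serializes a tuple to bytes and reads a new tuple back from them.
    static MiniTuple roundTrip(MiniTuple tuple) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            tuple.write(new DataOutputStream(buffer));
            return MiniTuple.read(new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MiniTuple unmarked = roundTrip(new MiniTuple(1, 3));
        System.out.println(unmarked.values[0] + "=====" + unmarked.values[1]); // 0=====0

        MiniTuple marked = new MiniTuple(1, 3);
        marked.setWritten(0);
        marked.setWritten(1);
        MiniTuple copy = roundTrip(marked);
        System.out.println(copy.values[0] + "=====" + copy.values[1]); // 1=====3
    }
}
```

In this model the unmarked tuple looks fine before serialization, as in the mapper of the question, and comes back with default values afterwards, as in the reducer.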
