java—将多个参数发送到不同类型的reducer中

bvn4nwqk  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(382)

我试图把 csv 文件中的 2 个 float 值和 1 个 int 值发送到 reducer。float 结果已成功写入 HDFS,但 int 结果没有出现在最终输出中。我已确认传给 context.write 的值是正确的,但不明白为什么 int 值没有显示出来。谁能帮帮我?

public class MaxTemperature extends Configured implements Tool {

public static class MapMapper extends Mapper<LongWritable, Text, Text, PairWritable>{

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
        String regex = ",";//''single quote not applicable for comma.
        String[] val = value.toString().split(regex);

        FloatWritable[]  vv = new FloatWritable[2];
        vv[0]= new FloatWritable(Float.parseFloat(val[3]));
        vv[1]=new FloatWritable(Float.parseFloat(val[13]));

        int k= Integer.parseInt(val[17]);
        IntWritable kk = new IntWritable(k);                
        float dd=Float.parseFloat(val[3]);
        PairWritable ddd = new PairWritable();
        context.write(new Text(val[2]), ddd.set(vv[0], vv[1],kk));

    }

} 

 public static class PairWritable extends ArrayWritable implements Writable{

    public PairWritable() {
        super(FloatWritable.class);
        // TODO Auto-generated constructor stub
    }

    private FloatWritable floatone;
     private FloatWritable floattwo;
     private IntWritable intone;

     public String toString() {

            String s = Float.toString(floatone.get());
            String  a=Float.toString(floattwo.get());
            String q = s+'\t'+a;  
            return q;
          }

    public PairWritable set(float f1, float f2, int k){
         FloatWritable ff1 = new FloatWritable(f1);
         FloatWritable ff2 = new FloatWritable(f2);
         IntWritable inq = new IntWritable(k);
         set(ff1, ff2,inq);
         return this;
     }

     public PairWritable set(FloatWritable f1, FloatWritable f2, IntWritable f3){
         this.floatone=f1;
         this.floattwo=f2;
         this.intone=f3;
         return this;
     }

     public float getone(){
         return floatone.get();
     }

     public float gettwo(){
         return floattwo.get();
     }

     public int getthreeint(){
         return intone.get();
     }

     public void write(DataOutput out) throws IOException {
            out.writeFloat(floatone.get());
            out.writeFloat(floattwo.get());
            out.writeInt(intone.get());
        }

        public void readFields(DataInput in) throws IOException {
            floatone = new FloatWritable(in.readFloat());
            floattwo = new FloatWritable(in.readFloat());
            intone= new IntWritable(in.readInt());
        }

 }

public static class Mapreducers extends Reducer<Text,PairWritable, Text,PairWritable>{

    public void reduce(Text key, Iterable<PairWritable> values,Context context) throws IOException, InterruptedException{

        float sumone =0;
        float sumtwo=0;
        int sumthree=0;

        for(PairWritable dd: values){
        sumone+=dd.getone();
        sumtwo+=dd.gettwo();
        sumthree+=dd.getthreeint();

        }

        //FloatWritable result1 = new FloatWritable(sumone);
        //FloatWritable result2 = new FloatWritable(sumtwo);
        //IntWritable result3 = new IntWritable(sumthree);
        PairWritable ddd = new PairWritable();
        context.write(key, ddd.set(sumone, sumtwo,sumthree));

}

}

/**
 * Configures and runs the job, blocking until it finishes.
 *
 * <p>An existing output directory is deleted recursively before the job starts.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @return 0 if the job succeeded, 1 if it failed, 2 if too few arguments were given
 * @throws Exception if job setup, filesystem access or job execution fails
 */
@Override
public int run(String[] args) throws Exception {
    if (args.length < 2) {
        System.err.println("Usage: MaxTemperature <input path> <output path>");
        return 2;
    }

    // Build the job from the configuration held by this Tool so that settings
    // applied before run() is called are not silently dropped. Fall back to a
    // default configuration when none has been set.
    Configuration conf = getConf();
    if (conf == null) {
        conf = new Configuration();
    }

    Job job = Job.getInstance(conf, "MaxTemperature");
    job.setJarByClass(MaxTemperature.class);

    Path inputPath = new Path(args[0]);
    Path outputPath = new Path(args[1]);

    // Resolve the filesystem from the output path itself; the input path may
    // live on a different filesystem.
    FileSystem fs = outputPath.getFileSystem(conf);
    if (fs.exists(outputPath)) {
        fs.delete(outputPath, true);
    }

    FileInputFormat.addInputPath(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputPath);

    job.setMapperClass(MapMapper.class);
    // The reducer only sums its inputs and emits the same types it consumes,
    // so the same class is also registered as the combiner.
    job.setCombinerClass(Mapreducers.class);
    job.setReducerClass(Mapreducers.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(PairWritable.class);

    return job.waitForCompletion(true) ? 0 : 1;
}

/**
 * Program entry point: runs {@link MaxTemperature} through ToolRunner and exits
 * the JVM with the status code it returns.
 *
 * @param args command-line arguments, forwarded unchanged to ToolRunner
 */
public static void main(String[] args) throws Exception{
    System.exit(ToolRunner.run(new MaxTemperature(), args));
}

}
nxagd54h

nxagd54h1#

问题在于,我没有在 toString 方法中把 int 值转换成字符串并拼接到输出里。
我之前使用的是:

// Earlier version: only the two float fields are rendered; intone is never appended.
public String toString() {
    StringBuilder line = new StringBuilder();
    line.append(Float.toString(floatone.get()));
    line.append('\t');
    line.append(Float.toString(floattwo.get()));
    return line.toString();
}

应该使用:

// Corrected version: renders floatone, floattwo and intone, separated by tabs.
public String toString() {
    StringBuilder line = new StringBuilder();
    line.append(Float.toString(floatone.get()));
    line.append('\t');
    line.append(Float.toString(floattwo.get()));
    line.append('\t');
    line.append(Integer.toString(intone.get()));
    return line.toString();
}

相关问题