java—产生垃圾值的mapreduce键值对的输出值

zpgglvta  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(275)

问题陈述-找到最大值并将其与键一起打印
输入:

Key       Value
ABC       10
TCA       13
RTY       23
FTY       45

左侧列上的键将是唯一的。不允许重复。
输出:

FTY       45

因为45是所有值中最高的,所以它必须与键一起打印。
我已经写了mapreduce代码,基于这个链接中共享的伪代码,如何设计mapreduce的键值对来寻找集合中的最大值?
Map-

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;

public class Map 
            extends Mapper<LongWritable,Text,Text,IntWritable>
{

private Text maxKey = new Text();
private IntWritable maxValue = new IntWritable(Integer.MIN_VALUE);

@Override
protected void map( LongWritable key,Text value,Context context) 
                        throws IOException,InterruptedException
{
    String line = value.toString().trim();
    StringTokenizer token = new StringTokenizer(line);

    if(token.countTokens() == 2)
    {
        String str = token.nextToken();

        while(token.hasMoreTokens())
        {
            int temp = Integer.parseInt(token.nextToken());

            if(temp > maxValue.get())
            {
                maxValue.set(temp);
                maxKey.set(str);
            }
        }
    }

}

@Override
protected void cleanup(Context context)
        throws IOException,InterruptedException
{
    context.write(maxKey,maxValue);
}
}

减少

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class Reduce 
                extends Reducer<Text,IntWritable,Text,IntWritable>
{

private Text maxKey = new Text();
private IntWritable maxValue = new IntWritable(Integer.MIN_VALUE);

@Override
protected void reduce(Text key,Iterable<IntWritable> values,Context context)
                                        throws IOException, 
InterruptedException
    {
        Iterator<IntWritable> itr = values.iterator();

        while(itr.hasNext())
        {
            int temp = itr.next().get();
            if(temp > maxValue.get())
            {
                maxKey.set(key);
                maxValue.set(temp);
            }
        }

    }

@Override
protected void cleanup(Context context)
        throws IOException,InterruptedException
{
    context.write(maxKey,maxValue);
}
}

驾驶员等级:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapReduceDriver
{
public static void main(String[] args) throws Exception
{
    Job job = new Job();

    job.setJarByClass(MapReduceDriver.class);
    job.setJobName("DNA Codon Analysis - Part 2");

    FileInputFormat.addInputPath(job,new Path(args[0]));
    FileOutputFormat.setOutputPath(job,new Path(args[1]));

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setNumReduceTasks(1);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true)?0:1);

}
}

程序编译并运行以显示此输出-

-2147483648

maxvalue of map()和reduce()的设置可能不正确。如何正确设置值(使用integer.min\值初始化和比较后更新),以便reduce()函数接收正确的键值对?

qmelpv7a

qmelpv7a1#

由于您的键总是唯一的,因此无法在reducer中聚合它们。因此,如果您的数据集不是非常大,您可以使用一个公共键来编写Map器的输出,这将强制Map器的所有输出只转到一个reducer。
然后在reducer中,您可以对值进行迭代以比较并将最大值与键一起写入。
在mapper类中,将文件写入 context 使用公共密钥对

public class Map extends Mapper<LongWritable,Text,Text,Text>{
private final Text commonKey = new Text("CommonKey");

    @Override
    protected void map( LongWritable key,Text value,Context context) 
                            throws IOException,InterruptedException {
        String line = value.toString().trim();
        String[] kvpair = line.split("\\s+");
        context.write(commonKey, new Text(kvpair[0] + "," + kvpair[1]));
    }
}

然后在reducer中,找到最大值并写入上下文。

public static class Reduce extends Reducer<Text, Text, NullWritable, Text>{
    private final Integer MAXIMUM_VALUE = Integer.MIN_VALUE;
    public void reduce(Text commonKey, Iterable<Text> values, Context context){
        Integer finalMax = MAXIMUM_VALUE;
        String finalKey;
        for (Text value: values){
            String[] kvpair = value.toString().trim().split(",")
            if(Integer.parseInt(kvpair[1]) > finalMax){
                finalKey = kvpair[0];
                finalMax = Integer.parseInt(kvpair[1]);
            }
        }
        context.write(new Text(finalKey), new IntWritable(finalMax) );
    }
}

代码中可能会有一些错误。只是把它写在一个文本编辑器里,让你稍微了解一下如何以不同的方式处理你的问题。

相关问题