mapreduce输出排序

jtoj6r0c  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(317)

我试着用这个解决方案对我的减速机的输出进行排序 Hadoop 如本问题所述:
mapreduce按值降序排序
这一个和java8有一些冲突,所以我解决了如下问题:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.LinkedHashMap;
import java.util.Collections;
import java.util.List;
import java.util.Comparator;

public class HourlyTweetsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();
    public Map<String , Integer> map = new LinkedHashMap<String , Integer>();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        map.put(key.toString() , sum);

        result.set(sum);
        context.write(key, result);
    }

    public void cleanup(Context context){
        //Cleanup is called once at the end to finish off anything for reducer
        //Here we will write our final output
        Map<String , Integer>  sortedMap = new HashMap<String , Integer>();
        sortedMap = sortMap(map);

        for (Map.Entry<String,Integer> entry : sortedMap.entrySet()){
            context.write(new Text(entry.getKey()),new IntWritable(entry.getValue()));
        }
    }

    public Map<String , Integer > sortMap (Map<String,Integer> unsortMap){

        Map<String ,Integer> hashmap = new HashMap<String,Integer>();
        int count=0;
        List<Map.Entry<String,Integer>> list = new LinkedList<Map.Entry<String,Integer>>(unsortMap.entrySet());
        //Sorting the list we created from unsorted Map
        Collections.sort(list , new Comparator<Map.Entry<String,Integer>>(){
            public int compare (Map.Entry<String , Integer> o1 , Map.Entry<String , Integer> o2 ){
                //sorting in descending order
                return o2.getValue().compareTo(o1.getValue());
            }
        });

        for(Map.Entry<String, Integer> entry : list){
            // only writing top 3 in the sorted map
            // if(count>2)
            // break;
            hashmap.put(entry.getKey(),entry.getValue());
        }

        return hashmap ;
    }

}

问题是在运行作业后,输出没有排序:

11  1041557
14  1304166
17  1434978
2   733462
20  1288767
23  1677571
5   460629
8   497403
11  1041557
23  1677571
2   733462
14  1304166
5   460629
17  1434978
8   497403
20  1288767

怎么解决?

3j86kqsm

3j86kqsm1#

在hadoop的map/reduce上下文中,我不会判断这段代码是否需要额外的步骤来确保正确性。
但一个明显的错误是 sortMap ,这是线

Map<String ,Integer> hashmap = new HashMap<String,Integer>();

它创建了一个不保持任何特定顺序的Map,因此按排序顺序填充它没有任何效果。它应该是一个 LinkedHashMap 相反,就像链接问答的代码一样。
请注意,这与调用者创建的Map无关:

Map<String , Integer>  sortedMap = new HashMap<String , Integer>();
sortedMap = sortMap(map);

在这里,对创建的Map的引用被 sortMap 因此,map示例完全过时了。但是,由于您要做的只是在排序后的Map上迭代一次以执行一个操作,因此不需要将排序后的列表复制到结果中 Map 总之,您可以通过遍历列表来执行操作:

public void cleanup(Context context) {
    //Cleanup is called once at the end to finish off anything for reducer
    //Here we will write our final output

    List<Map.Entry<String,Integer>> list = new ArrayList<>(map.entrySet());

    Collections.sort(list, Map.Entry.comparingByValue(Comparator.reverseOrder()));

    for(Map.Entry<String,Integer> entry: list) {
        context.write(new Text(entry.getKey()), new IntWritable(entry.getValue()));
    }
}

它使用java 8的 Map.Entry.comparingByValue(Comparator.reverseOrder()) 内置比较器。如果java 7需要兼容性,请使用问题中显示的比较器代码,

new Comparator<Map.Entry<String, Integer>>() {
    public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
        //sorting in descending order
        return o2.getValue().compareTo(o1.getValue());
    }
}

请注意,此代码使用 ArrayList 而不是 LinkedList ,正如您将要对它执行的所有三个操作一样,1)使用Map项集的内容初始化它,2)对它进行就地排序,3)对它进行迭代,使用 ArrayList . 对于java中的步骤2)来说尤其如此 8

hgtggwj0

hgtggwj02#

我在这里看到两种选择:
只用一个减速机。这就要求所有的数据都能放在一台机器的内存中。然后,单个减速机的输入将按键(您想要的)的顺序进行排序。
使用totalorderpartitionerhttps://hadoop.apache.org/docs/r2.4.1/api/org/apache/hadoop/mapreduce/lib/partition/totalorderpartitioner.html
它在mapreduce管道中强制执行一个附加阶段,将元素划分为已排序的bucket。
下面是一个示例(不是我的),演示了如何使用totalorderpartioner:https://gist.github.com/asimjalis/e5627dc2ff2b23dac70b

相关问题