我试着用这个解决方案对我的减速机的输出进行排序 Hadoop
如本问题所述:
mapreduce按值降序排序
这一个和java8有一些冲突,所以我解决了如下问题:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.LinkedHashMap;
import java.util.Collections;
import java.util.List;
import java.util.Comparator;
public class HourlyTweetsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public Map<String , Integer> map = new LinkedHashMap<String , Integer>();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
map.put(key.toString() , sum);
result.set(sum);
context.write(key, result);
}
public void cleanup(Context context){
//Cleanup is called once at the end to finish off anything for reducer
//Here we will write our final output
Map<String , Integer> sortedMap = new HashMap<String , Integer>();
sortedMap = sortMap(map);
for (Map.Entry<String,Integer> entry : sortedMap.entrySet()){
context.write(new Text(entry.getKey()),new IntWritable(entry.getValue()));
}
}
public Map<String , Integer > sortMap (Map<String,Integer> unsortMap){
Map<String ,Integer> hashmap = new HashMap<String,Integer>();
int count=0;
List<Map.Entry<String,Integer>> list = new LinkedList<Map.Entry<String,Integer>>(unsortMap.entrySet());
//Sorting the list we created from unsorted Map
Collections.sort(list , new Comparator<Map.Entry<String,Integer>>(){
public int compare (Map.Entry<String , Integer> o1 , Map.Entry<String , Integer> o2 ){
//sorting in descending order
return o2.getValue().compareTo(o1.getValue());
}
});
for(Map.Entry<String, Integer> entry : list){
// only writing top 3 in the sorted map
// if(count>2)
// break;
hashmap.put(entry.getKey(),entry.getValue());
}
return hashmap ;
}
}
问题是在运行作业后,输出没有排序:
11 1041557
14 1304166
17 1434978
2 733462
20 1288767
23 1677571
5 460629
8 497403
11 1041557
23 1677571
2 733462
14 1304166
5 460629
17 1434978
8 497403
20 1288767
怎么解决?
2条答案
按热度按时间3j86kqsm1#
在hadoop的map/reduce上下文中,我不会判断这段代码是否需要额外的步骤来确保正确性。
但一个明显的错误是
sortMap
,这是线它创建了一个不保持任何特定顺序的Map,因此按排序顺序填充它没有任何效果。它应该是一个
LinkedHashMap
相反,就像链接问答的代码一样。请注意,这与调用者创建的Map无关:
在这里,对创建的Map的引用被
sortMap
因此,map示例完全过时了。但是,由于您要做的只是在排序后的Map上迭代一次以执行一个操作,因此不需要将排序后的列表复制到结果中Map
总之,您可以通过遍历列表来执行操作:它使用java 8的
Map.Entry.comparingByValue(Comparator.reverseOrder())
内置比较器。如果java 7需要兼容性,请使用问题中显示的比较器代码,请注意,此代码使用
ArrayList
而不是LinkedList
,正如您将要对它执行的所有三个操作一样,1)使用Map项集的内容初始化它,2)对它进行就地排序,3)对它进行迭代,使用ArrayList
. 对于java中的步骤2)来说尤其如此 8hgtggwj02#
我在这里看到两种选择:
只用一个减速机。这就要求所有的数据都能放在一台机器的内存中。然后,单个减速机的输入将按键(您想要的)的顺序进行排序。
使用totalorderpartitionerhttps://hadoop.apache.org/docs/r2.4.1/api/org/apache/hadoop/mapreduce/lib/partition/totalorderpartitioner.html
它在mapreduce管道中强制执行一个附加阶段,将元素划分为已排序的bucket。
下面是一个示例(不是我的),演示了如何使用totalorderpartioner:https://gist.github.com/asimjalis/e5627dc2ff2b23dac70b