编辑:已使用解决 RDD.collectAsMap()
我正在尝试复制第28-30页的问题解决方案http://on-demand.gputechconf.com/gtc/2016/presentation/s6424-michela-taufer-apache-spark.pdf
我有一个hashmap,我在map函数之外示例化它。hashmap包含以下数据:
{1:2, 2:3, 3:2, 4:2, 5:3}
以前定义的rdd previousrdd具有以下类型:
JavaPairRDD<Integer, Iterable<Tuple2<Integer, Integer>>>
有以下数据:
1: [(1,2), (1,5)]
2: [(2,1), (2,3), (2,5)]
3: [(3,2), (3,4)]
4: [(4,3), (4,5)]
5: [(5,1), (5,2), (5,4)]
我尝试使用flatmaptopair创建一个新rdd:
JavaPairRDD<Integer, Integer> newRDD = previousRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>>, Integer, Integer>() {
@Override
public Iterator<Tuple2<Integer, Integer>> call(Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>> integerIterableTuple2) throws Exception {
Integer count;
ArrayList<Tuple2<Integer, Integer>> list = new ArrayList<>();
count = hashMap.get(integerIterableTuple2._1);
for (Tuple2<Integer, Integer> t : integerIterableTuple2._2) {
Integer tcount = hashMap.get(t._2);
if (count < tcount || (count.equals(tcount) && integerIterableTuple2._1 < t._2)) {
list.add(t);
}
}
return list.iterator();
}
});
但在这方面 hashMap.get(t._2)
在for循环中,大部分时间都是空的。我已经检查了hashmap中是否有正确的值。
有没有办法在spark函数中正确获取hashmap的值?
2条答案
按热度按时间fjaof16o1#
应该有用。spark应该捕获您的变量,序列化它,并随每个任务发送给每个worker。你可以试试广播这张Map
使用结果而不是
hashMap
. 它在内存方面也更有效(每个执行器共享存储)。juzqafwq2#
我对类变量也有类似的问题。您可以尝试将变量设为局部变量,或多声明一个,如下所示:
我认为这是由于Spark序列化机制。你可以在这里了解更多。