我正在将一个文件从hdfs加载到 JavaRDD
想更新一下 RDD
. 为此,我将它转换为 IndexedRDD
(https://github.com/amplab/spark-indexedrdd)我不能像现在这样 Classcast Exception
. 基本上,我会使键值对和更新的关键。 IndexedRDD
支持更新。有什么办法可以转换吗?
JavaPairRDD<String, String> mappedRDD = lines.flatMapToPair( new PairFlatMapFunction<String, String, String>()
{
@Override
public Iterable<Tuple2<String, String>> call(String arg0) throws Exception {
String[] arr = arg0.split(" ",2);
System.out.println( "lenght" + arr.length);
List<Tuple2<String, String>> results = new ArrayList<Tuple2<String, String>>();
results.addAll(results);
return results;
}
});
IndexedRDD<String,String> test = (IndexedRDD<String,String>) mappedRDD.collectAsMap();
1条答案
按热度按时间pftdvrlh1#
这个
collectAsMap()
返回一个java.util.Map
包含您的JavaPairRDD
,但与Spark无关。我的意思是,这个函数是收集一个节点中的值并使用普通java。因此,你不能把它扔给IndexedRDD
或任何其他RDD
它只是一个普通的类型Map
.我没用过
IndexedRDD
,但从示例中可以看出,需要通过向其构造函数传递PairRDD
:所以在你的代码里应该是: