要更新的spark rdd

d7v8vwbk  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(282)

我正在将一个文件从hdfs加载到 JavaRDD 想更新一下 RDD . 为此,我将它转换为 IndexedRDD (https://github.com/amplab/spark-indexedrdd)我不能像现在这样 Classcast Exception . 基本上,我会使键值对和更新的关键。 IndexedRDD 支持更新。有什么办法可以转换吗?

JavaPairRDD<String, String> mappedRDD =  lines.flatMapToPair( new PairFlatMapFunction<String, String, String>()
    {
        @Override
        public Iterable<Tuple2<String, String>> call(String arg0) throws Exception {

            String[] arr = arg0.split(" ",2);
            System.out.println( "lenght" + arr.length);
             List<Tuple2<String, String>> results = new ArrayList<Tuple2<String, String>>();
             results.addAll(results);
            return results;
        }
    });        

    IndexedRDD<String,String> test = (IndexedRDD<String,String>) mappedRDD.collectAsMap();
pftdvrlh

pftdvrlh1#

这个 collectAsMap() 返回一个 java.util.Map 包含您的 JavaPairRDD ,但与Spark无关。我的意思是,这个函数是收集一个节点中的值并使用普通java。因此,你不能把它扔给 IndexedRDD 或任何其他 RDD 它只是一个普通的类型 Map .
我没用过 IndexedRDD ,但从示例中可以看出,需要通过向其构造函数传递 PairRDD :

// Create an RDD of key-value pairs with Long keys.
val rdd = sc.parallelize((1 to 1000000).map(x => (x.toLong, 0)))
// Construct an IndexedRDD from the pairs, hash-partitioning and indexing
// the entries.
val indexed = IndexedRDD(rdd).cache()

所以在你的代码里应该是:

IndexedRDD<String,String> test = new IndexedRDD<String,String>(mappedRDD.rdd());

相关问题