如何使用spark并行化列表?

jq6vz3qz  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(446)

假设我读了整个文件:

JavaPairRDD<String, String> filesRDD = sc.wholeTextFiles(inputDataPath);

然后,我有下面的Map器,它是:

JavaRDD<List<String>> processingFiles = filesRDD.map(fileNameContent -> {
     List<String> results = new ArrayList<String>();

     for ( some loop ) {
         if (condition) {
             results.add(someString);
         }
     }
     . . .

     return results;
});

为了便于讨论,假设在Map器中我需要创建一个字符串列表,从每个文件返回。现在,每个列表中的每个字符串都可以单独查看,以后需要单独处理。我不希望spark一次处理每个列表,而是一次处理每个列表的每个字符串。稍后当我使用 collect() 我得到一份名单。
一种方法是:如何分别为每个字符串并行化列表列表,而不是分别为每个列表并行化列表?

chhkpiq4

chhkpiq41#

与其将filesrddMap为列表列表,不如将其平面Map,这样就可以得到字符串的rdd。
编辑:根据请求添加注解
map是一个1:1函数,其中1个输入行->1个输出行。flatmap是一个1:n函数,其中1个输入行->多个(或0个)输出行。如果您使用flatmap,您可以设计它,使您的输出rdd是字符串的rdd,而当前您的输出rdd是字符串列表的rdd。听起来这就是你想要的。我不是javaspark用户,所以我不能给出语法细节。检查这里的语法帮助

相关问题