我创建了一个rdd,如下所示:
JavaPairRDD<String,String> inputDataFiles = sparkContext.wholeTextFiles("hdfs://ip:8020/user/cdhuser/inputFolder/");
在这个rdd上我执行了一个 map
处理单个文件并调用 foreach
触发相同的 map
.
JavaRDD<Object[]> output = inputDataFiles.map(new Function<Tuple2<String,String>,Object[]>()
{
private static final long serialVersionUID = 1L;
@Override
public Object[] call(Tuple2<String,String> v1) throws Exception
{
System.out.println("in map!");
//do something with v1.
return Object[]
}
});
output.foreach(new VoidFunction<Object[]>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Object[] t) throws Exception {
//do nothing!
System.out.println("in foreach!");
}
});
这段代码非常适合在本地笔记本电脑上进行独立设置,同时访问本地文件和远程hdfs文件。
在集群中,相同的代码不会产生任何结果。我的直觉是,数据还没有到达各个执行者,因此 map
以及 foreach
不起作用。可能是猜测。但我不明白为什么这在集群中不起作用。我甚至看不到报纸上的书面声明 map
以及 foreach
正在以集群执行模式打印。
我注意到在独立输出中有一行代码在集群执行中没有看到。
16/09/07 17:35:35 INFO WholeTextFileRDD: Input split: Paths:/user/cdhuser/inputFolder/data1.txt:0+657345,/user/cdhuser/inputFolder/data10.txt:0+657345,/user/cdhuser/inputFolder/data2.txt:0+657345,/user/cdhuser/inputFolder/data3.txt:0+657345,/user/cdhuser/inputFolder/data4.txt:0+657345,/user/cdhuser/inputFolder/data5.txt:0+657345,/user/cdhuser/inputFolder/data6.txt:0+657345,/user/cdhuser/inputFolder/data7.txt:0+657345,/user/cdhuser/inputFolder/data8.txt:0+657345,/user/cdhuser/inputFolder/data9.txt:0+657345
我也有类似的代码 textFile()
以前对集群上的单个文件有效。问题在于 wholeTextFiles()
只是。
请建议什么是最好的方式来让这个工作或其他替代方法。
我的设置是cloudera5.7发行版和spark服务。我把主人当作 yarn-client
.
这个 action
可以是任何东西。调用 map
. 我也试过了 System.out.println("Count is:"+output.count());
,我得到了正确的答案 10
,因为文件夹中有10个文件,但Map仍然拒绝工作。
谢谢。
暂无答案!
目前还没有任何答案,快来回答吧!