我对spark还比较陌生,目前正在4个spark workers上运行一些基本的etl,从外部源读取项目,然后将它们保存到hdfs。奇怪的是,我的hdfs结果中缺少项。
因为我需要遵循某些文件系统约定,所以我希望将这些项拆分为单独的bucket,并将它们保存在单独的子文件夹中(我知道我在这里忽略了性能):
List<String> sources; // some list of strings
JavaRDD<Task> taskList; // a lot of tasks for each source
JavaRDD<Item> items = taskList.map(task -> new Extractor().execute(task));
for (String sourceId : sources) {
String path = "hdfs:///sources/" + sourceId";
JavaRDD<String> currentItems = items.filter(
// filter only matching source ID items
item -> item.getSource().equals(sourceId)).map(
// serialise each filtered item
item -> item.toString());
// save to hdfs
currentItems.saveAsTextFile("hdfs:///sources/" + sourceId);
}
jsc.stop(); // done
当我调试时 .collect().size()
在转换/筛选/Map期间的任何时候,都会显示预期/正确的项数。然而,当我在应用程序完成后查看hdfs中的文件时,我只发现每个源中比文件中预期的少1项(我知道spark将多个项写入每个part-0000x文件)。
有什么想法吗?我不是100%确定是否每个人都使用相同的rdd filter(..).map(..)
可能是问题所在。当我转储同一文件夹中的所有项目时(没有 for
谢南根)一切正常。所有项目都已写入。我试过了 items.cache()
以目前的解决方案,但这也无济于事。
我确实移除了 .filter(..)
操作和测试的一个小数据集的2个来源,提取6个项目每个。结果是每个文件夹中有12个项目,因此 filter(..
)行动显然是罪魁祸首。
更新:在查看了更多的输入源之后,减少源的数量,以便更容易地调试它,并找出问题是否只在处理的项多于worker时发生。代码的一些微小细节可能已经发生了变化,但在对越来越多的数据集进行了一些轻微的重构和测试之后,我再也无法观察到这些问题了。问题解决了(希望永远解决)。如果我知道原因,我会在这里更新。
我正在hadoop2.7.2上用yarn运行spark 1.6.0,我的spark应用程序是用java8编写的。应用程序运行顺利,并成功完成。
1条答案
按热度按时间vh0rcniy1#
我觉得你名单上的信息来源少了
List<String> sources
你用来过滤的,比你的taskList
.如果你跑了
taskList.map(item -> item.getSource()).distinct().count()
会等于sources.size()
?如果答案是否定的,使用
taskList.map(item -> item.getSource()).distinct().collectAsMap()
而不是sources
在你的for循环中。