java—rdd的最后一项未保存到hdfs

y0u0uwnf  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(397)

我对spark还比较陌生,目前正在4个spark workers上运行一些基本的etl,从外部源读取项目,然后将它们保存到hdfs。奇怪的是,我的hdfs结果中缺少项。
因为我需要遵循某些文件系统约定,所以我希望将这些项拆分为单独的bucket,并将它们保存在单独的子文件夹中(我知道我在这里忽略了性能):

List<String> sources; // some list of strings
JavaRDD<Task> taskList; // a lot of tasks for each source
JavaRDD<Item> items = taskList.map(task -> new Extractor().execute(task));
for (String sourceId : sources) {
  String path = "hdfs:///sources/" + sourceId";
  JavaRDD<String> currentItems = items.filter(
    // filter only matching source ID items
    item -> item.getSource().equals(sourceId)).map(
    // serialise each filtered item
    item -> item.toString());

  // save to hdfs
  currentItems.saveAsTextFile("hdfs:///sources/" + sourceId);
}
jsc.stop(); // done

当我调试时 .collect().size() 在转换/筛选/Map期间的任何时候,都会显示预期/正确的项数。然而,当我在应用程序完成后查看hdfs中的文件时,我只发现每个源中比文件中预期的少1项(我知道spark将多个项写入每个part-0000x文件)。
有什么想法吗?我不是100%确定是否每个人都使用相同的rdd filter(..).map(..) 可能是问题所在。当我转储同一文件夹中的所有项目时(没有 for 谢南根)一切正常。所有项目都已写入。我试过了 items.cache() 以目前的解决方案,但这也无济于事。
我确实移除了 .filter(..) 操作和测试的一个小数据集的2个来源,提取6个项目每个。结果是每个文件夹中有12个项目,因此 filter(.. )行动显然是罪魁祸首。
更新:在查看了更多的输入源之后,减少源的数量,以便更容易地调试它,并找出问题是否只在处理的项多于worker时发生。代码的一些微小细节可能已经发生了变化,但在对越来越多的数据集进行了一些轻微的重构和测试之后,我再也无法观察到这些问题了。问题解决了(希望永远解决)。如果我知道原因,我会在这里更新。
我正在hadoop2.7.2上用yarn运行spark 1.6.0,我的spark应用程序是用java8编写的。应用程序运行顺利,并成功完成。

vh0rcniy

vh0rcniy1#

我觉得你名单上的信息来源少了 List<String> sources 你用来过滤的,比你的 taskList .
如果你跑了 taskList.map(item -> item.getSource()).distinct().count() 会等于 sources.size() ?
如果答案是否定的,使用 taskList.map(item -> item.getSource()).distinct().collectAsMap() 而不是 sources 在你的for循环中。

相关问题