从dataset.map阶段0开始的Spark www.example.com在其他阶段重复

qv7cva1a  于 2023-01-13  发布在  Apache
关注(0)|答案(1)|浏览(122)

这是与我的另一个职位:Spark Java Map processing entire data set in all executors。我有一个简单的使用案例:
1.从4个分区中的DB读取数据

properties.setProperty("partitionColumn", "num_rows");
 properties.setProperty("lowerBound", "0");
 properties.setProperty("upperBound", getTotalRowCount(ID));
 properties.setProperty("numPartitions", "4");
 properties.setProperty("Driver", driver);
 properties.setProperty("user", user);
 properties.setProperty("password", password);
 Dataset<Row> records = SparkSession.getActiveSession().get().read().jdbc(jdbcUrl, table, properties);

1.在循环中处理记录集以转换/过滤/格式化等,

Dataset<String> stringRecordSet = dbRecordsSet.map((MapFunction<Row, String> )xmlRow -> {
         return TransformationService.extractXMLBlobToString(xmlRow);
     }, Encoders.STRING());   Dataset < Row > jsonDataSet = sparkSession.read().json(stringifiedDataSet);

1.只需保存以下内容:
jsonDataSet.写入().格式("csv").保存(文件路径);
发布日期:我的数据集有40行。第2步像预期的那样在循环中并行处理每个分区。但是在第3步中,当我将记录集保存到文件系统中时,stage3将重新执行stage2以再次处理所有分区。map中的函数(TransformationService. extractXMLBlobToString)再次调用所有40条记录。在生产中,我有数百万个数据集。stage0有一个循环来处理正确格式的数据,stage1是save,但是这个阶段会重新处理循环,所以我看到一个1000万条记录的数据集被不必要地处理了两次。为什么会发生这种情况?如果我在步骤2和3之间进行缓存,它增加了两倍的时间。一点帮助都没有。我的处理时间从5分钟变成了10分钟。有没有人能帮我理解为什么SAVE又调用了循环?

utugiqy6

utugiqy61#

您的map转换执行了两次,因为您在其结果上调用了2个操作,而没有在中间进行缓存。这两个操作是:

  • Dataset < Row > jsonDataSet = sparkSession.read().json(stringifiedDataSet);
  • 这是一个动作,因为你没有提供任何模式给你的DataFrameReader对象,所以它必须推断你的 Dataframe 的模式,需要立即动作
  • jsonDataSet.write().format("csv").save(filepath);

您可以尝试:

  • 缓存stringRecordSet。这是此类问题的默认解决方案
  • 在这种情况下,您可以通过为sparkSession.read().json(stringifiedDataSet);操作提供一个模式来将其转换为惰性转换,如下所示:sparkSession.read().schema(schema).json(stringifiedDataSet);

相关问题