这是与我的另一个职位:Spark Java Map processing entire data set in all executors。我有一个简单的使用案例:
1.从4个分区中的DB读取数据
properties.setProperty("partitionColumn", "num_rows");
properties.setProperty("lowerBound", "0");
properties.setProperty("upperBound", getTotalRowCount(ID));
properties.setProperty("numPartitions", "4");
properties.setProperty("Driver", driver);
properties.setProperty("user", user);
properties.setProperty("password", password);
Dataset<Row> records = SparkSession.getActiveSession().get().read().jdbc(jdbcUrl, table, properties);
1.在循环中处理记录集以转换/过滤/格式化等,
Dataset<String> stringRecordSet = dbRecordsSet.map((MapFunction<Row, String> )xmlRow -> {
return TransformationService.extractXMLBlobToString(xmlRow);
}, Encoders.STRING()); Dataset < Row > jsonDataSet = sparkSession.read().json(stringifiedDataSet);
1.只需保存以下内容:
jsonDataSet.写入().格式("csv").保存(文件路径);
发布日期:我的数据集有40行。第2步像预期的那样在循环中并行处理每个分区。但是在第3步中,当我将记录集保存到文件系统中时,stage3将重新执行stage2以再次处理所有分区。map中的函数(TransformationService. extractXMLBlobToString)再次调用所有40条记录。在生产中,我有数百万个数据集。stage0有一个循环来处理正确格式的数据,stage1是save,但是这个阶段会重新处理循环,所以我看到一个1000万条记录的数据集被不必要地处理了两次。为什么会发生这种情况?如果我在步骤2和3之间进行缓存,它增加了两倍的时间。一点帮助都没有。我的处理时间从5分钟变成了10分钟。有没有人能帮我理解为什么SAVE又调用了循环?
1条答案
按热度按时间utugiqy61#
您的
map
转换执行了两次,因为您在其结果上调用了2个操作,而没有在中间进行缓存。这两个操作是:Dataset < Row > jsonDataSet = sparkSession.read().json(stringifiedDataSet);
DataFrameReader
对象,所以它必须推断你的 Dataframe 的模式,需要立即动作jsonDataSet.write().format("csv").save(filepath);
您可以尝试:
stringRecordSet
。这是此类问题的默认解决方案sparkSession.read().json(stringifiedDataSet);
操作提供一个模式来将其转换为惰性转换,如下所示:sparkSession.read().schema(schema).json(stringifiedDataSet);