如何避免spark数据集中的重复计算?

kyxcudwk  于 2021-05-26  发布在  Spark
关注(0)|答案(0)|浏览(311)

我是新来的。我正在做数据转换,使用spark进行分组。重新计算多次发生。我希望你的建议避免重新计算
示例输入:json数组

[{ "id": 1, "filepath": "/tmp/file1.xml" }, { "id": 2, "filepath": "/tmp/file2.xml" }]

示例文件内容:

<xml><key>1</key><value>2</value></xml>

流量:
加载文件 JavaPairRDD<String,String> textFile = sparkContext.wholeTextFiles("/tmp/file1.xml,/tmp/file2.xml"); transform files//transformerdata()调用api将文件从xml转换为json JavaPairRDD<String,String> transformedOutput = textFile.flatMapToPair(TransformerData()); 样本转换数据 { "key": 1, "value": 1 } 成功案例:将文件名作为第一个参数返回,将解析后的记录作为第二个参数示例记录<“file1.xml”,“{”key“:1”,value“:1}”>
失败案例:(如果文件转换中发生任何问题)返回'failed'字符串前缀filename作为第一个参数,null第二个参数sample record<“failed\u file1.xml”,null>
筛选失败的记录 JavaPairRDD<String,String> failedOutput = transformedOutput.filter((Function<Tuple2<String, String>, Boolean>) v1 -> v1._1.startsWith("failed")); 如果有失败的记录,则更新db并存在

if (failedOutput.count() > 0) {
    UpdateDB();
    throw new Exception("Parsing failed.");
}

如果没有失败的记录,则形成customrecord rdd JavaRDD<CustomRecord> customRecords = GeCustomRecordRDD(transformedOutput); ```
class CustomRecord {
private String fileName;
private String key;
private String record;
}

创建Dataframe以便对记录进行分组 `Dataset<Row> customRecordsDataSet = spark.createDataFrame(customRecords, CustomRecord.class);` 基于密钥对数据集进行分组

Dataset groupedDataSet = customRecordsDataSet.orderBy(customRecordsDataSet.col("fileName"))
.groupBy(customRecordsDataSet.col("key"))
.agg(
functions.collect_list(customRecordsDataSet.col("record"))
)

循环数据集以将每个关键记录发布到kafka

groupedDataSet.foreach(new ForeachFunction() {
public void call(Row r) throws Exception {
publishMessage(FormMessage(r.get(0), r.get(1)));
}
});

分组记录1:[1,2]这里1是文件中的键,1,2是值。根据键对记录进行分组。
这里计算发生在3次
在failedoutput.count()期间-一次
在groupeddataset.foreach期间-两次
我希望在循环groupeddataset时避免重复计算。记录没有变化

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题