我们有一个spark流应用程序,它从pubsub读取数据并应用一些转换,然后将javadstream转换为dataset,然后将结果写入bigquery normalize表。
下面是示例代码。所有normalize表都在currenttimestamp列上分区。有什么参数可以用来提高写性能吗?
pubSubMessageDStream
.foreachRDD(new VoidFunction2<JavaRDD<PubSubMessageSchema>, Time>() {
@Override
public void call(JavaRDD<PubSubMessageSchema> v1, Time v2) throws Exception {
Dataset<PubSubMessageSchema> pubSubDataSet = spark.createDataset(v1.rdd(), Encoders.bean(PubSubMessageSchema.class));
---
---
---
for (Row payloadName : payloadNameList) {
Dataset<Row> normalizedDS = null;
if(payloadNameAList.contains(payloadName) {
normalizedDS = dataSet.filter(col(colA.equalTo(<Value>)));
} else if(payloadNameBList.contains(payloadName) {
normalizedDS = dataSet.filter(col(colA.equalTo(<Value>)));
}
normalizedDS.selectExpr(columnsBigQuery).write().format("bigquery")
.option("temporaryGcsBucket", gcsBucketName)
.option("table", tableName)
.option("project", projectId)
.option("parentProject", parentProjectId)
.mode(SaveMode.Append)
.save();
}
}
}
1条答案
按热度按时间qv7cva1a1#
写入bigquery需要写入gcs,然后触发bigquery加载作业。你可以试着改变主意
intermediateFormat
查看它是否影响性能-从我们的测试来看,更好的格式取决于模式和数据大小。此外,在即将发布的连接器版本0.19.0中,有一个针对spark 2.4的datasource v2 api的写实现,它应该可以将性能提高10-15%。