使用dataproc写入bigquery在使用spark bigquery连接器时很慢

sr4lhrrt  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(516)

我们有一个spark流应用程序,它从pubsub读取数据并应用一些转换,然后将javadstream转换为dataset,然后将结果写入bigquery normalize表。
下面是示例代码。所有normalize表都在currenttimestamp列上分区。有什么参数可以用来提高写性能吗?

pubSubMessageDStream
                .foreachRDD(new VoidFunction2<JavaRDD<PubSubMessageSchema>, Time>() {
            @Override
            public void call(JavaRDD<PubSubMessageSchema> v1, Time v2) throws Exception {
            Dataset<PubSubMessageSchema> pubSubDataSet = spark.createDataset(v1.rdd(), Encoders.bean(PubSubMessageSchema.class));
                       ---
                       ---
                       ---
                  for (Row payloadName : payloadNameList) {
                           Dataset<Row> normalizedDS = null;
                           if(payloadNameAList.contains(payloadName) {
                                 normalizedDS = dataSet.filter(col(colA.equalTo(<Value>)));
                           } else if(payloadNameBList.contains(payloadName) {
                                 normalizedDS = dataSet.filter(col(colA.equalTo(<Value>)));
                           }
                    normalizedDS.selectExpr(columnsBigQuery).write().format("bigquery")
                   .option("temporaryGcsBucket", gcsBucketName)
                   .option("table", tableName)
                   .option("project", projectId)
                   .option("parentProject", parentProjectId)
                   .mode(SaveMode.Append)
                   .save();
                 }
            }
        }
qv7cva1a

qv7cva1a1#

写入bigquery需要写入gcs,然后触发bigquery加载作业。你可以试着改变主意 intermediateFormat 查看它是否影响性能-从我们的测试来看,更好的格式取决于模式和数据大小。
此外,在即将发布的连接器版本0.19.0中,有一个针对spark 2.4的datasource v2 api的写实现,它应该可以将性能提高10-15%。

相关问题