Google DataProc Pyspark-BigQuery连接器速度超慢

wdebmtf2  于 2022-10-07  发布在  Go
关注(0)|答案(2)|浏览(164)

我尝试使用Spark Bigquery connector向BigQuery写入100k行。

我的行由两个大字符串组成(大约每个200-250个单词)、许多单个单词串和一些词典类型的数据(最大深度2,里面的数据很少),总共大约35个字段。

我对字符串做了一些处理,对于100k行,它几乎是即时的,但当涉及到将数据写入BQ时,问题就出现了。

我使用一个由5个工作节点组成的Spark集群,每个节点都有32 GB的RAM、8个vCPU和500 GB的SSD,总共160 GB的RAM对应40个vCPU。即使有了这些规格,将10万行写入BQ也需要50分钟。我调查了一下,因为我使用的是间接写入,所以数据首先在GCS上写入,然后由BQ读取。读取作业大约需要。20秒,这意味着对GCS的写入操作仅对100k行就需要50分钟。

这不可能是正常的行为,因为即使我在本地用我的家用电脑和Pandas运行相同的写入操作,所需的时间也会比这少得多。

我的Spark会话的首字母是这样的:

spark = SparkSession 
    .builder 
    .appName('extract-skills') 
    .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.26.0,com.johnsnowlabs.nlp:spark-nlp_2.12:4.1.0') 
    .config('spark.executor.memory', '25g') 
    .config('spark.executor.cores', '8') 
    .config('spark.driver.memory', '12g') 
    .config('spark.executor.instances', '5') 
    .config("spark.driver.maxResultSize", "0") 
    .config("spark.kryoserializer.buffer.max", "2000M")
    .getOrCreate()

我的写法如下:

result. 
    write.format('bigquery') 
        .mode("overwrite") 
        .option("writeMethod", "indirect") 
        .option("temporaryGcsBucket","my_bucket") 
        .option('table', 'my_project.my_dataset.my_table') 
        .save()

我是不是忘了什么东西?我找不到瓶颈,无法将writeMethod设置为direct,因为我需要写入分区表。

gopyfrb3

gopyfrb31#

使用直写应该会更快,变化非常小:

result. 
    write.format('bigquery') 
        .mode("overwrite") 
        .option("writeMethod", "direct") 
        .option('table', 'my_project.my_dataset.my_table') 
        .save()

此外,请检查DataFrame是否已正确分区。如果单个分区比其他分区大得多,那么您的资源使用效率很低。

llmtgqce

llmtgqce2#

经过一些测试后,我发现我的问题出在Spark NLP上,我用它来处理我的字符串(在我的例子中是词汇化)。

我在没有运行Spark NLP进程的情况下运行了write操作,对于24M行,所用时间不到一分钟,甚至在indirect写入模式下也是如此,从性能上看这似乎更合适。

现在的问题是:为什么Spark NLP发展如此缓慢?

相关问题