我尝试使用Spark Bigquery connector向BigQuery写入100k行。
我的行由两个大字符串组成(大约每个200-250个单词)、许多单个单词串和一些词典类型的数据(最大深度2,里面的数据很少),总共大约35个字段。
我对字符串做了一些处理,对于100k行,它几乎是即时的,但当涉及到将数据写入BQ时,问题就出现了。
我使用一个由5个工作节点组成的Spark集群,每个节点都有32 GB的RAM、8个vCPU和500 GB的SSD,总共160 GB的RAM对应40个vCPU。即使有了这些规格,将10万行写入BQ也需要50分钟。我调查了一下,因为我使用的是间接写入,所以数据首先在GCS上写入,然后由BQ读取。读取作业大约需要。20秒,这意味着对GCS的写入操作仅对100k行就需要50分钟。
这不可能是正常的行为,因为即使我在本地用我的家用电脑和Pandas运行相同的写入操作,所需的时间也会比这少得多。
我的Spark会话的首字母是这样的:
spark = SparkSession
.builder
.appName('extract-skills')
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.26.0,com.johnsnowlabs.nlp:spark-nlp_2.12:4.1.0')
.config('spark.executor.memory', '25g')
.config('spark.executor.cores', '8')
.config('spark.driver.memory', '12g')
.config('spark.executor.instances', '5')
.config("spark.driver.maxResultSize", "0")
.config("spark.kryoserializer.buffer.max", "2000M")
.getOrCreate()
我的写法如下:
result.
write.format('bigquery')
.mode("overwrite")
.option("writeMethod", "indirect")
.option("temporaryGcsBucket","my_bucket")
.option('table', 'my_project.my_dataset.my_table')
.save()
我是不是忘了什么东西?我找不到瓶颈,无法将writeMethod
设置为direct
,因为我需要写入分区表。
2条答案
按热度按时间gopyfrb31#
使用直写应该会更快,变化非常小:
此外,请检查DataFrame是否已正确分区。如果单个分区比其他分区大得多,那么您的资源使用效率很低。
llmtgqce2#
经过一些测试后,我发现我的问题出在Spark NLP上,我用它来处理我的字符串(在我的例子中是词汇化)。
我在没有运行Spark NLP进程的情况下运行了
write
操作,对于24M行,所用时间不到一分钟,甚至在indirect
写入模式下也是如此,从性能上看这似乎更合适。现在的问题是:为什么Spark NLP发展如此缓慢?