我必须创建管道来从BigQuery传输数据并将其保存为json文件。但我得到了这个错误。SQL查询的结果是3000万条记录。如何改进这段代码?
错误:
[error](run-main-0)java.lang.OutOfMemoryError:Java堆空间[错误]
object tmp {
private val logger = LoggerFactory.getLogger(this.getClass)
var date = "2023-05-22"
def main(cmdlineArgs: Array[String]){
val (sc, args) = ContextAndArgs(cmdlineArgs)
val file_path = "src/main/scala/thunder/tmp.sql"
val sql_content = Source.fromFile(file_path).mkString
val queryConfig = QueryJobConfiguration.newBuilder(sql_content).build()
val client = BigQueryOptions.getDefaultInstance().getService()
val queryResult = client.query(queryConfig)
var result = queryResult.iterateAll().iterator().asScala.map(_.asScala.map(_.getValue).toArray).toSeq
val json_result = result.map { row =>
val pin_username = row(0).toString
val feature_name = row(1).toString
implicit val formats = DefaultFormats
write(Map(("pin_username"->pin_username),("feature_name" -> feature_name)))
}
sc.parallelize(json_result)
.saveAsTextFile("output", ".json")
sc.close().waitUntilFinish()
}
}
1条答案
按热度按时间disho6za1#
错误
java.lang.OutOfMemoryError: Java heap space
主要是由于您的worker内存负载非常高导致的,因为您正在处理3000万条SQL查询记录。解决这个问题的一个方法是增加工作线程的内存。最好将worker的大小设置为n1-highmem-4 or above
。此参数可用于选择机器类型:--workerMachineType
。另一种解决方法是,您还可以检查堆转储,以确定JVM内存不足时的内存不足(OOM)错误。重新运行带有标志
--dumpHeapOnOOM
的作业,将堆转储保存在本地,并使用--saveHeapDumpsToGcsPath=gs://<path_to_a_gcs_bucket>
将其保存在您具有写权限的特定存储桶上。但是,建议只在调试时使用热转储,因为它在当前服务的基础上有另一个成本。