How do I save a query result into a Hive table on a Dataproc cluster using `.saveAsTable()`?

Asked by sulc1iza on 2021-05-22, in Spark

I have a query result that I'm trying to write to a Hive table on GCP, with the table location pointing at a GCS bucket path. When I call the saveAsTable() method, it fails with the following error.

org.apache.spark.SparkException: Job aborted.
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)

Here is my code:

import org.apache.spark.sql.SaveMode.{Append, Overwrite}
import org.apache.spark.sql.functions.col

sparkSession.sql(eiSqlQuery)
      .repartition(col("col_1"))  // repartition takes Column expressions, not bare strings
      .write
      .mode(if (AppConfig.isHistoryLoad) Overwrite else Append)
      .partitionBy("col_2")
      .saveAsTable("hive_schema.hive_table_name")

I have also tried writing parquet to the path instead of using saveAsTable, and tried creating the table first and then using insertInto; every variant fails with the same error. A sketch of the insertInto attempt is below.
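
For context, the insertInto attempt looked roughly like this (a sketch; it assumes the partitioned table hive_schema.hive_table_name was created beforehand with a schema matching the query, partition column last):

// Scala
import org.apache.spark.sql.SaveMode

sparkSession.sql(eiSqlQuery)
      .write
      .mode(SaveMode.Append)
      .insertInto("hive_schema.hive_table_name")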
What other options do I have?


5kgi1eie 1#

First, you need a SparkSession with Hive support enabled, for example:

// Scala
import org.apache.spark.sql.SparkSession

// warehouseLocation is the Hive warehouse directory; on Dataproc this can
// point at a GCS path (the bucket below is a placeholder)
val warehouseLocation = "gs://your-bucket/hive-warehouse"

val spark = SparkSession
    .builder()
    .appName("Spark Hive Example")
    .config("spark.sql.warehouse.dir", warehouseLocation)
    .enableHiveSupport()
    .getOrCreate()
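
A quick sanity check that the session is actually talking to the Hive metastore is to list its databases; at minimum the default database should show up:

// Scala
// If Hive support is active, this lists the databases known to the
// metastore (at least "default")
spark.sql("SHOW DATABASES").show()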

Then you should be able to create the Hive table, for example (note the USING hive clause in the SQL):

// Scala
sql("CREATE TABLE IF NOT EXISTS my_table (key INT, value STRING) USING hive")

Then you can save your DataFrame into the table:

// Scala
// df holds your query result, e.g. sparkSession.sql(eiSqlQuery); without an
// explicit mode, saveAsTable fails if the table already exists
import org.apache.spark.sql.SaveMode
df.write.mode(SaveMode.Append).partitionBy("key").format("hive").saveAsTable("my_table")
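
As a quick check that the write actually reached the metastore, you can read the table back:

// Scala
// Read the saved table back through the catalog and show a few rows
spark.table("my_table").show(10)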

See Spark's Hive Tables documentation (https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html) for more example code.
