saveAsTable in Spark Scala: HDP 3.x

cvxl0en2  posted on 2021-05-27  in Spark

I have a DataFrame in Spark and I am trying to save it as a table in Hive, but I get the error message below.

java.lang.RuntimeException:
    com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector
    does not allow create table as select.
    at scala.sys.package$.error(package.scala:27)

Can someone help me save this as a table in Hive?

import org.apache.spark.sql.functions.when

val df3 = df1.join(df2, df1("inv_num") === df2("inv_num"))  // join both DataFrames on inv_num
    .withColumn("finalSalary",
        when(df1("salary") < df2("salary"), df2("salary") - df1("salary"))
        .otherwise(
            when(df1("salary") > df2("salary"), df1("salary") + df2("salary"))  // 5000+3000=8000  check
            .otherwise(df2("salary"))))  // otherwise take the value from the second DataFrame
    .drop(df1("salary"))
    .drop(df2("salary"))
    .withColumnRenamed("finalSalary", "salary")

// The write below does not work; executing it throws the RuntimeException quoted above.

df3.write
    .format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector")
    .option("database", "dbname")
    .option("table", "tablename")
    .mode("Append")
    .saveAsTable("tablename")

Note: the table already exists in the database, and I am using HDP 3.x.
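
For readers following the nested when/otherwise in the question, the salary rule can be restated on plain numbers. This is only a sketch: the object name SalaryRule and the function finalSalary are hypothetical, the inputs are assumed to be non-null whole numbers, and it does not touch Spark or Hive at all.

```scala
object SalaryRule {
  // Same three-way rule as the when/otherwise chain in the question,
  // applied to plain Longs (null handling is not modeled here).
  def finalSalary(first: Long, second: Long): Long =
    if (first < second) second - first       // df1 salary lower: difference
    else if (first > second) first + second  // df1 salary higher: sum
    else second                              // equal: take df2's value

  def main(args: Array[String]): Unit = {
    println(finalSalary(3000L, 5000L)) // prints 2000
    println(finalSalary(5000L, 3000L)) // prints 8000
    println(finalSalary(4000L, 4000L)) // prints 4000
  }
}
```

The second call matches the "5000+3000=8000" check noted in the question's comment.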

kyxcudwk  #1

Try registering the DataFrame as a temporary table (registerTempTable), then query it with spark.sql(), and then write the result:

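// Format name of the connector, the same string used in the question
val HIVE_WAREHOUSE_CONNECTOR = "com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector"

// createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0
df3.createOrReplaceTempView("tablename")
spark.sql("SELECT salary FROM tablename")
    .write
    .format(HIVE_WAREHOUSE_CONNECTOR)
    .option("database", "dbname")
    .option("table", "tablename")  // target Hive table; set this option only once
    .mode("Append")
    .save()  // save() instead of saveAsTable()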

gcuhipw9  #2

See whether the solution below works for you:

import org.apache.spark.sql.functions.when

val df3 = df1.join(df2, df1("inv_num") === df2("inv_num"))  // join both DataFrames on inv_num
    .withColumn("finalSalary",
        when(df1("salary") < df2("salary"), df2("salary") - df1("salary"))
        .otherwise(
            when(df1("salary") > df2("salary"), df1("salary") + df2("salary"))  // 5000+3000=8000  check
            .otherwise(df2("salary"))))  // otherwise take the value from the second DataFrame
    .drop(df1("salary"))
    .drop(df2("salary"))
    .withColumnRenamed("finalSalary", "salary")

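val hive = com.hortonworks.spark.sql.hive.llap.HiveWarehouseBuilder.session(spark).build()
// Format name of the connector, the same string used in the question
val HIVE_WAREHOUSE_CONNECTOR = "com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector"

df3.createOrReplaceTempView("<temp-tbl-name>")
hive.setDatabase("<db-name>")
hive.createTable("<tbl-name>")
    .ifNotExists()
    .column("salary", "bigint")  // assumed column type; match it to your real schema
    .create()                    // the builder only runs once create() is called

spark.sql("SELECT salary FROM <temp-tbl-name>")
    .write
    .format(HIVE_WAREHOUSE_CONNECTOR)
    .mode("append")
    .option("table", "<tbl-name>")
    .save()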

qrjkbowd  #3

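According to the Spark documentation, the behavior of saveAsTable depends on the save mode used, which defaults to ErrorIfExists. Since you are writing to Hive, try insertInto instead, but keep in mind that the DataFrame's columns must be in the same order as the destination table's.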
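
A minimal sketch of the insertInto suggestion, assuming the target table is visible through the SparkSession's own catalog (the thread does not confirm that this holds on HDP 3.x). The object InsertIntoSketch and the helper appendInTableOrder are hypothetical names. Because insertInto matches columns by position rather than by name, the helper first re-projects the DataFrame into the target table's column order.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.col

object InsertIntoSketch {
  // Reorders df's columns to the order of the existing target table,
  // then appends the rows with insertInto (position-based matching).
  def appendInTableOrder(df: DataFrame, targetTable: String): Unit = {
    // Column names of the target table, in the table's declared order
    val targetColumns = df.sparkSession.table(targetTable).columns
    df.select(targetColumns.map(col): _*)
      .write
      .mode(SaveMode.Append)
      .insertInto(targetTable)
  }
}
```

With the names from the question, the call would look like InsertIntoSketch.appendInTableOrder(df3, "dbname.tablename"). The select fails with an AnalysisException if the DataFrame lacks one of the table's columns, which surfaces a schema mismatch before any rows are written.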
