如何更新/删除spark hive中的数据?

deyfvvtc  于 2021-06-26  发布在  Hive
关注(0)|答案(2)|浏览(600)

我认为我的题目不能解释这个问题,所以问题是:
详细信息build.sbt:

name := "Hello"
scalaVersion := "2.11.8"
version      := "1.0"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-hive_2.11" % "2.1.0"

代码:

val sparkSession = SparkSession.builder().enableHiveSupport().appName("HiveOnSpark").master("local").getOrCreate()
val hiveql : HiveContext  = new HiveContext(sparkSession.sparkContext);

hiveql.sql("drop table if exists test")
hiveql.sql("create table test (id int, name string) stored as orc tblproperties(\"transactional\"=\"true\")")
hiveql.sql("insert into test values(1,'Yash')")
hiveql.sql("insert into test values(2,'Yash')")
hiveql.sql("insert into test values(3,'Yash')")
hiveql.sql("select * from test").show()
hiveql.sql("delete from test where id= 1")

问题:

Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: 
Operation not allowed: delete from(line 1, pos 0)

== SQL ==
delete from test where id= 1
^^^

at org.apache.spark.sql.catalyst.parser.ParserUtils$.operationNotAllowed(ParserUtils.scala:39)
at org.apache.spark.sql.execution.SparkSqlAstBuilder$$anonfun$visitFailNativeCommand$1.apply(SparkSqlParser.scala:925)
at org.apache.spark.sql.execution.SparkSqlAstBuilder$$anonfun$visitFailNativeCommand$1.apply(SparkSqlParser.scala:916)
at org.apache.spark.sql.catalyst.parser.ParserUtils$.withOrigin(ParserUtils.scala:93)
at org.apache.spark.sql.execution.SparkSqlAstBuilder.visitFailNativeCommand(SparkSqlParser.scala:916)
at org.apache.spark.sql.execution.SparkSqlAstBuilder.visitFailNativeCommand(SparkSqlParser.scala:52)
at org.apache.spark.sql.catalyst.parser.SqlBaseParser$FailNativeCommandContext.accept(SqlBaseParser.java:952)
at org.antlr.v4.runtime.tree.AbstractParseTreeVisitor.visit(AbstractParseTreeVisitor.java:42)
at org.apache.spark.sql.catalyst.parser.AstBuilder$$anonfun$visitSingleStatement$1.apply(AstBuilder.scala:66)
at org.apache.spark.sql.catalyst.parser.AstBuilder$$anonfun$visitSingleStatement$1.apply(AstBuilder.scala:66)
at org.apache.spark.sql.catalyst.parser.ParserUtils$.withOrigin(ParserUtils.scala:93)
at org.apache.spark.sql.catalyst.parser.AstBuilder.visitSingleStatement(AstBuilder.scala:65)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser$$anonfun$parsePlan$1.apply(ParseDriver.scala:54)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser$$anonfun$parsePlan$1.apply(ParseDriver.scala:53)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:82)
at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:45)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:699)
at main.scala.InitMain$.delayedEndpoint$main$scala$InitMain$1(InitMain.scala:41)
at main.scala.InitMain$delayedInit$body.apply(InitMain.scala:9)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at main.scala.InitMain$.main(InitMain.scala:9)
at main.scala.InitMain.main(InitMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

更新查询也有同样的问题。
所以现在我已经在sparksql中完成了这个,这个,更新查询,这个,这个,还有很多其他的。
我知道spark不支持update/delete,但我现在的情况是需要在这两个操作中使用它们。有人能建议/帮忙吗。

frebpwbc

frebpwbc1#

您可以使用llap库来启用更新和删除。

s2j5cfk0

s2j5cfk02#

一个不太好的解决办法是
加载现有数据(我建议使用dataframeapi)
使用更新/删除的记录创建新的Dataframe
将Dataframe重写到磁盘
为配置单元表选择适当的分区可以最大限度地减少要重新写入的数据量。

相关问题