如何更新spark中的少量记录

xsuvu9jc  于 2021-06-26  发布在  Hive
关注(0)|答案(4)|浏览(364)

我在scala中为spark编写了以下程序:

val dfA = sqlContext.sql("select * from employees where id in ('Emp1', 'Emp2')" )
val dfB = sqlContext.sql("select * from employees where id not in ('Emp1', 'Emp2')" )
val dfN = dfA.withColumn("department", lit("Finance"))
val dfFinal = dfN.unionAll(dfB)
dfFinal.registerTempTable("intermediate_result")

dfA.unpersist
dfB.unpersist
dfN.unpersist
dfFinal.unpersist

val dfTmp = sqlContext.sql("select * from intermediate_result")
dfTmp.write.mode("overwrite").format("parquet").saveAsTable("employees")
dfTmp.unpersist

当我试图保存它时,出现以下错误:
org.apache.spark.sql.analysisexception:无法覆盖表 employees 这一点也正在被解读。;在org.apache.spark.sql.execution.datasources.prewritecheck.failanalysis(rules。scala:106)在org.apache.spark.sql.execution.datasources.prewritecheck$$anonfun$apply$3.apply(规则)。scala:182)在org.apache.spark.sql.execution.datasources.prewritecheck$$anonfun$apply$3.apply(规则)。scala:109)在org.apache.spark.sql.catalyst.trees.treenode.foreach(treenode。scala:111)在org.apache.spark.sql.execution.datasources.prewritecheck.apply(rules。scala:109)在org.apache.spark.sql.execution.datasources.prewritecheck.apply(rules。scala:105)在org.apache.spark.sql.catalyst.analysis.checkanalysis$$anonfun$checkanalysis$2.apply(检查分析)。scala:218)在org.apache.spark.sql.catalyst.analysis.checkanalysis$$anonfun$checkanalysis$2.apply(checkanalysis)。scala:218)在scala.collection.immutable.list.foreach(list。scala:318)
我的问题是:
我改变两个雇员的部门的做法正确吗
当我释放Dataframe时,为什么会出现这个错误

dpiehjr4

dpiehjr41#

假设您正在读取和覆盖的是一个配置单元表。
请将时间戳引入配置单元表位置,如下所示

create table table_name (
  id                int,
  dtDontQuery       string,
  name              string
)
 Location hdfs://user/table_name/timestamp

由于无法覆盖,我们将把输出文件写入一个新位置。
使用dataframeapi将数据写入新位置

df.write.orc(hdfs://user/xx/tablename/newtimestamp/)

写入数据后,将配置单元表位置更改为新位置

Alter table tablename set Location hdfs://user/xx/tablename/newtimestamp/
avkwfej4

avkwfej42#

我会这样做,

>>> df = sqlContext.sql("select * from t")
>>> df.show()
+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|            2|        Fitness|
|            3|       Footwear|
|            4|        Apparel|
|            5|           Golf|
|            6|       Outdoors|
|            7|       Fan Shop|
+-------------+---------------+

为了模拟您的流程,我创建了两个Dataframe union 写回同一张表 t (故意移除 department_id = 4 在本例中)

>>> df1 = sqlContext.sql("select * from t where department_id < 4")
>>> df2 = sqlContext.sql("select * from t where department_id > 4")
>>> df3 = df1.unionAll(df2)
>>> df3.registerTempTable("df3")
>>> sqlContext.sql("insert overwrite table t select * from df3")
DataFrame[]  
>>> sqlContext.sql("select * from t").show()
+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|            2|        Fitness|
|            3|       Footwear|
|            5|           Golf|
|            6|       Outdoors|
|            7|       Fan Shop|
+-------------+---------------+
yhqotfr8

yhqotfr83#

下面是一个你可以尝试的方法。
您可以使用saveastable api将其写入另一个表中,而不是使用registertemptable api

dfFinal.write.mode("overwrite").saveAsTable("intermediate_result")

然后,把它写到employees表中

val dy = sqlContext.table("intermediate_result")
  dy.write.mode("overwrite").insertInto("employees")

最后,删除中间结果表。

j9per5c4

j9per5c44#

我改变两个雇员的部门的做法正确吗
事实并非如此。只是重复一下在堆栈溢出上说过多次的话——ApacheSpark不是一个数据库。它不是为细粒度更新而设计的。如果您的项目需要这样的操作,请使用hadoop上的许多数据库之一。
当我释放Dataframe时,为什么会出现这个错误
因为你没有。你所做的就是给执行计划加上一个名字。检查点将是最接近于“释放”的东西,但是您真的不想在破坏性操作中丢失执行器的情况下结束。
您可以写入临时目录,删除输入并移动临时文件,但实际上-只需使用适合该作业的工具即可。

相关问题