我在scala中为spark编写了以下程序:
val dfA = sqlContext.sql("select * from employees where id in ('Emp1', 'Emp2')" )
val dfB = sqlContext.sql("select * from employees where id not in ('Emp1', 'Emp2')" )
val dfN = dfA.withColumn("department", lit("Finance"))
val dfFinal = dfN.unionAll(dfB)
dfFinal.registerTempTable("intermediate_result")
dfA.unpersist
dfB.unpersist
dfN.unpersist
dfFinal.unpersist
val dfTmp = sqlContext.sql("select * from intermediate_result")
dfTmp.write.mode("overwrite").format("parquet").saveAsTable("employees")
dfTmp.unpersist
当我试图保存它时,出现以下错误:
org.apache.spark.sql.analysisexception:无法覆盖表 employees
这一点也正在被解读。;在org.apache.spark.sql.execution.datasources.prewritecheck.failanalysis(rules。scala:106)在org.apache.spark.sql.execution.datasources.prewritecheck$$anonfun$apply$3.apply(规则)。scala:182)在org.apache.spark.sql.execution.datasources.prewritecheck$$anonfun$apply$3.apply(规则)。scala:109)在org.apache.spark.sql.catalyst.trees.treenode.foreach(treenode。scala:111)在org.apache.spark.sql.execution.datasources.prewritecheck.apply(rules。scala:109)在org.apache.spark.sql.execution.datasources.prewritecheck.apply(rules。scala:105)在org.apache.spark.sql.catalyst.analysis.checkanalysis$$anonfun$checkanalysis$2.apply(检查分析)。scala:218)在org.apache.spark.sql.catalyst.analysis.checkanalysis$$anonfun$checkanalysis$2.apply(checkanalysis)。scala:218)在scala.collection.immutable.list.foreach(list。scala:318)
我的问题是:
我改变两个雇员的部门的做法正确吗
当我释放Dataframe时,为什么会出现这个错误
4条答案
按热度按时间dpiehjr41#
假设您正在读取和覆盖的是一个配置单元表。
请将时间戳引入配置单元表位置,如下所示
由于无法覆盖,我们将把输出文件写入一个新位置。
使用dataframeapi将数据写入新位置
写入数据后,将配置单元表位置更改为新位置
avkwfej42#
我会这样做,
为了模拟您的流程,我创建了两个Dataframe
union
写回同一张表t
(故意移除department_id = 4
在本例中)yhqotfr83#
下面是一个你可以尝试的方法。
您可以使用saveastable api将其写入另一个表中,而不是使用registertemptable api
然后,把它写到employees表中
最后,删除中间结果表。
j9per5c44#
我改变两个雇员的部门的做法正确吗
事实并非如此。只是重复一下在堆栈溢出上说过多次的话——ApacheSpark不是一个数据库。它不是为细粒度更新而设计的。如果您的项目需要这样的操作,请使用hadoop上的许多数据库之一。
当我释放Dataframe时,为什么会出现这个错误
因为你没有。你所做的就是给执行计划加上一个名字。检查点将是最接近于“释放”的东西,但是您真的不想在破坏性操作中丢失执行器的情况下结束。
您可以写入临时目录,删除输入并移动临时文件,但实际上-只需使用适合该作业的工具即可。