我在PySpark中尝试SQL合并delta表时得到这个错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o64.sql.
: java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
at org.apache.spark.sql.errors.QueryExecutionErrors$.ddlUnsupportedTemporarilyError(QueryExecutionErrors.scala:744)
at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:875)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:71)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:71)
at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:504)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$2(QueryExecution.scala:165)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:224)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:224)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:165)
at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:158)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:158)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$2(QueryExecution.scala:178)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:224)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:224)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:175)
at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:171)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:171)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:308)
at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:606)
at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:308)
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:323)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:277)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:256)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:102)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:112)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:108)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:108)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:95)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:93)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:221)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:101)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:98)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
下面是SQL合并查询:
merge into re_obu_logistic_status_cockpit_report
using (
select *
from re_data_to_merge
) info_to_merge
on (
re_obu_logistic_status_cockpit_report.date = info_to_merge.date
and re_obu_logistic_status_cockpit_report.status_id = info_to_merge.status_id
and re_obu_logistic_status_cockpit_report.materialnumber = info_to_merge.materialnumber
and re_obu_logistic_status_cockpit_report.sales_partner_id = info_to_merge.sales_partner_id
)
when matched
then update set re_obu_logistic_status_cockpit_report.status = info_to_merge.status
,re_obu_logistic_status_cockpit_report.sales_partner_name = info_to_merge.sales_partner_name
,re_obu_logistic_status_cockpit_report.obu_count = info_to_merge.obu_count
,re_obu_logistic_status_cockpit_report.obu_count_pct = info_to_merge.obu_count_pct
,re_obu_logistic_status_cockpit_report.dan_last_changed_tsz = info_to_merge.dan_last_changed_tsz
,re_obu_logistic_status_cockpit_report.dan_last_changed_epoch = info_to_merge.dan_last_changed_epoch
when not matched
then insert (
re_obu_logistic_status_cockpit_report.date
,re_obu_logistic_status_cockpit_report.status_id
,re_obu_logistic_status_cockpit_report.status
,re_obu_logistic_status_cockpit_report.materialnumber
,re_obu_logistic_status_cockpit_report.sales_partner_id
,re_obu_logistic_status_cockpit_report.sales_partner_name
,re_obu_logistic_status_cockpit_report.obu_count
,re_obu_logistic_status_cockpit_report.obu_count_pct
,re_obu_logistic_status_cockpit_report.dan_last_changed_tsz
,re_obu_logistic_status_cockpit_report.dan_last_changed_epoch
)
values (
info_to_merge.date
,info_to_merge.status_id
,info_to_merge.status
,info_to_merge.materialnumber
,info_to_merge.sales_partner_id
,info_to_merge.sales_partner_name
,info_to_merge.obu_count
,info_to_merge.obu_count_pct
,info_to_merge.dan_last_changed_tsz
,info_to_merge.dan_last_changed_epoch
)
我重新启动了一个新的EMR集群,从S3中删除了delta表,然后重新启动了python应用程序。我还重新启动了EC2示例,要求EMR集群运行应用程序。
1条答案
按热度按时间6uxekuva1#
这是由于spark-submit命令中错误的“字符:错误的spark-submit是:
正确的spark-submit是: