PySpark Delta table merge fails with "MERGE INTO TABLE is not supported temporarily"

unhi4e5o, posted on 2023-10-23 in Apache

I get this error when I try to run a SQL MERGE on a Delta table from PySpark:

py4j.protocol.Py4JJavaError: An error occurred while calling o64.sql.
: java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
        at org.apache.spark.sql.errors.QueryExecutionErrors$.ddlUnsupportedTemporarilyError(QueryExecutionErrors.scala:744)
        at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:875)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:71)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:71)
        at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:504)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$2(QueryExecution.scala:165)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:224)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:224)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:165)
        at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:75)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:158)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:158)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$2(QueryExecution.scala:178)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:224)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:224)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:175)
        at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:75)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:171)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:171)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:308)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:606)
        at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:308)
        at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:323)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:277)
        at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:256)
        at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:102)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:112)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:108)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:108)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:95)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:93)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:221)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:101)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:98)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:750)

Here is the SQL merge query:

merge into re_obu_logistic_status_cockpit_report
        using (
                select *
                from   re_data_to_merge
              ) info_to_merge
        on    (
                re_obu_logistic_status_cockpit_report.date = info_to_merge.date
                and re_obu_logistic_status_cockpit_report.status_id = info_to_merge.status_id
                and re_obu_logistic_status_cockpit_report.materialnumber = info_to_merge.materialnumber
                and re_obu_logistic_status_cockpit_report.sales_partner_id = info_to_merge.sales_partner_id

              )
        when  matched
          then update set re_obu_logistic_status_cockpit_report.status = info_to_merge.status
                         ,re_obu_logistic_status_cockpit_report.sales_partner_name = info_to_merge.sales_partner_name
                         ,re_obu_logistic_status_cockpit_report.obu_count = info_to_merge.obu_count
                         ,re_obu_logistic_status_cockpit_report.obu_count_pct = info_to_merge.obu_count_pct
                         ,re_obu_logistic_status_cockpit_report.dan_last_changed_tsz = info_to_merge.dan_last_changed_tsz
                         ,re_obu_logistic_status_cockpit_report.dan_last_changed_epoch = info_to_merge.dan_last_changed_epoch

        when  not matched
          then insert (
                        re_obu_logistic_status_cockpit_report.date
                       ,re_obu_logistic_status_cockpit_report.status_id
                       ,re_obu_logistic_status_cockpit_report.status
                       ,re_obu_logistic_status_cockpit_report.materialnumber
                       ,re_obu_logistic_status_cockpit_report.sales_partner_id
                       ,re_obu_logistic_status_cockpit_report.sales_partner_name
                       ,re_obu_logistic_status_cockpit_report.obu_count
                       ,re_obu_logistic_status_cockpit_report.obu_count_pct
                       ,re_obu_logistic_status_cockpit_report.dan_last_changed_tsz
                       ,re_obu_logistic_status_cockpit_report.dan_last_changed_epoch

                      )
          values      (
                        info_to_merge.date
                       ,info_to_merge.status_id
                       ,info_to_merge.status
                       ,info_to_merge.materialnumber
                       ,info_to_merge.sales_partner_id
                       ,info_to_merge.sales_partner_name
                       ,info_to_merge.obu_count
                       ,info_to_merge.obu_count_pct
                       ,info_to_merge.dan_last_changed_tsz
                       ,info_to_merge.dan_last_changed_epoch

                      )
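
For context, the merge is submitted from PySpark through spark.sql(). The sketch below is mine, not from the original post: the app name, the source path, and the way re_data_to_merge is registered as a temporary view are assumptions, and the MERGE statement is shortened to a minimal form of the one above.

from pyspark.sql import SparkSession

# Minimal sketch of how the MERGE is run from PySpark.
# Assumptions (not from the original post): app name, source path, and that
# re_data_to_merge is exposed to SQL as a temporary view.
spark = SparkSession.builder.appName("obu-logistic-merge").getOrCreate()

# The MERGE source must be visible to SQL, e.g. as a temporary view.
source_df = spark.read.parquet("s3://example-bucket/re_data_to_merge/")  # placeholder path
source_df.createOrReplaceTempView("re_data_to_merge")

# Shortened version of the MERGE statement shown above (UPDATE SET * / INSERT *
# stand in for the explicit column lists).
merge_sql = """
merge into re_obu_logistic_status_cockpit_report as target
using re_data_to_merge as info_to_merge
on target.date = info_to_merge.date
   and target.status_id = info_to_merge.status_id
   and target.materialnumber = info_to_merge.materialnumber
   and target.sales_partner_id = info_to_merge.sales_partner_id
when matched then update set *
when not matched then insert *
"""

# This is the call (o64.sql in the traceback) that raises the
# UnsupportedOperationException when the Delta extension is not loaded.
spark.sql(merge_sql)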

I spun up a new EMR cluster, deleted the Delta table from S3, and restarted the Python application. I also restarted the EC2 instance that tells the EMR cluster to run the application.

6uxekuva1#

This was caused by the wrong quote characters in the spark-submit command: the --conf values were wrapped in curly quotes (“ ”) instead of straight quotes ("), so the Delta settings (the DeltaSparkSessionExtension and the Delta catalog) were never applied, and Spark's default planner, which does not support MERGE INTO, handled the query. The incorrect spark-submit was:

spark-submit \
  --master yarn \
  --driver-memory 8g \
  --executor-memory 8g \
  --executor-cores 8 \
  --packages io.delta:delta-core_2.12:2.0.0 \
  --conf “spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension” \
  --conf “spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog” \
  --conf “spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED” \
  --conf “spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED” \
  --conf “spark.sql.legacy.timeParserPolicy=LEGACY” \
  --conf spark.rpc.askTimeout=600s \
  main.py

The correct spark-submit is:

spark-submit \
  --master yarn \
  --driver-memory 8g \
  --executor-memory 8g \
  --executor-cores 8 \
  --packages io.delta:delta-core_2.12:2.0.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
  --conf "spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED" \
  --conf "spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED" \
  --conf "spark.sql.legacy.timeParserPolicy=LEGACY" \
  --conf spark.rpc.askTimeout=600s \
  main.py
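
An alternative that avoids shell quoting problems altogether is to set the same Delta options programmatically when building the SparkSession in main.py. This is a sketch of the equivalent configuration, not code from the original post; the io.delta:delta-core_2.12:2.0.0 dependency still has to be provided via --packages (or be available on the cluster's classpath).

from pyspark.sql import SparkSession

# Same settings as the --conf flags above, applied in code so no shell quoting
# is involved. They must be set before the session is first created.
spark = (
    SparkSession.builder
    .appName("main")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED")
    .config("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

# Sanity check: this raises if spark.sql.extensions was never applied,
# which is exactly the situation that produced the MERGE error above.
print(spark.conf.get("spark.sql.extensions"))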
