insert-into-sql失败,错误为java.lang.nosuchmethoderror:org.apache.spark.sql.catalyst.expressions.alias

idfiyjo8  于 2021-07-13  发布在  Spark
关注(0)|答案(2)|浏览(672)

dataproc集群是用映像创建的 2.0.x 带delta io包 io.delta:delta-core_2.12:0.7.0 spark版本是3.1.1
Spark壳启动:

pyspark --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

执行命令以创建增量表并插入增量sql:

spark.sql("""CREATE TABLE IF NOT EXISTS customer(
             c_id Long, c_name String, c_city String
             )
           USING DELTA LOCATION 'gs://edw-bi-dev-dataexports/delta-table-poc/dt_poc/customer'
         """)

spark.sql("INSERT INTO customer VALUES(1, 'Shawn', 'Tx')")

错误:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 719, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a,**kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o58.sql.
: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.Alias.<init>(Lorg/apache/spark/sql/catalyst/expressions/Expression;Ljava/lang/String;Lorg/apache/spark/sql/catalyst/expressions/ExprId;Lscala/collection/Seq;Lscala/Option;)V
        at org.apache.spark.sql.delta.DeltaAnalysis.$anonfun$normalizeQueryColumns$1(DeltaAnalysis.scala:162)
        at scala.collection.immutable.List.map(List.scala:293)
        at org.apache.spark.sql.delta.DeltaAnalysis.org$apache$spark$sql$delta$DeltaAnalysis$$normalizeQueryColumns(DeltaAnalysis.scala:151)
        at org.apache.spark.sql.delta.DeltaAnalysis$$anonfun$apply$1.applyOrElse(DeltaAnalysis.scala:49)
        at org.apache.spark.sql.delta.DeltaAnalysis$$anonfun$apply$1.applyOrElse(DeltaAnalysis.scala:45)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$2(AnalysisHelper.scala:108)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:73)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:108)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:221)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:106)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:104)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.delta.DeltaAnalysis.apply(DeltaAnalysis.scala:45)
        at org.apache.spark.sql.delta.DeltaAnalysis.apply(DeltaAnalysis.scala:40)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:216)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:213)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:205)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:205)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:195)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:189)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:154)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:183)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:183)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:173)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:228)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:172)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:73)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:73)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:71)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:63)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:615)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:610)
        at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)

我无法找出问题的根本原因。

uklbhaso

uklbhaso1#

它是由这个改变引起的,它破坏了 Alias 案例类。对此的修复要么将spark版本降级到3.0.x,要么等到新的delta版本发布并支持3.1.x。
p、 在美国,三角洲还有其他地方被Spark3.1.1中的变化打破了

sf6xfgos

sf6xfgos2#

这个问题似乎与sparksql目录的设置有关。我尝试了相同的查询,当我删除这个conf时,它就工作了:

--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

在dataproc映像上测试 2.0.1-debian10 使用:

pyspark --packages io.delta:delta-core_2.12:0.7.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"

相关问题