I have a Spark 2.4.6 job that writes a DataFrame to Kafka like this:
df
.select((struct(df.columns.map(column): _*)).alias("value"))
.write
.format("kafka")
.options(........)
.save
But after building a jar and running it with spark-submit on version 3.0.2, it fails with:
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Literal must have a corresponding value to string, but class String found.
at scala.Predef$.require(Predef.scala:281)
at org.apache.spark.sql.catalyst.expressions.Literal$.validateLiteralValue(literals.scala:215)
at org.apache.spark.sql.catalyst.expressions.Literal.<init>(literals.scala:292)
at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$validateQuery$2(KafkaWriter.scala:55)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.kafka010.KafkaWriter$.validateQuery(KafkaWriter.scala:50)
at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:86)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:255)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:126)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:962)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:962)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:414)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:398)
at part4integrations.IntegratingKafka$.nimbleWrite(IntegratingKafka.scala:145)
at part4integrations.IntegratingKafka$.main(IntegratingKafka.scala:154)
at part4integrations.IntegratingKafka.main(IntegratingKafka.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Looking into the function involved:
Version 3.0.2 has this validation function: https://github.com/apache/spark/blob/648457905c4ea7d00e3d88048c63f360045f0714/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#l292
but the earlier version does not: https://github.com/apache/spark/blob/v2.4.6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
Is there any workaround that does not require upgrading the Spark version?
Rebuilding the jar against version 3.0.2 works fine.