I have a simple Spark application that does structured streaming.
Originally, my build.sbt looked like this:
name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.3.2" % "provided",
"org.apache.spark" %% "spark-sql" % "3.3.2" % "provided",
"org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2" % "provided",
"org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
"org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
"com.amazonaws" % "aws-java-sdk-bundle" % "1.12.454" % "provided"
)
sbt assembly ran successfully, but then when I ran spark-submit ... I got this error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
at org.apache.spark.sql.errors.QueryCompilationErrors$.failedToFindKafkaDataSourceError(QueryCompilationErrors.scala:1070)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:673)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:725)
at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:864)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
at com.hongbomiao.IngestFromS3ToKafka$.main(IngestFromS3ToKafka.scala:40)
at com.hongbomiao.IngestFromS3ToKafka.main(IngestFromS3ToKafka.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Then I removed provided for spark-sql-kafka-0-10:
name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.3.2" % "provided",
"org.apache.spark" %% "spark-sql" % "3.3.2" % "provided",
"org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2",
"org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
"org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
"com.amazonaws" % "aws-java-sdk-bundle" % "1.12.454" % "provided"
)
But this time when I ran sbt assembly, I got this error:
[error] 1 error(s) were encountered during the merge:
| => ingest-from-s3-to-kafka / assembly 1s
[error] java.lang.RuntimeException:
[error] Deduplicate found different file contents in the following:
[error] Jar name = spark-sql-kafka-0-10_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error] Jar name = spark-tags_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error] Jar name = spark-token-provider-kafka-0-10_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error] Jar name = unused-1.0.0.jar, jar org = org.spark-project.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error] at sbtassembly.Assembly$.merge(Assembly.scala:624)
[error] at sbtassembly.Assembly$.$anonfun$assemble$36(Assembly.scala:330)
[error] at sbtassembly.Assembly$.timed$1(Assembly.scala:228)
[error] at sbtassembly.Assembly$.$anonfun$assemble$35(Assembly.scala:329)
[error] at sbtassembly.Assembly$.$anonfun$cachedAssembly$2(Assembly.scala:523)
[error] at sbt.util.Tracked$.$anonfun$lastOutput$1(Tracked.scala:73)
[error] at sbtassembly.Assembly$.cachedAssembly(Assembly.scala:527)
[error] at sbtassembly.Assembly$.assemble(Assembly.scala:414)
[error] at sbtassembly.Assembly$.$anonfun$assemblyTask$1(Assembly.scala:196)
[error] at scala.Function1.$anonfun$compose$1(Function1.scala:49)
[error] at sbt.internal.util.$tilde$greater.$anonfun$$u2219$1(TypeFunctions.scala:62)
[error] at sbt.std.Transform$$anon$4.work(Transform.scala:68)
[error] at sbt.Execute.$anonfun$submit$2(Execute.scala:282)
[error] at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:23)
[error] at sbt.Execute.work(Execute.scala:291)
[error] at sbt.Execute.$anonfun$submit$1(Execute.scala:282)
[error] at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:265)
[error] at sbt.CompletionService$$anon$2.call(CompletionService.scala:64)
[error] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[error] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
[error] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[error] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[error] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[error] at java.base/java.lang.Thread.run(Thread.java:833)
[error] (assembly)
[error] Deduplicate found different file contents in the following:
[error] Jar name = spark-sql-kafka-0-10_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error] Jar name = spark-tags_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error] Jar name = spark-token-provider-kafka-0-10_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error] Jar name = unused-1.0.0.jar, jar org = org.spark-project.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error] Total time: 3 s, completed Apr 24, 2023, 11:13:44 PM
make: *** [sbt-clean-compile-assembly] Error 1
Usually I see the "Deduplicate" error when sbt assembly pulls in multiple dependencies, but this time there is only one non-provided dependency, spark-sql-kafka-0-10 (the rest are all "provided"). Is it because classes in its transitive dependencies conflict?
Then I found --packages in this document.
This time spark-submit --packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2 ... succeeded.
My question is: when should I add a dependency to libraryDependencies without "provided", and when should I use --packages? Thanks!
1 Answer
Try the following to resolve that particular error.
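A minimal merge-strategy sketch for build.sbt, assuming sbt-assembly 1.x or later, that keeps one copy of the conflicting stub class and falls back to the plugin's default for everything else:
ThisBuild / assemblyMergeStrategy := {
  // several Spark jars ship their own org/apache/spark/unused/UnusedStubClass.class; keeping any one copy is fine
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") =>
    MergeStrategy.first
  // defer to sbt-assembly's default strategy for all other entries
  case x =>
    val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
    oldStrategy(x)
}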
When should I add a dependency to libraryDependencies without "provided", and when should I use --packages?
If your Spark cluster has access to the internet or an internal Maven mirror, use --packages. This downloads any necessary libraries at runtime, which lets you keep your application JAR smaller, but you have to remember to always pass the extra CLI argument to spark-submit. Otherwise, adding the non-provided (i.e. compile-scope) dependencies and building with the assembly plugin bundles everything into a single JAR, but then you need to define a merge strategy for the duplicated entries, as in the sketch above.
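Roughly, the two invocations look like this (the assembly jar path is illustrative; the main class is taken from the stack trace above):
# connector kept "provided": resolve it at runtime with --packages
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2 \
  --class com.hongbomiao.IngestFromS3ToKafka \
  target/scala-2.12/IngestFromS3ToKafka-assembly-1.0.jar
# connector bundled by sbt assembly: no extra flag needed, but a merge strategy is required
spark-submit \
  --class com.hongbomiao.IngestFromS3ToKafka \
  target/scala-2.12/IngestFromS3ToKafka-assembly-1.0.jar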
spark-sql-kafka-0-10 is part of the "contrib" path of the Spark source code, and is therefore not considered "provided" as a common runtime dependency. However, this depends on your Spark cluster, because you could copy the jar onto every executor's classpath, in which case marking it as "provided" would work fine.
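For example (the paths below are illustrative), you could keep the dependency "provided" in build.sbt and ship the connector jars yourself with --jars. Note that, unlike --packages, --jars does not resolve transitive dependencies, so the connector's own dependencies (spark-token-provider-kafka-0-10, kafka-clients, commons-pool2) have to be listed as well:
spark-submit \
  --jars /opt/extra-jars/spark-sql-kafka-0-10_2.12-3.3.2.jar,/opt/extra-jars/spark-token-provider-kafka-0-10_2.12-3.3.2.jar,/opt/extra-jars/kafka-clients.jar,/opt/extra-jars/commons-pool2.jar \
  ...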