Scala: when should a dependency go in `libraryDependencies` without "provided", and when should I use `--packages`?

Asked by kknvjkwl on 2023-04-30 in Scala

I have a simple Spark application that does structured streaming from S3 to Kafka.
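The relevant part of the job is roughly the following (a simplified sketch consistent with the stack trace below; the S3 path, broker address, and topic name are placeholders, not my real values):

import org.apache.spark.sql.SparkSession

object IngestFromS3ToKafka {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("IngestFromS3ToKafka").getOrCreate()

    // Read the input from S3 (path and format are placeholders)
    val df = spark.read.parquet("s3a://my-bucket/input/")

    // The Kafka sink requires a "value" column; serialize each row as JSON
    df.selectExpr("to_json(struct(*)) AS value")
      .write
      .format("kafka") // the data source the spark-submit error below fails to find
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("topic", "my-topic")                         // placeholder topic
      .save()

    spark.stop()
  }
}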
Initially, my build.sbt looked like this:

name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-sql" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2" % "provided",
  "org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.454" % "provided"
)

sbt assembly ran successfully, but then when I ran spark-submit ... I got this error:

Exception in thread "main" org.apache.spark.sql.AnalysisException:  Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".        
    at org.apache.spark.sql.errors.QueryCompilationErrors$.failedToFindKafkaDataSourceError(QueryCompilationErrors.scala:1070)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:673)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:725)
    at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:864)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
    at com.hongbomiao.IngestFromS3ToKafka$.main(IngestFromS3ToKafka.scala:40)
    at com.hongbomiao.IngestFromS3ToKafka.main(IngestFromS3ToKafka.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Then I removed "provided" from spark-sql-kafka-0-10:

name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-sql" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2",
  "org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.454" % "provided"
)

But this time when I ran sbt assembly, I got this error:

[error] 1 error(s) were encountered during the merge:

  | => ingest-from-s3-to-kafka / assembly 1s
[error] java.lang.RuntimeException: 
[error] Deduplicate found different file contents in the following:
[error]   Jar name = spark-sql-kafka-0-10_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error]   Jar name = spark-tags_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error]   Jar name = spark-token-provider-kafka-0-10_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error]   Jar name = unused-1.0.0.jar, jar org = org.spark-project.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error]     at sbtassembly.Assembly$.merge(Assembly.scala:624)
[error]     at sbtassembly.Assembly$.$anonfun$assemble$36(Assembly.scala:330)
[error]     at sbtassembly.Assembly$.timed$1(Assembly.scala:228)
[error]     at sbtassembly.Assembly$.$anonfun$assemble$35(Assembly.scala:329)
[error]     at sbtassembly.Assembly$.$anonfun$cachedAssembly$2(Assembly.scala:523)
[error]     at sbt.util.Tracked$.$anonfun$lastOutput$1(Tracked.scala:73)
[error]     at sbtassembly.Assembly$.cachedAssembly(Assembly.scala:527)
[error]     at sbtassembly.Assembly$.assemble(Assembly.scala:414)
[error]     at sbtassembly.Assembly$.$anonfun$assemblyTask$1(Assembly.scala:196)
[error]     at scala.Function1.$anonfun$compose$1(Function1.scala:49)
[error]     at sbt.internal.util.$tilde$greater.$anonfun$$u2219$1(TypeFunctions.scala:62)
[error]     at sbt.std.Transform$$anon$4.work(Transform.scala:68)
[error]     at sbt.Execute.$anonfun$submit$2(Execute.scala:282)
[error]     at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:23)
[error]     at sbt.Execute.work(Execute.scala:291)
[error]     at sbt.Execute.$anonfun$submit$1(Execute.scala:282)
[error]     at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:265)
[error]     at sbt.CompletionService$$anon$2.call(CompletionService.scala:64)
[error]     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[error]     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
[error]     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[error]     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[error]     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[error]     at java.base/java.lang.Thread.run(Thread.java:833)
[error] (assembly) 
[error] Deduplicate found different file contents in the following:
[error]   Jar name = spark-sql-kafka-0-10_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error]   Jar name = spark-tags_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error]   Jar name = spark-token-provider-kafka-0-10_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error]   Jar name = unused-1.0.0.jar, jar org = org.spark-project.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error] Total time: 3 s, completed Apr 24, 2023, 11:13:44 PM
make: *** [sbt-clean-compile-assembly] Error 1

I normally see this "Deduplicate" error when sbt assembly bundles multiple dependencies, but this time only spark-sql-kafka-0-10 is bundled (all the other dependencies are "provided"). Is it because classes in its transitive dependencies conflict?
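The jars listed in the error suggest exactly that: spark-sql-kafka-0-10 transitively pulls in spark-tags, spark-token-provider-kafka-0-10, and org.spark-project.spark:unused, and each of them ships its own copy of UnusedStubClass.class. On sbt 1.4+ the built-in dependencyTree task can confirm where those jars come from:

sbt dependencyTree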
Then I found --packages in the documentation.
This time spark-submit --packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2 ... succeeded.
My question is: when should I add a dependency to libraryDependencies without "provided", and when should I use --packages? Thanks!

Answer 1 (ruarlubt)

Try the following to fix that specific error:

// The Spark runtime already ships the Scala library, so leave it out of the fat JAR
assembly / assemblyOption := (assembly / assemblyOption).value.withIncludeScala(false)

// Several Spark jars ship the same UnusedStubClass.class; keep the first copy found
assembly / assemblyMergeStrategy := {
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  case x =>
    val oldStrategy = (assembly / assemblyMergeStrategy).value
    oldStrategy(x)
}
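UnusedStubClass is an empty marker class that several Spark artifacts ship on purpose, so every copy is interchangeable and MergeStrategy.first (or even MergeStrategy.discard) is safe for this particular entry. Disabling includeScala is a separate optimization: the Spark runtime already provides the Scala library, so there is no need to bundle it.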

When should I add a dependency to libraryDependencies without "provided", and when should I use --packages?
If your Spark cluster has internet access or an internal Maven mirror, use --packages. The required libraries are then downloaded at runtime, which keeps your application JAR small, but you have to remember to pass the extra CLI argument on every spark-submit.
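For example (a hypothetical invocation; the master URL is a placeholder, and the JAR path assumes the sbt-assembly defaults for this project's name and version):

spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2 \
  --class com.hongbomiao.IngestFromS3ToKafka \
  --master spark://master-host:7077 \
  target/scala-2.12/IngestFromS3ToKafka-assembly-1.0.jar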
Otherwise, add the non-"provided" (i.e. compile-scoped) dependencies to libraryDependencies and use the assembly plugin to bundle everything into a single fat JAR; the trade-off is that you have to define merge strategies for the duplicate entries.
spark-sql-kafka-0-10 is part of the "contrib"/external modules in the Spark source tree, so it is not considered part of the common runtime and is not "provided" by default. That said, it depends on your Spark cluster: if you copy the jar onto every executor's classpath yourself, then marking it "provided" will work fine.
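For instance, something along these lines on every node (illustrative only; the connector's own dependencies, such as spark-token-provider-kafka-0-10 and the Kafka client jar, have to be placed there as well):

# put the connector on every node's Spark classpath, then "provided" is accurate for it
cp spark-sql-kafka-0-10_2.12-3.3.2.jar "$SPARK_HOME/jars/"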
