java解决apachespark中的依赖问题

qyswt5oh  于 2021-06-06  发布在  Kafka
关注(0)|答案(7)|浏览(360)

构建和部署spark应用程序时的常见问题是: java.lang.ClassNotFoundException . object x is not a member of package y 编译错误。 java.lang.NoSuchMethodError 如何解决这些问题?

ut6juiuv

ut6juiuv1#

将spark-2.4.0-bin-hadoop2.7\spark-2.4.0-bin-hadoop2.7\jars中的所有jar文件添加到项目中。spark-2.4.0-bin-hadoop2.7可从https://spark.apache.org/downloads.html

cs7cruho

cs7cruho2#

我认为这个问题必须解决一个汇编插件。你需要造一个肥jar。例如在sbt中:
添加文件 $PROJECT_ROOT/project/assembly.sbt 带代码 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0") 构建.sbt added some libraries librarydependencies++=seq(“com.some.company”%%“some lib”%”“1.0.0”)`
在sbt控制台中输入“assembly”,并部署assembly jar
如果您需要更多信息,请访问https://github.com/sbt/sbt-assembly

njthzxwz

njthzxwz3#

apachespark的类路径是动态构建的(以适应每个应用程序用户的代码),这使得它容易受到此类问题的攻击@user7337271的回答是正确的,但是还有一些问题,这取决于您使用的集群管理器(“master”)。
首先,spark应用程序由以下组件组成(每个组件都是一个单独的jvm,因此在其类路径中可能包含不同的类):
驱动程序:这是您的应用程序创建的 SparkSession (或 SparkContext )并连接到集群管理器以执行实际工作
集群管理器:充当集群的“入口点”,负责为每个应用程序分配执行器。spark支持几种不同的类型:standalone、yarn和mesos,我们将在下面描述它们。
执行者:这些是集群节点上的进程,执行实际的工作(运行spark任务)
ApacheSpark的集群模式概述中的这个图描述了它们之间的关系:

现在-哪些类应该驻留在这些组件中?
这可以通过以下图表来回答:

让我们慢慢分析一下:
spark代码是spark的库。它们应该存在于所有三个组件中,因为它们包括让spark执行它们之间通信的胶水。顺便说一句-spark的作者做出了一个设计决定,在所有组件中包含所有组件的代码(例如,在驱动程序中也包含只应在executor中运行的代码),以简化这一点-因此spark的“胖jar”(版本高达1.6)或“archive”(在2.0中,详细信息(见下文)包含所有组件的必要代码,并应在所有组件中提供。
仅驱动程序代码这是用户代码,不包括应在执行器上使用的任何内容,即在rdd/dataframe/dataset上的任何转换中不使用的代码。这不一定要与分布式用户代码分开,但可以分开。
分布式代码这是使用驱动程序代码编译的用户代码,但也必须在执行器上执行-实际转换使用的所有内容都必须包含在这个jar中。
既然我们已经搞清楚了,那么如何让类在每个组件中正确加载,它们应该遵循什么规则呢?
spark代码:正如前面的回答所述,您必须在所有组件中使用相同的scala和spark版本。
1.1在独立模式下,有一个“预先存在的”spark安装,应用程序(驱动程序)可以连接到它。这意味着所有驱动程序都必须使用在主程序和执行程序上运行的相同spark版本。
1.2在Yarn/mesos中,每个应用程序可以使用不同的Spark版本,但同一应用程序的所有组件必须使用相同的Spark版本。这意味着,如果使用版本x编译和打包驱动程序应用程序,则在启动sparksession时应提供相同的版本(例如,通过 spark.yarn.archive 或者 spark.yarn.jars 使用Yarn时的参数)。您提供的jar/archive应该包括所有spark依赖项(包括可传递的依赖项),当应用程序启动时,它将由集群管理器发送给每个执行器。
驱动程序代码:这完全取决于-驱动程序代码可以作为一堆jar或“胖jar”来提供,只要它包含所有spark依赖项+所有用户代码
分布式代码:除了存在于驱动程序中之外,还必须将此代码发送给执行者(同样,还有它的所有可传递依赖项)。这是使用 spark.jars 参数。
总而言之,以下是构建和部署spark应用程序的建议方法(在本例中,使用yarn):
用您的分布式代码创建一个库,将它打包为一个“常规”jar(用一个.pom文件描述它的依赖项)和一个“胖jar”(包括它的所有可传递依赖项)。
创建一个驱动程序应用程序,对分布式代码库和apachespark(具有特定版本)具有编译依赖关系
将驱动程序应用程序打包到要部署到驱动程序的胖jar中
将正确版本的分布式代码作为 spark.jars 启动时的参数 SparkSession 传递包含所有jar的归档文件(例如gzip)的位置 lib/ 下载的spark二进制文件的文件夹作为 spark.yarn.archive

nhaq1z21

nhaq1z214#

应用程序的依赖类应该在启动命令的applicationjar选项中指定。
更多细节可以在spark文档中找到
摘自文件:
applicationjar:绑定jar的路径,包括应用程序和所有依赖项。url必须在集群内全局可见,例如,所有节点上都存在的hdfs://路径或file://路径

twh00eeo

twh00eeo5#

我有以下build.sbt

lazy val root = (project in file(".")).
  settings(
    name := "spark-samples",
    version := "1.0",
    scalaVersion := "2.11.12",
    mainClass in Compile := Some("StreamingExample")        
  )

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.4.0",
  "org.apache.spark" %% "spark-streaming" % "2.4.0",
  "org.apache.spark" %% "spark-sql" % "2.4.0",
  "com.couchbase.client" %% "spark-connector" % "2.2.0" 
)

// META-INF discarding
assemblyMergeStrategy in assembly := {
       case PathList("META-INF", xs @ _*) => MergeStrategy.discard
       case x => MergeStrategy.first
   }

我已经使用sbt汇编插件为我的应用程序创建了一个胖jar,但是使用spark submit运行时失败,错误如下:

java.lang.NoClassDefFoundError: rx/Completable$OnSubscribe
    at com.couchbase.spark.connection.CouchbaseConnection.streamClient(CouchbaseConnection.scala:154)

我可以看到这个类存在于我的胖jar里:

jar tf target/scala-2.11/spark-samples-assembly-1.0.jar | grep 'Completable$OnSubscribe'
rx/Completable$OnSubscribe.class

不知道我在这里遗漏了什么,有什么线索吗?

c8ib6hqw

c8ib6hqw6#

除了user7337271已经给出的非常广泛的答案之外,如果问题是由于缺少外部依赖项造成的,那么您可以使用依赖项(例如maven assembly plugin)构建一个jar
在这种情况下,请确保在构建系统中将所有核心spark依赖项标记为“已提供”,并且如前所述,确保它们与运行时spark版本相关。

9njqaruj

9njqaruj7#

在构建和部署spark应用程序时,所有依赖项都需要兼容的版本。
scala版本。所有软件包都必须使用相同的主要(2.10、2.11、2.12)scala版本。
考虑以下(不正确) build.sbt :

name := "Simple Project"

version := "1.0"

libraryDependencies ++= Seq(
   "org.apache.spark" % "spark-core_2.11" % "2.0.1",
   "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
   "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
)

我们使用 spark-streaming 对于Scala2.10,其余的包是针对Scala2.11的。有效文件可能是

name := "Simple Project"

version := "1.0"

libraryDependencies ++= Seq(
   "org.apache.spark" % "spark-core_2.11" % "2.0.1",
   "org.apache.spark" % "spark-streaming_2.11" % "2.0.1",
   "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
)

但最好全局指定版本并使用 %% (它为您附加了scala版本):

name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-core" % "2.0.1",
   "org.apache.spark" %% "spark-streaming" % "2.0.1",
   "org.apache.bahir" %% "spark-streaming-twitter" % "2.0.1"
)

同样,在maven:

<project>
      <groupId>com.example</groupId>
      <artifactId>simple-project</artifactId>
      <modelVersion>4.0.0</modelVersion>
      <name>Simple Project</name>
      <packaging>jar</packaging>
      <version>1.0</version>
      <properties>
        <spark.version>2.0.1</spark.version>
      </properties> 
      <dependencies>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency> 
        <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>spark-streaming-twitter_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
      </dependencies>
    </project>

spark版本所有软件包必须使用相同的主要spark版本(1.6、2.0、2.1…)。
考虑以下(不正确的)build.sbt:

name := "Simple Project"

version := "1.0"

libraryDependencies ++= Seq(
   "org.apache.spark" % "spark-core_2.11" % "1.6.1",
   "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
   "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
)

我们使用 spark-core 1.6而其余部件处于spark 2.0中。有效文件可能是

name := "Simple Project"

version := "1.0"

libraryDependencies ++= Seq(
   "org.apache.spark" % "spark-core_2.11" % "2.0.1",
   "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
   "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
)

但最好使用一个变量(仍然不正确):

name := "Simple Project"

version := "1.0"

val sparkVersion = "2.0.1"

libraryDependencies ++= Seq(
   "org.apache.spark" % "spark-core_2.11" % sparkVersion,
   "org.apache.spark" % "spark-streaming_2.10" % sparkVersion,
   "org.apache.bahir" % "spark-streaming-twitter_2.11" % sparkVersion
)

同样,在maven:

<project>
      <groupId>com.example</groupId>
      <artifactId>simple-project</artifactId>
      <modelVersion>4.0.0</modelVersion>
      <name>Simple Project</name>
      <packaging>jar</packaging>
      <version>1.0</version>
      <properties>
        <spark.version>2.0.1</spark.version>
        <scala.version>2.11</scala.version>
      </properties> 
      <dependencies>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency> 
        <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>spark-streaming-twitter_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency>
      </dependencies>
    </project>

spark相关性中使用的spark版本必须与spark安装的spark版本匹配。例如,如果在集群上使用1.6.1,则必须使用1.6.1来构建jar。不总是接受次要版本不匹配。
用于构建jar的scala版本必须与用于构建已部署spark的scala版本匹配。默认情况下(可下载的二进制文件和默认版本):
spark 1.x->scala 2.10
spark 2.x->scala 2.11
如果fatjar中包含了其他包,那么应该可以在worker节点上访问这些包。有多种选择,包括: --jars 争论 spark-submit -分发本地 jar 文件夹。 --packages 争论 spark-submit -从maven存储库获取依赖项。
在集群节点中提交时,应该包括应用程序 jar--jars .

相关问题