I was told that as long as I build my Spark application against a single version of Spark with sbt assembly, I can run it with spark-submit on any Spark cluster.
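For reference, the build step itself is just sbt assembly run from the project root; given the scalaVersion and assemblyJarName set in the build.sbt further down, the uberjar should end up at target/scala-2.10/db-synchronizer.jar (the exact output path is an assumption based on those settings):
sbt clean assembly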
So I built a simple application with Spark 2.1.1 (my build.sbt file is shown below) and then started it on my cluster:
cd spark-1.6.0-bin-hadoop2.6/bin/
spark-submit --class App --master local[*] /home/oracle/spark_test/db-synchronizer.jar
As you can see, I am running it with Spark 1.6.0, and I get this error:
17/06/08 06:59:20 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem [sparkDriver]
java.lang.NoSuchMethodError: org.apache.spark.SparkConf.getTimeAsMs(Ljava/lang/String;Ljava/lang/String;)J
at org.apache.spark.streaming.kafka010.KafkaRDD.<init>(KafkaRDD.scala:70)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:219)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:243)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
17/06/08 06:59:20 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))] in 1 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:23 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))] in 2 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:26 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))] in 3 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:29 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))]
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
... 1 more
17/06/08 06:59:39 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))] in 1 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:42 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))] in 2 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:45 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))] in 3 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:48 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))]
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
... 1 more
From some reading I gather that java.lang.NoSuchMethodError is typically a sign of mixing different Spark versions. That is probably the case here, since I am using two different ones. But shouldn't sbt assembly cover that? My build.sbt and assembly.sbt files are below.
build.sbt
name := "spark-db-synchronizator"
//Versions
version := "1.0.0"
scalaVersion := "2.10.6"
val sparkVersion = "2.1.1"
val sl4jVersion = "1.7.10"
val log4jVersion = "1.2.17"
val scalaTestVersion = "2.2.6"
val scalaLoggingVersion = "3.5.0"
val sparkTestingBaseVersion = "1.6.1_0.3.3"
val jodaTimeVersion = "2.9.6"
val jodaConvertVersion = "1.8.1"
val jsonAssertVersion = "1.2.3"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-hive" % sparkVersion,
"org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion,
"org.slf4j" % "slf4j-api" % sl4jVersion,
"org.slf4j" % "slf4j-log4j12" % sl4jVersion exclude("log4j", "log4j"),
"log4j" % "log4j" % log4jVersion % "provided",
"org.joda" % "joda-convert" % jodaConvertVersion,
"joda-time" % "joda-time" % jodaTimeVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"com.holdenkarau" %% "spark-testing-base" % sparkTestingBaseVersion % "test",
"org.skyscreamer" % "jsonassert" % jsonAssertVersion % "test"
)
assemblyJarName in assembly := "db-synchronizer.jar"
run in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in(Compile, run), runner in(Compile, run))
runMain in Compile := Defaults.runMainTask(fullClasspath in Compile, runner in(Compile, run))
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
// Spark does not support parallel tests and requires JVM fork
parallelExecution in Test := false
fork in Test := true
javaOptions in Test ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled")
assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
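To double-check what actually ends up in the assembly, the jar itself can be inspected. A quick sketch (spark-version-info.properties ships inside the Spark 2.x core jar, so the second command assumes that file survived the merge strategy above):
jar tf db-synchronizer.jar | grep org/apache/spark/streaming/kafka010 | head
unzip -p db-synchronizer.jar spark-version-info.properties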
1 Answer
You are right that it is possible to bundle Spark 2.1.1 libraries into your application and run it on a Spark 1.6 environment such as Hadoop YARN (on CDH or HDP).
This trick is fairly often used in large corporations where the infrastructure team forces development teams to use an older Spark version only because a newer CDH (YARN) or HDP (YARN) release is not supported.
You should use spark-submit from the newer Spark installation (I would suggest the latest and greatest 2.1.1 as of this writing) and bundle all of the Spark jars as part of your Spark application. Just sbt assembly your Spark application against Spark 2.1.1 (as specified in build.sbt) and spark-submit the uberjar, using that very same Spark 2.1.1, to the older Spark environment. As a matter of fact, Hadoop YARN does not treat Spark as anything better than any other application library or framework; it is quite reluctant to pay Spark special attention.
That, however, requires a cluster environment (I have just checked, and it does not work with Spark Standalone 1.6 when your Spark application uses Spark 2.1.1).
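A minimal sketch of that workflow, assuming a YARN cluster and a hypothetical Spark 2.1.1 installation directory, with the jar path taken from the question:
cd spark-2.1.1-bin-hadoop2.6/bin/
./spark-submit --class App --master yarn --deploy-mode cluster /home/oracle/spark_test/db-synchronizer.jar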
In your case, starting the Spark application with the local[*] master URL is not supposed to work. The reason is two-fold:
local[*] is heavily constrained by the classpath, and trying to convince Spark 1.6.0 to run Spark 2.1.1 code in the same JVM may take quite a while, if it is possible at all. Here you are using the older installation to run the newer 2.1.1; the other way around could work.
Use Hadoop YARN instead... well... it pays no special attention to Spark, and this has been tested in my projects a few times already.
I have also been wondering how I can tell which version of, say, spark-core is actually used at runtime.
Use the web UI: you should see the version in its top-left corner.
You should also consult the web UI's Environment tab, where you will find the configuration of the runtime environment. That is the most authoritative source about the environment your Spark application runs in.
Near the bottom you should see the Classpath Entries, which give you the classpath of jars, files and classes.
Use that to find any classpath-related issues.
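If you prefer checking from code, a minimal sketch (assuming a SparkContext named sc is already available in your application) is to log the version Spark reports at runtime:
// prints the version of spark-core actually loaded at runtime,
// regardless of which version the application was compiled against
println(s"Spark version at runtime: ${sc.version}")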