我正在使用spark作业服务器提交集群中的spark作业,我尝试测试的应用程序是一个基于sansa查询和sansa堆栈的spark程序。sansa用于可伸缩地处理大量rdf数据,sansa query是sansa库中用于查询rdf数据的一个。当我使用spark submit命令将spark应用程序作为spark程序运行时,它会按预期正常工作。但是当通过spark job server运行程序时,应用程序大部分时间都会失败,例外情况如下。
0/05/29 18:57:00信息块管理信息:在us1salxhpw0653.corpnet2的内存中添加了rdd\ U 44\ U 0。com:37017 (大小:16.0 b,可用空间:366.2 mb)20/05/29 18:57:00错误应用程序管理员:接收到信号项20/05/29 18:57:00信息sparkcontext:从关闭挂钩调用stop(),20/05/29 18:57:00信息jobmanageractor:获取spark应用程序结束事件,停止工作经理。20/05/29 18:57:00 info jobmanagerator:从外部获取spark应用程序结束事件,正在停止作业管理器20/05/29 18:57:00 info sparkui:已停止spark web uihttp://10.138.32.96:46627 20/05/29 18:57:00信息任务集管理器:在阶段3.0中启动任务3.0(tid 63,us1salxhpw0653.corpnet2.com,executor 1,partition 3,node\u local,4942字节)20/05/29 18:57:00 info tasksetmanager:在us1salxhpw0653.corpnet2.com(executor 1)(1/560)上以513毫秒的时间完成阶段3.0(tid 60)中的任务0.0 20/05/29 18:57:00 info tasksetmanager:在阶段3.0中启动任务4.0(tid 64,us1salxhpw0669.corpnet2.com,executor 2,partition 4,node\u local,4942字节)20/05/29 18:57:00 info tasksetmanager:在us1salxhpw0669.corpnet2.com(executor 2)(2/560)上以512毫秒的时间完成阶段3.0(tid 61)中的任务1.0 20/05/29 18:57:00 info tasksetmanager:在阶段3.0(tid 65,us1salxhpw0670.corpnet2.com,executor 3,partition 5,node\ u local,4942 bytes)20/05/29 18:57:00 info tasksetmanager:在us1salxhpw0670.corpnet2.com(executor 3)(3/560)20/05/29 18:57:00 info blockmanagerinfo:在us1salxhpw0669.corpnet2的内存中添加了rdd\ U 44\ U 4。com:34922 (尺寸:16.0 b,免费:366.2 mb)20/05/29 18:57:00信息块管理信息:在us1salxhpw0653.corpnet2上的内存中添加了rdd\ U 44\ U 3。com:37017 (大小:16.0 b,可用空间:366.2 mb)20/05/29 18:57:00信息:作业2失败:保存在sansaqueryexample。scala:32,取0.732943 s 20/05/29 18:57:00信息:shufflemapstage 3(保存在sansaqueryexample。scala:32)在0.556 s内失败,由于sparkcontext关闭而取消了阶段20/05/29 18:57:00错误fileformatwriter:正在中止作业null。>>org.apache.spark.sparkexception:作业2已取消,因为org.apache.spark.scheduler.dagscheduler$$anonfun$cleanupafterschedulerstop$1.apply(dagscheduler)上的sparkcontext已关闭。scala:820)位于org.apache.spark.scheduler.dagscheduler$$anonfun$cleanupafterschedulerstop$1.apply(dagscheduler)。scala:818)在scala.collection.mutable.hashset.foreach(hashset。scala:78)在org.apache.spark.scheduler.dagscheduler.cleanupafterschedulerstop(dagscheduler。scala:818)位于org.apache.spark.scheduler.dagschedulereventprocessloop.onstop(dagscheduler。scala:1732)在org.apache.spark.util.eventloop.stop(eventloop。scala:83)在org.apache.spark.scheduler.dagscheduler.stop(dagscheduler。scala:1651)在org.apache.spark.sparkcontext$$anonfun$stop$8.apply$mcv$sp(sparkcontext)。scala:1923)在org.apache.spark.util.utils$.trylognonfataleror(utils。scala:1317)在org.apache.spark.sparkcontext.stop(sparkcontext。scala:1922)在org.apache.spark.sparkcontext$$anonfun$2.应用$mcv$sp(sparkcontext。scala:584)在org.apache.spark.util.sparkshutdownhook.run(shutdownhookmanager。scala:216)在org.apache.spark.util.sparkshutdownhookmanager$$anonfun$runall$1$$anonfun$apply$mcv$sp$1.apply$mcv$sp(shutdownhookmanager)。scala:188)在org.apache.spark.util.sparkshutdownhookmanager$$anonfun$runall$1$$anonfun$apply$mcv$sp$1.apply(shutdownhookmanager。scala:188)在org.apache.spark.util.sparkshutdownhookmanager$$anonfun$runall$1$$anonfun$apply$mcv$sp$1.apply(shutdownhookmanager。scala:188)在org.apache.spark.util.utils$.loguncaughtexceptions(utils。scala:1954)在org.apache.spark.util.sparkshutdownhookmanager$$anonfun$runall$1.应用$mcv$sp(shutdownhookmanager)。scala:188)在org.apache.spark.util.sparkshutdownhookmanager$$anonfun$runall$1.apply(shutdownhookmanager。scala:188)在org.apache.spark.util.sparkshutdownhookmanager$$anonfun$runall$1.apply(shutdownhookmanager。scala:188)在scala.util.try$.apply(try。scala:192)
用于直接执行的代码
object SansaQueryExampleWithoutSJS {
def main(args: Array[String]) {
val spark=SparkSession.builder().appName("sansa stack example").getOrCreate()
val input = "hdfs://user/dileep/rdf.nt";
val sparqlQuery: String = "SELECT * WHERE {?s ?p ?o} LIMIT 10"
val lang = Lang.NTRIPLES
val graphRdd = spark.rdf(lang)(input)
println(graphRdd.collect().foreach(println))
val result = graphRdd.sparql(sparqlQuery)
result.write.format("csv").mode("overwrite").save("hdfs://user/dileep/test-out")
}
与spark作业服务器集成的代码
object SansaQueryExample extends SparkSessionJob {
override type JobData = Seq[String]
override type JobOutput = collection.Map[String, Long]
override def validate(sparkSession: SparkSession, runtime: JobEnvironment, config: Config):
JobData Or Every[ValidationProblem] = {
Try(config.getString("input.string").split(" ").toSeq)
.map(words => Good(words))
.getOrElse(Bad(One(SingleProblem("No input.string param"))))
}
override def runJob(sparkSession: SparkSession, runtime: JobEnvironment, data: JobData): JobOutput = {
val input = "hdfs://user/dileep/rdf.nt";
val sparqlQuery: String = "SELECT * WHERE {?s ?p ?o} LIMIT 10"
val lang = Lang.NTRIPLES
val graphRdd = sparkSession.rdf(lang)(input)
println(graphRdd.collect().foreach(println))
val result = graphRdd.sparql(sparqlQuery)
result.write.format("csv").mode("overwrite").save("hdfs://user/dileep/test-out")
sparkSession.sparkContext.parallelize(data).countByValue
}
}
这里主要说明通过spark作业服务器执行应用程序的步骤
通过restapi将jar上传到sjs
根据需要,通过另一个api创建一个带有内存和内核的spark上下文
通过另一个api执行作业,其中提到已经创建的jar和上下文
因此,当我观察到程序的不同执行时,我可以看到spark作业服务器的行为不一致,程序在少数情况下工作没有任何错误。还可以观察到sparkcontext由于一些未知的原因被关闭。我使用的是sjs 0.8.0和sansa 0.7.1以及spark 2.4
暂无答案!
目前还没有任何答案,快来回答吧!