My Spark Streaming code works fine in the Eclipse IDE, but when I run it on my local Spark cluster it throws org.apache.spark.util.TaskCompletionListenerException.
Likewise, when submitted with spark-submit in client mode the code runs fine until I start my Kafka producer; as soon as the producer starts, it fails with the error below.
I start the local cluster with the command sh spark_home/sbin/start-all.sh
and invoke spark-submit with this script:
#!/bin/sh
SP_SUBMIT=/home/user/spark/bin/spark-submit
DEP_MODE=client

$SP_SUBMIT \
  --deploy-mode $DEP_MODE \
  --class com.alind.sparkStream.Test \
  --master spark://clstr:7077 \
  --name alind \
  /home/user/jar/com.alind-0.0.1-SNAPSHOT.jar
I get this error as soon as Spark Streaming starts receiving messages:
2015-06-29 16:13:56 ERROR JobScheduler:96 - Error running job streaming job 1435574590600 ms.3
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 306.0 failed 1 times, most recent failure: Lost task 0.0 in stage 306.0 (TID 164, localhost): org.apache.spark.util.TaskCompletionListenerException
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
at org.apache.spark.scheduler.Task.run(Task.scala:58)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2015-06-29 16:13:56 WARN JobProgressListener:71 - Task start for unknown stage 307
2015-06-29 16:13:56 WARN JobProgressListener:71 - Task start for unknown stage 308
2015-06-29 16:13:56 WARN JobProgressListener:71 - Task start for unknown stage 309
2015-06-29 16:13:56 INFO SparkContext:59 - Starting job: foreach at Test.java:428
2015-06-29 16:13:56 INFO MapOutputTrackerMaster:59 - Size of output statuses for shuffle 34 is 84 bytes
2015-06-29 16:13:56 INFO MapOutputTrackerMaster:59 - Size of output statuses for shuffle 35 is 84 bytes
2015-06-29 16:13:56 INFO DAGScheduler:59 - Got job 94 (foreach at Test.java:428) with 2 output partitions (allowLocal=false)
2015-06-29 16:13:56 INFO DAGScheduler:59 - Final stage: Stage 327(foreach at Test.java:428)
2015-06-29 16:13:56 INFO DAGScheduler:59 - Parents of final stage: List(Stage 320, Stage 317, Stage 324, Stage 321, Stage 318, Stage 325, Stage 322, Stage 326, Stage 323, Stage 319)
2015-06-29 16:13:56 INFO ShuffledDStream:59 - Slicing from 1435574619500 ms to 1435574620400 ms (aligned to 1435574619500 ms and 1435574620400 ms)
2015-06-29 16:13:56 INFO DAGScheduler:59 - Missing parents: List(Stage 320, Stage 317, Stage 318, Stage 319)
2015-06-29 16:13:56 INFO DAGScheduler:59 - Submitting Stage 317 (MappedRDD[234] at mapToPair at Test.java:157), which has no missing parents
2015-06-29 16:13:56 INFO MemoryStore:59 - ensureFreeSpace(4024) called with curMem=386851, maxMem=278302556
2015-06-29 16:13:56 INFO MemoryStore:59 - Block broadcast_129 stored as values in memory (estimated size 3.9 KB, free 265.0 MB)
2015-06-29 16:13:56 INFO MemoryStore:59 - ensureFreeSpace(2230) called with curMem=390875, maxMem=278302556
2015-06-29 16:13:56 INFO MemoryStore:59 - Block broadcast_129_piece0 stored as bytes in memory (estimated size 2.2 KB, free 265.0 MB)
2015-06-29 16:13:56 INFO BlockManagerInfo:59 - Added broadcast_129_piece0 in memory on localhost:42836 (size: 2.2 KB, free: 265.3 MB)
2015-06-29 16:13:56 INFO BlockManagerMaster:59 - Updated info of block broadcast_129_piece0
2015-06-29 16:13:56 INFO SparkContext:59 - Created broadcast 129 from getCallSite at DStream.scala:294
2015-06-29 16:13:56 INFO DAGScheduler:59 - Submitting 1 missing tasks from Stage 317 (MappedRDD[234] at mapToPair at Test.java:157)
2015-06-29 16:13:56 INFO TaskSchedulerImpl:59 - Adding task set 317.0 with 1 tasks
2015-06-29 16:13:56 INFO TaskSetManager:59 - Starting task 0.0 in stage 317.0 (TID 168, localhost, NODE_LOCAL, 7642 bytes)
2015-06-29 16:13:56 INFO Executor:59 - Running task 0.0 in stage 317.0 (TID 168)
2015-06-29 16:13:56 INFO KafkaRDD:103 - Computing topic test, partition 0 offsets 252661 -> 253192
2015-06-29 16:13:56 INFO VerifiableProperties:68 - Verifying properties
2015-06-29 16:13:56 INFO VerifiableProperties:68 - Property group.id is overridden to
2015-06-29 16:13:56 INFO VerifiableProperties:68 - Property zookeeper.connect is overridden to
2015-06-29 16:13:56 ERROR TaskContextImpl:96 - Error in TaskCompletionListener
java.lang.NullPointerException
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:58)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2015-06-29 16:13:56 ERROR Executor:96 - Exception in task 0.0 in stage 317.0 (TID 168)
org.apache.spark.util.TaskCompletionListenerException
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
at org.apache.spark.scheduler.Task.run(Task.scala:58)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2015-06-29 16:13:56 WARN TaskSetManager:71 - Lost task 0.0 in stage 317.0 (TID 168, localhost): org.apache.spark.util.TaskCompletionListenerException
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
at org.apache.spark.scheduler.Task.run(Task.scala:58)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
My pom.xml looks like this:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>alinds</groupId>
  <artifactId>alind</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <build>
    <sourceDirectory>src</sourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>reference.conf</resource>
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.alind.sparkStream.Test</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <source>${jdk.version}</source>
          <target>${jdk.version}</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.3.1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.3.1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.3.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.8.2.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>MyOtherProject</groupId>
      <version>1.0</version>
    </dependency>
  </dependencies>
  <repositories>
    <repository>
      <id>Spark repository</id>
      <url>http://www.sparkjava.com/nexus/content/repositories/spark/</url>
    </repository>
  </repositories>
</project>
My Spark driver looks like this:
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;

public class Test {
    static Logger log = Logger.getLogger(Test.class.getName());

    public static void main(String[] args) {
        System.setProperty("spark.serializer",
                "org.apache.spark.serializer.KryoSerializer");
        SparkConf sparkConf = new SparkConf();
        // When I run this code from Eclipse I use setMaster("local[2]") instead.
        sparkConf.setMaster("spark://clstr:7077");
        sparkConf.setAppName("alind");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(
                javaSparkContext, new Duration(100));

        Set<String> topics = new HashSet<String>();
        topics.add("test");
        Map<String, String> kafkaParams = new HashMap<String, String>();
        // Tested metadata.broker.list with localhost:9092 as well; neither
        // value works on the cluster.
        kafkaParams.put("metadata.broker.list", "10.20.3.14:9092");

        JavaPairInputDStream<String, String> stream = KafkaUtils
                .createDirectStream(javaStreamingContext, String.class,
                        String.class, StringDecoder.class, StringDecoder.class,
                        kafkaParams, topics);
        stream.print();

        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
    }
}
I'd appreciate any pointers on what is wrong with my local cluster. Something seems to be going wrong on the Kafka side.
1 Answer
I had the same problem, and the cause was that one of my decoders had an incorrect constructor. The exception is genuinely misleading in this case.
Incorrect class — roughly what my broken decoder looked like (a minimal sketch of a custom decoder; MyDecoder and MyEvent are placeholder names, and the essential point is the missing constructor):
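import kafka.serializer.Decoder

case class MyEvent(payload: String) // placeholder message type

// Broken: there is no constructor taking kafka.utils.VerifiableProperties.
// Spark's direct Kafka stream creates the key/value decoders reflectively
// through exactly that constructor, so instantiation fails inside the task
// and surfaces as the misleading TaskCompletionListenerException above.
class MyDecoder extends Decoder[MyEvent] {
  override def fromBytes(bytes: Array[Byte]): MyEvent =
    MyEvent(new String(bytes, "UTF-8"))
}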
Correct version (note the props: VerifiableProperties constructor parameter):
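import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

case class MyEvent(payload: String) // placeholder message type

// Correct: declaring a (props: VerifiableProperties) constructor gives the
// reflective lookup what it expects, mirroring the signature of
// kafka.serializer.StringDecoder.
class MyDecoder(props: VerifiableProperties = null) extends Decoder[MyEvent] {
  override def fromBytes(bytes: Array[Byte]): MyEvent =
    MyEvent(new String(bytes, "UTF-8"))
}

Kafka's built-in StringDecoder declares the same (props: VerifiableProperties = null) constructor, which is the pattern to copy for any custom decoder.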
PS: I'm using Scala, not Java.