I am trying to process messages from Kafka with Spark Streaming 1.3.1. The job runs locally on my machine. As long as no messages are consumed after the job starts, everything works fine. But as soon as I send messages to Kafka, the job logs a NullPointerException:
15/05/20 11:22:00 ERROR TaskContextImpl: Error in TaskCompletionListener
java.lang.NullPointerException
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:54)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:73)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:71)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:71)
at org.apache.spark.scheduler.Task.run(Task.scala:66)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/05/20 11:22:00 ERROR Executor: Exception in task 1.0 in stage 4.0 (TID 9)
org.apache.spark.util.TaskCompletionListenerException
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:81)
at org.apache.spark.scheduler.Task.run(Task.scala:66)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/05/20 11:22:00 WARN TaskSetManager: Lost task 1.0 in stage 4.0 (TID 9, localhost): org.apache.spark.util.TaskCompletionListenerException
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:81)
at org.apache.spark.scheduler.Task.run(Task.scala:66)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/05/20 11:22:00 ERROR TaskSetManager: Task 1 in stage 4.0 failed 1 times; aborting job
15/05/20 11:22:00 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
15/05/20 11:22:00 INFO TaskSchedulerImpl: Cancelling stage 4
15/05/20 11:22:00 INFO DAGScheduler: Stage 4 (foreachRDD at SimpleApp.java:55) failed in 0,018 s
15/05/20 11:22:00 INFO DAGScheduler: Job 4 failed: foreachRDD at SimpleApp.java:55, took 0,026982 s
15/05/20 11:22:00 ERROR JobScheduler: Error running job streaming job 1432113720000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 1 times, most recent failure: Lost task 1.0 in stage 4.0 (TID 9, localhost): org.apache.spark.util.TaskCompletionListenerException
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:81)
at org.apache.spark.scheduler.Task.run(Task.scala:66)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Here is the code of the consumer job. The custom decoder TraceEventDecoder simply deserializes the byte array from the Kafka message into an object of type TraceEvent.
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

// TraceEvent and TraceEventDecoder are application classes (assumed to be in the same package)
public class SimpleApp {

    public static void main(final String[] args) throws InterruptedException, IOException {
        System.out.println("Job started");

        SparkConf conf = new SparkConf().setAppName("HandlingTimeFilter");
        conf.setMaster("local[10]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("zookeeper.connect", "cer:2181");
        kafkaParams.put("bootstrap.servers", "cer:9099,lanthan:9099,uran:9099,protactinium:9099");
        kafkaParams.put("consumer.id", "SimpleSparkConsumer");
        kafkaParams.put("group.id", "consumers");

        Set<String> topics = new HashSet<>();
        topics.add("traceEvents");

        // Direct (receiver-less) stream; TraceEventDecoder deserializes the message value
        JavaPairInputDStream<String, TraceEvent> stream =
                KafkaUtils.createDirectStream(
                        jssc, String.class, TraceEvent.class,
                        StringDecoder.class, TraceEventDecoder.class,
                        kafkaParams, topics);

        stream.foreachRDD((Function<JavaPairRDD<String, TraceEvent>, Void>) rdd -> {
            Map<String, TraceEvent> events = rdd.collectAsMap();
            System.out.println("New RDD call, size=" + events.size());
            return null;
        });

        jssc.start();
        Thread.sleep(60000);
        jssc.stop();
    }
}
2 Answers

Answer 1:
Is your TraceEventDecoder constructor public? Is there a TraceEventDecoder(VerifiableProperties) variant? This error can occur when the Kafka direct stream is initialized with bad parameters. I have filed an Apache issue related to this case: https://issues.apache.org/jira/browse/spark-9780
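For context: Spark's direct Kafka stream instantiates decoders reflectively through a constructor taking a kafka.utils.VerifiableProperties argument, so that constructor must exist and be public. A minimal sketch of the expected shape, where TraceEvent.fromBytes is a hypothetical stand-in for the real deserialization logic:

import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;

public class TraceEventDecoder implements Decoder<TraceEvent> {

    // Spark looks up exactly this constructor signature via reflection;
    // if it is missing or not public, the Kafka RDD iterator fails to
    // initialize and the failure can surface as the NPE in close() above.
    public TraceEventDecoder(VerifiableProperties props) {
        // no configuration needed for this decoder
    }

    @Override
    public TraceEvent fromBytes(byte[] bytes) {
        // deserialize the Kafka message payload into a TraceEvent
        return TraceEvent.fromBytes(bytes); // hypothetical helper
    }
}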
Answer 2:
I ran into this problem with Spark 1.2.0, but after upgrading to 1.3.1 it works. I think the problem may be related to missing dependencies. If you are using Maven, you can adjust your pom.xml to build a proper uber-jar.

Source: http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/missing_dependencies_in_jar_files.html (I updated the versions above to use Spark 1.3.1 and to include the Kafka dependency.)
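As a rough illustration of that approach, the relevant pom.xml pieces might look like the following; the Scala 2.10 artifact suffix and the shade plugin version are assumptions matching a Spark 1.3.1 build:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.3.1</version>
    </dependency>
    <dependency>
        <!-- the Kafka integration is not part of the Spark assembly,
             so it must be bundled into the application jar -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.3.1</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- build an uber-jar containing the Kafka integration and its
             transitive dependencies during the package phase -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

When the job is submitted to a cluster with spark-submit rather than run locally, spark-streaming itself can additionally be marked with provided scope so it is not bundled.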