我目前正在使用cloudera发行版的spark1.5.0,并在java代码中尝试广播一个并发hashmap。在map()函数中,当我尝试读取广播变量时,在资源管理器日志中会出现nullpointer异常。有人能帮我吗?我找不到任何解决办法。以下是我的代码片段:
// for broadcasting before calling mapper
final Broadcast<ConcurrentHashMap<ConstantKeys, Object>> constantmapFinal =
context.broadcast(constantMap);
.......
// In map function
JavaRDD<String> outputRDD =
tempRdd.map(new org.apache.spark.api.java.function.Function() {
private static final long serialVersionUID =
6104325309455195113L;
public Object call(final Object arg0)
throws**Exception {
ConcurrentHashMap<ConstantKeys, Object> constantMap =
constantmapFinal.value(); // line 428
}
});
资源管理器日志中的异常:
016-11-17 10:40:10 ERROR ApplicationMaster:96 - User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 4 times, most recent failure: Lost task 1.3 in stage 2.0 (TID 20,******(server name)): java.io.IOException: java.lang.NullPointerException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1177)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at com.***.text.fuzzymatch.execute.FuzzyMatchWrapper$2.call(FuzzyMatchWrapper.java:428)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1027)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1205)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at java.util.HashMap.put(HashMap.java:493)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:192)
at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1174)
... 21 more
这适用于较小尺寸的Map。根据输入请求,Map可以包含许多键值对。有人能帮我吗?
暂无答案!
目前还没有任何答案,快来回答吧!