Unable to broadcast a large ConcurrentHashMap in Spark 1.5.0

wfveoks0, posted on 2021-05-29 in Hadoop

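I am currently using Spark 1.5.0 from the Cloudera distribution, and in my Java code I am trying to broadcast a ConcurrentHashMap. Inside the map() function, when I try to read the broadcast variable, a NullPointerException shows up in the ResourceManager log. Can anyone help? I have not been able to find a solution. Here is my code snippet: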

// for broadcasting before calling mapper
final Broadcast<ConcurrentHashMap<ConstantKeys, Object>> constantmapFinal =
                    context.broadcast(constantMap);

.......

// In map function
JavaRDD<String> outputRDD =
        tempRdd.map(new org.apache.spark.api.java.function.Function() {
            private static final long serialVersionUID = 6104325309455195113L;

            public Object call(final Object arg0) throws Exception {
                // Reading the broadcast value on the executor is where the exception is thrown.
                ConcurrentHashMap<ConstantKeys, Object> constantMap =
                        constantmapFinal.value(); // line 428
                // (rest of call(), including its return statement, is not shown in the post)
            }
        });

Exception in the ResourceManager log:

016-11-17 10:40:10 ERROR ApplicationMaster:96 - User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 4 times, most recent failure: Lost task 1.3 in stage 2.0 (TID 20,******(server name)): java.io.IOException: java.lang.NullPointerException
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1177)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at com.***.text.fuzzymatch.execute.FuzzyMatchWrapper$2.call(FuzzyMatchWrapper.java:428)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1027)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1205)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at java.util.HashMap.put(HashMap.java:493)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:192)
    at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1174)
    ... 21 more

This works when the map is small. Depending on the input request, the map can contain many key-value pairs. Can anyone help?
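
The post does not establish a cause, so what follows is only something to rule out, not a confirmed fix. The trace shows Kryo's MapSerializer failing while it rebuilds a nested HashMap-based map inside the broadcast value. One possibility worth checking is that the ConcurrentHashMap, or a plain map nested inside it, is still being modified while Spark serializes it. Below is a minimal sketch of taking a deep snapshot before broadcasting. The names BroadcastSnapshot and deepCopy are hypothetical and do not come from the original code, and the sketch assumes that only nested maps need copying.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of the original post or of Spark.
final class BroadcastSnapshot {

    // Copies a map into a new HashMap, recursing into nested maps.
    // Non-map values are shared with the source, not cloned.
    @SuppressWarnings("unchecked")
    static <K> HashMap<K, Object> deepCopy(Map<K, ?> source) {
        HashMap<K, Object> copy = new HashMap<>();
        for (Map.Entry<K, ?> e : source.entrySet()) {
            Object v = e.getValue();
            if (v instanceof Map) {
                v = deepCopy((Map<Object, ?>) v);
            }
            copy.put(e.getKey(), v);
        }
        return copy;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Object> live = new ConcurrentHashMap<>();
        Map<String, Object> nested = new HashMap<>();
        nested.put("threshold", 0.8);
        live.put("settings", nested);

        HashMap<String, Object> frozen = deepCopy(live);

        // Later changes to the live maps do not reach the snapshot.
        nested.put("threshold", 0.5);
        live.put("extra", "added later");

        System.out.println(frozen.containsKey("extra"));                           // false
        System.out.println(((Map<?, ?>) frozen.get("settings")).get("threshold")); // 0.8
    }
}
```

With such a helper, the driver would broadcast the snapshot instead of the live map, for example context.broadcast(BroadcastSnapshot.deepCopy(constantMap)), and the Broadcast type parameter would change to HashMap<ConstantKeys, Object>. If the exception persists with an unmodified snapshot, concurrent modification can be ruled out and the Kryo configuration is the next place to look.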
