了解spark的闭包及其序列化

irlmq6kh  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(610)

免责声明:刚刚开始玩Spark。
我很难理解著名的“task not serializable”异常,但是我的问题与我在上面看到的有点不同(或者我认为是这样)。
我有一个小小的定制rdd( TestRDD ). 它有一个字段,用于存储类未实现可序列化的对象( NonSerializable ). 我已经将“spark.serializer”配置选项设置为使用kryo。然而,当我尝试 count() 在我的rdd上,我得到以下信息:

Caused by: java.io.NotSerializableException: com.complexible.spark.NonSerializable
Serialization stack:
- object not serializable (class: com.test.spark.NonSerializable, value: com.test.spark.NonSerializable@2901e052)
- field (class: com.test.spark.TestRDD, name: mNS, type: class com.test.spark.NonSerializable)
- object (class com.test.spark.TestRDD, TestRDD[1] at RDD at TestRDD.java:28)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (TestRDD[1] at RDD at TestRDD.java:28,<function2>))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1009)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933)

当我往里面看的时候 DAGScheduler.submitMissingTasks 我看到它在我的rdd上使用了闭包序列化程序,这是java序列化程序,而不是我期望的kryo序列化程序。我读到kryo在序列化闭包方面有问题,spark总是使用java序列化程序来实现闭包,但我不太明白闭包是如何在这里发挥作用的。我要做的就是:

SparkConf conf = new SparkConf()
                         .setAppName("ScanTest")
                         .setMaster("local")
                         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

JavaSparkContext sc = new JavaSparkContext(conf);

TestRDD rdd = new TestRDD(sc.sc());
System.err.println(rdd.count());

也就是说,没有Map程序或任何需要闭包序列化的东西。otoh这是有效的:

sc.parallelize(Arrays.asList(new NonSerializable(), new NonSerializable())).count()

kryo序列化程序按预期使用,不涉及闭包序列化程序。如果我没有将serializer属性设置为kryo,这里也会出现异常。
我很感激任何关于闭包从何而来以及如何确保我可以使用kryo序列化定制rdd的说明。
更新:这里是 TestRDD 它的不可序列化字段 mNS :

class TestRDD extends RDD<String> {

    private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class);

    NonSerializable mNS = new NonSerializable();

    public TestRDD(final SparkContext _sc) {
        super(_sc,
              JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()),
              STRING_TAG);
    }

    @Override
    public Iterator<String> compute(final Partition thePartition, final TaskContext theTaskContext) {
        return JavaConverters.asScalaIteratorConverter(Arrays.asList("test_" + thePartition.index(),
                                                                     "test_" + thePartition.index(),
                                                                     "test_" + thePartition.index()).iterator()).asScala();
    }

    @Override
    public Partition[] getPartitions() {
        return new Partition[] {new TestPartition(0), new TestPartition(1), new TestPartition(2)};
    }

    static class TestPartition implements Partition {

        final int mIndex;

        public TestPartition(final int theIndex) {
            mIndex = theIndex;
        }

        public int index() {
            return mIndex;
        }
    }
}
sf6xfgos

sf6xfgos1#

当我往里面看的时候 DAGScheduler.submitMissingTasks 我看到它在我的rdd上使用了闭包序列化程序,这是java序列化程序,而不是我期望的kryo序列化程序。 SparkEnv 支持两个序列化程序,一个名为 serializer 它用于数据序列化、检查点设置、工作进程之间的消息传递等,并在下提供 spark.serializer 配置标志。另一个叫做 closureSerializer 低于 spark.closure.serializer 它用于检查您的对象实际上是可序列化的,并且可以为spark<=1.6.2配置(除了 JavaSerializer 从2.0.0及更高版本硬编码到 JavaSerializer .
kryo闭包序列化程序有一个使其无法使用的bug,您可以在spark-7708下看到这个bug(这个bug可以用kryo 3.0.0修复,但是spark目前用一个特定版本的chill修复,这个版本在kryo 2.2.1上修复)。此外,对于spark2.0.x,javaserializer现在是固定的,而不是可配置的(您可以在这个pull请求中看到它)。这意味着我们实际上被 JavaSerializer 用于闭包序列化。
我们使用一个序列化程序来提交任务,而另一个序列化程序在worker和其他类似的应用程序之间序列化数据,这是不是很奇怪?当然,但这是我们的。
总而言之,如果你设定 spark.serializer 配置,或使用 SparkContext.registerKryoClasses 您将在spark中使用kryo进行大部分序列化。话虽如此,为了检查给定的类是否可序列化,以及任务是否可序列化到worker,spark将使用 JavaSerializer .

相关问题