spark notserializableexception异常

bd1hkmkf  于 2021-05-30  发布在  Hadoop
关注(0)|答案(6)|浏览(300)

在spark代码中,我试图从csv文件创建indexedrowmatrix。但是,我得到以下错误:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext

这是我的密码:

sc = new JavaSparkContext("local", "App",
              "/srv/spark", new String[]{"target/App.jar"});

JavaRDD<String> csv = sc.textFile("data/matrix.csv").cache();

JavaRDD<IndexedRow> entries = csv.zipWithIndex().map(
              new  Function<scala.Tuple2<String, Long>, IndexedRow>() {
                /**
                 * 
              **/ 
                private static final long serialVersionUID = 4795273163954440089L;

                @Override
                public IndexedRow call(Tuple2<String, Long> tuple)
                        throws Exception {
                    String line = tuple._1;
                    long index = tuple._2;
                    String[] strings = line.split(",");
                    double[] doubles = new double[strings.length];
                     for (int i = 0; i < strings.length; i++) {
                         doubles[i] = Double.parseDouble(strings[i]);
                     }
                     Vector v = new DenseVector(doubles);
                     return new IndexedRow(index, v);
                }
            });
insrf1ej

insrf1ej1#

为Map器创建一个单独的类并实现srielizable,有时内部类会在spark环境中导致编译问题。。

xeufq47z

xeufq47z2#

我也有同样的问题。它把我逼疯了。它是对匿名示例和可序列化性的java限制。我的解决方案是将函数的匿名示例声明为实现serializable的命名静态类并示例化它。我基本上声明了一个函数库,它是一个外部类,包含了我想要使用的函数的静态内部类定义。
当然,如果您用scala编写它,它很可能是一个包含更整洁代码的文件,但在本例中这对您没有帮助。

zc0qhyus

zc0qhyus3#

有些东西闻起来有点可疑,如果你给我们看更多的代码,也许我们能给出更好的答案。
无论如何,您可以尝试在表示Map器函数的单独文件中创建一个公共类:

public class Mapper implements Function<Tuple2<String,Long>, IndexedRow> {

  @Override
  public IndexedRow call(Tuple2<String, Long> tuple) throws Exception {
    String line = tuple._1();
    long index = tuple._2();
    String[] strings = line.split(",");
    double[] doubles = new double[strings.length];
    for (int i = 0; i < strings.length; i++) {
      doubles[i] = Double.parseDouble(strings[i]);
    }
    Vector v = new DenseVector(doubles);
    return new IndexedRow(index, v);
  }
}

然后用它来Mapjavardd:

JavaRDD<String> csv = jsc.textFile("data/matrix.csv").cache();
JavaRDD<IndexedRow> entries = csv.zipWithIndex().map(new Mapper());

这样,对于map()调用,spark只需要序列化mapper类,而mapper类中没有任何不可序列化的属性。
然而,作业可能会因为其他原因而失败,这些原因我们不知道,因为我们看不到所涉及的所有代码。

wvt8vs2t

wvt8vs2t4#

当序列化出现问题时,最好添加以下参数: -Dsun.io.serialization.extendedDebugInfo=true 这样你就能更准确地看到它失败的地方。
下面是代码中可能发生的情况。一 JavaSparkContext 确实是不可序列化的(原因有很多,你可以在网上找到)。在您的代码中,您没有直接序列化它,但是您确实持有对它的引用,因为您的 Function 不是静态的,因此它持有对封闭类的引用。因此,当您发送Map时,基本上会发生的是,它将尝试序列化包含 JavaSparkContext 它是不可序列化的,这就是异常的来源。您可以尝试静态重写此函数,或者将函数作为非嵌套类编写,或者使 JavaSparkContext 使其不序列化。
如果可能的话,我建议您选择最新的选项,原因很简单,创建它是最佳实践 JavaSparkContext 在本地,否则,由于您对类的每个引用(有时很难找到),您将有数百个不可序列化的问题。例如,可以通过示例化 JavaSparkContext 在你的主要班级里:

public static void main(String[] args) {
   JavaSparkContext sc = new JavaSparkContext();

   // do whatever you need to do, if you need sc inside other classes,
   // store this sc into a static class, say Registry.set(sc) and Registry.getJSC()

   JavaRDD<String> csv = sc.textFile("data/matrix.csv").cache();
   JavaRDD<IndexedRow> entries = csv.zipWithIndex().map(
          new  Function<scala.Tuple2<String, Long>, IndexedRow>() {
            private static final long serialVersionUID = 4795273163954440089L; // won't be serialized

            @Override
            public IndexedRow call(Tuple2<String, Long> tuple)
                    throws Exception {
                String line = tuple._1;
                long index = tuple._2;
                String[] strings = line.split(",");
                double[] doubles = new double[strings.length];
                 for (int i = 0; i < strings.length; i++) {
                     doubles[i] = Double.parseDouble(strings[i]);
                 }
                 Vector v = new DenseVector(doubles);
                 return new IndexedRow(index, v);
            }
        });
}

另外请注意,静态字段与示例无关,而是与类相关,因此我认为 serialVersionUID 也没有序列化(以防在某个时候对您造成问题)。

jqjz2hbq

jqjz2hbq5#

一般来说,当任务提交给不同的执行者时,rdd对象将由spark序列化。但是您应该使用闭包,这样可以避免这个错误。
您可以使用rdd.mappartition()来处理每个分区,并将代码放在其中。通过这种方式,spark本身将负责序列化和反序列化map对象。

x0fgdtte

x0fgdtte6#

在驱动程序中编写并在rdd转换中使用的任何代码都需要序列化。如果您遇到有关序列化请求的问题,请遵循以下设计原则:
编写在转换(map)中使用不可序列化对象的所有代码。
利用 forEachPartition 在spark中对每个分区执行一个操作。rdd转换共享的任何代码都必须始终是可序列化的。

相关问题