在spark代码中,我试图从csv文件创建indexedrowmatrix。但是,我得到以下错误:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
这是我的密码:
sc = new JavaSparkContext("local", "App",
"/srv/spark", new String[]{"target/App.jar"});
JavaRDD<String> csv = sc.textFile("data/matrix.csv").cache();
JavaRDD<IndexedRow> entries = csv.zipWithIndex().map(
new Function<scala.Tuple2<String, Long>, IndexedRow>() {
/**
*
**/
private static final long serialVersionUID = 4795273163954440089L;
@Override
public IndexedRow call(Tuple2<String, Long> tuple)
throws Exception {
String line = tuple._1;
long index = tuple._2;
String[] strings = line.split(",");
double[] doubles = new double[strings.length];
for (int i = 0; i < strings.length; i++) {
doubles[i] = Double.parseDouble(strings[i]);
}
Vector v = new DenseVector(doubles);
return new IndexedRow(index, v);
}
});
6条答案
按热度按时间insrf1ej1#
为Map器创建一个单独的类并实现srielizable,有时内部类会在spark环境中导致编译问题。。
xeufq47z2#
我也有同样的问题。它把我逼疯了。它是对匿名示例和可序列化性的java限制。我的解决方案是将函数的匿名示例声明为实现serializable的命名静态类并示例化它。我基本上声明了一个函数库,它是一个外部类,包含了我想要使用的函数的静态内部类定义。
当然,如果您用scala编写它,它很可能是一个包含更整洁代码的文件,但在本例中这对您没有帮助。
zc0qhyus3#
有些东西闻起来有点可疑,如果你给我们看更多的代码,也许我们能给出更好的答案。
无论如何,您可以尝试在表示Map器函数的单独文件中创建一个公共类:
然后用它来Mapjavardd:
这样,对于map()调用,spark只需要序列化mapper类,而mapper类中没有任何不可序列化的属性。
然而,作业可能会因为其他原因而失败,这些原因我们不知道,因为我们看不到所涉及的所有代码。
wvt8vs2t4#
当序列化出现问题时,最好添加以下参数:
-Dsun.io.serialization.extendedDebugInfo=true
这样你就能更准确地看到它失败的地方。下面是代码中可能发生的情况。一
JavaSparkContext
确实是不可序列化的(原因有很多,你可以在网上找到)。在您的代码中,您没有直接序列化它,但是您确实持有对它的引用,因为您的Function
不是静态的,因此它持有对封闭类的引用。因此,当您发送Map时,基本上会发生的是,它将尝试序列化包含JavaSparkContext
它是不可序列化的,这就是异常的来源。您可以尝试静态重写此函数,或者将函数作为非嵌套类编写,或者使JavaSparkContext
使其不序列化。如果可能的话,我建议您选择最新的选项,原因很简单,创建它是最佳实践
JavaSparkContext
在本地,否则,由于您对类的每个引用(有时很难找到),您将有数百个不可序列化的问题。例如,可以通过示例化JavaSparkContext
在你的主要班级里:另外请注意,静态字段与示例无关,而是与类相关,因此我认为
serialVersionUID
也没有序列化(以防在某个时候对您造成问题)。jqjz2hbq5#
一般来说,当任务提交给不同的执行者时,rdd对象将由spark序列化。但是您应该使用闭包,这样可以避免这个错误。
您可以使用rdd.mappartition()来处理每个分区,并将代码放在其中。通过这种方式,spark本身将负责序列化和反序列化map对象。
x0fgdtte6#
在驱动程序中编写并在rdd转换中使用的任何代码都需要序列化。如果您遇到有关序列化请求的问题,请遵循以下设计原则:
编写在转换(map)中使用不可序列化对象的所有代码。
利用
forEachPartition
在spark中对每个分区执行一个操作。rdd转换共享的任何代码都必须始终是可序列化的。