在本地环境中没有问题,但是在执行spark提交时出现异常。
近似代码如下所示
class Test extends Serializable {
def action() = {
val sc = SparkContext.getOrCreate(sparkConf)
val rdd1 = sc.textFile(.. )
val rdd2 = rdd1.map ( logline => {
//gson
val jsonObject jsonParser.parse(logLine).getAsJsonObject
//jackson
val jsonObject = objectMapper.readValue(logLine,classOf[HashMap[String,String]])
MyDataSet ( parsedJson.get("field1"), parsedJson.get("field2"),...)
}
}
}
例外情况
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
ensureSerializable(ClosureCleanser. scala:444)..
........
........
caused by: java.io.NotSerializableException : com.fasterxml.jackson.module.scala.modifiers.ScalaTypeModifier
gson和Jackson库我都用过。
这难道不是一个可以通过从serializable继承来解决的问题吗?
1条答案
按热度按时间oknwwptz1#
异常
NotSerializableException
是非常不言自明的。你的任务是不可序列化的。Spark是一个并行计算引擎。驱动程序(执行主程序的地方)将您想要在RDD上进行的转换(在map
函数中编写的代码)转换到执行它们的执行器。因此,这些转换需要是可序列化的。在您的情况下,jsonParser
和objectMapper
是在驱动程序上创建的,为了在转换中使用它们,spark尝试将它们序列化,但失败了,因为它们不可序列化,这是您的错误,我不知道哪个不可序列化,可能两个都不可序列化。让我们举一个例子,看看我们可以如何修复它。
为了解决这个问题,我们在变形中创建一个对象
这是可行的,但显然,为每条记录创建对象的开销会相当大,我们可以使用
mapPartition
做得更好,每个分区只创建一次对象: