为什么dstream.foreachrdd失败时出现java.io.notserializableeexception:org.apache.spark.sparkcontext?

o7jaxewo  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(435)

我需要建立一个图形与graphx的基础上处理的数据从Kafka。然而,似乎 sc.parallelize() 引发错误 java.io.NotSerializableException: org.apache.spark.SparkContext ```
......
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
val lines = messages.map(_._2)

lines.foreachRDD(rdd => {
rdd.foreachPartition(partition => {
......
// Build a graph
val vertRDD = sc.parallelize(vertices)
val edgeRDD = sc.parallelize(edge)
val graph = Graph(vertRDD, edgeRDD, defaultUser)
}
})
})

我应该用什么方法解决这个问题?
w51jfk4q

w51jfk4q1#

spark streaming中的foreachrdd操作符在驱动程序上的每个批处理间隔运行处理rdd,然后使用该驱动程序(通过其 RDD )写一段代码,最终把自己变成Spark作业。
foreachrdd(foreachfunc:(rdd[t])⇒ unit):对数据流中的每个rdd应用一个函数。这是一个输出操作符,因此“this”数据流将被注册为一个输出流,并因此具体化。
foreachpartition是只在执行者上发生的操作。
foreachpartition(f:(迭代器[t])⇒ 单位):单位应用函数 f 到这个rdd的每个分区。
在一个任务可以在执行器上执行之前,它必须被序列化,因为 SparkContext 不可序列化,因此出现异常。spark就是这样确保的 SparkContext (作为 sc )由于spark中的设计决策,永远不会出现。这无论如何都没有意义,因为整个调度基础设施都在驱动程序上。 SparkContext 以及 RDD 仅在驱动程序上可用。它们只是描述分布式计算的一种方式,最终将“转换”为在spark执行器上运行的任务。
这就是您看到错误消息的原因:
java.io.notserializableexception:org.apache.spark.sparkcontext
顺便说一句,我今天回答了一个类似的问题(请参阅sparkcontext.textfile是否可以与自定义接收器一起使用?),所以看起来今天是sparkcontext日:)

相关问题