我需要建立一个图形与graphx的基础上处理的数据从Kafka。然而,似乎 sc.parallelize()
引发错误 java.io.NotSerializableException: org.apache.spark.SparkContext
```
......
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
val lines = messages.map(_._2)
lines.foreachRDD(rdd => {
rdd.foreachPartition(partition => {
......
// Build a graph
val vertRDD = sc.parallelize(vertices)
val edgeRDD = sc.parallelize(edge)
val graph = Graph(vertRDD, edgeRDD, defaultUser)
}
})
})
我应该用什么方法解决这个问题?
1条答案
按热度按时间w51jfk4q1#
spark streaming中的foreachrdd操作符在驱动程序上的每个批处理间隔运行处理rdd,然后使用该驱动程序(通过其
RDD
)写一段代码,最终把自己变成Spark作业。foreachrdd(foreachfunc:(rdd[t])⇒ unit):对数据流中的每个rdd应用一个函数。这是一个输出操作符,因此“this”数据流将被注册为一个输出流,并因此具体化。
foreachpartition是只在执行者上发生的操作。
foreachpartition(f:(迭代器[t])⇒ 单位):单位应用函数
f
到这个rdd的每个分区。在一个任务可以在执行器上执行之前,它必须被序列化,因为
SparkContext
不可序列化,因此出现异常。spark就是这样确保的SparkContext
(作为sc
)由于spark中的设计决策,永远不会出现。这无论如何都没有意义,因为整个调度基础设施都在驱动程序上。SparkContext
以及RDD
仅在驱动程序上可用。它们只是描述分布式计算的一种方式,最终将“转换”为在spark执行器上运行的任务。这就是您看到错误消息的原因:
java.io.notserializableexception:org.apache.spark.sparkcontext
顺便说一句,我今天回答了一个类似的问题(请参阅sparkcontext.textfile是否可以与自定义接收器一起使用?),所以看起来今天是sparkcontext日:)