我有一个流hdfs文本文件的代码。但是每个文本文件包含一个标题和50行的描述。我想忽略这些行,只接收数据。
这是我的代码,但它抛出了一个sparkexception:task not serializable
val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/log")
hdfsDStream.foreachRDD(
rdd => {
val data = rdd.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String])
=> {
if (partitionIdx == 0) {
lines.drop(50)
}
lines
})
val rowRDD = data.map(_.split(",")).map(p => Row(p(0),p(1),p(2),p(3)))
if (data.count() > 0) {
...
}
}
)
2条答案
按热度按时间mftmpeh81#
我认为您只需要zipwithindex并过滤索引小于50的情况。
也。。。在这里-
Row(p(0),p(1),p(2),p(3))
,你真的需要吗Row
突然之间?qnzebej02#
任务不可序列化错误发生在这种情况下:将函数传递给spark:引用整个对象的风险是什么?或运行apache spark作业时出现任务不可序列化异常
很可能您正在那里创建某种对象,并在rdd方法中调用其函数,从而强制引擎序列化您的对象。
不幸的是,您打印的代码部分工作得非常好,问题出在被点替换的部分。例如,这一个有效: