I have an RDD of the form (String, (Int, Iterable[String])). For each entry in the RDD, the integer value (I call it the distance) is initially set to 10. Every element of the Iterable[String] also has its own entry in this RDD, where it appears as a key (so we have a distance for every element that occurs in any of the Iterable[String] lists). My intention is:
1. If the list (the Iterable[String]) contains the element "bethan", I set its distance to 1.
2. After that, I create a list of all keys with distance 1 by filtering.
3. After that, I transform the RDD into a new RDD, updating an entry's distance to 2 if any element in its own list has distance 1.
I have the following code:
val disOneRdd = disRdd.map(x => { if (x._2._2.toList.contains("Bethan")) (x._1, (1, x._2._2)) else x })
var lst = disRdd.filter(x => x._2._1 == 1).keys.collect
val disTwoRdd = disRdd.map(x => {
  var b: Boolean = false
  loop.breakable {
    for (str <- x._2._2)
      if (lst.contains(str)) // checks if it contains an element with distance 1
        b = true
    loop.break
  }
  if (b)
    (x._1, (2, x._2._2))
  else
    (x._1, (10, x._2._2))
})
But when I run it, I get the error "Task not serializable". What should I do? And is there a better way to do this?
EDIT
Input RDD of the form:
("abc",(10,List("efg","hij","klm")))
("efg",(10,List("jhg","Beethan","abc","ert")))
("Beethan",(0,List("efg","vcx","zse")))
("vcx",(10,List("czx","Beethan","abc")))
("zse",(10,List("efg","Beethan","nbh")))
("gvf",(10,List("vcsd","fdgd")))
...
Every element whose list contains Beethan should have distance 1. Every element whose list contains an element with distance 1 (other than Beethan) should have distance 2. The output should be of the form:
("abc",(2,List("efg","hij","klm")))
("efg",(1,List("jhg","Beethan","abc","ert")))
("Beethan",(0,List("efg","vcx","zse")))
("vcx",(1,List("czx","Beethan","abc")))
("zse",(1,List("efg","Beethan","nbh"))
("gvf",(10,List("vcsd","fdgd")))
...
Error message:
[error] (run-main-0) org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.map(RDD.scala:365)
at Bacon$.main(Bacon.scala:86)
at Bacon.main(Bacon.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
Caused by: java.io.NotSerializableException: scala.util.control.Breaks
Serialization stack:
- object not serializable (class: scala.util.control.Breaks, value: scala.util.control.Breaks@78426203)
- field (class: Bacon$$anonfun$15, name: loop$1, type: class scala.util.control.Breaks)
- object (class Bacon$$anonfun$15, <function1>)
1 Answer
Both versions work for me. The problem was the non-serializable loop.breakable. Honestly, I don't know whether the behaviour of this construct has changed, but after replacing loop.breakable with a plain breakable it works, so perhaps there was some API change. The version with the filter is probably a bit slower, but it avoids breakable entirely.
Apart from the main problem, lst should really be a broadcast variable, but I did not use a broadcast here in order to keep the answer as simple as possible.
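For concreteness, here is a minimal sketch of the two approaches described above, including the broadcast mentioned at the end. It assumes disRdd and sc from the question, uses the "Beethan" spelling from the sample data rather than the "Bethan" literal in the question's code, and adds a d > 2 guard as an assumption so that the distances 0 and 1 are not overwritten.

import scala.util.control.Breaks.{break, breakable}

// Sketch only: disRdd is an RDD[(String, (Int, Iterable[String]))] and sc is
// the SparkContext, both taken from the question.

// Step 1: distance 1 for every key whose list contains "Beethan".
val disOneRdd = disRdd.map { case (k, (d, neigh)) =>
  if (neigh.exists(_ == "Beethan")) (k, (1, neigh)) else (k, (d, neigh))
}

// Step 2: keys at distance 1, broadcast so the collected set is shipped once
// per executor instead of being captured by every closure.
val lst = sc.broadcast(disOneRdd.filter(_._2._1 == 1).keys.collect().toSet)

// Step 3, version A: the imported breakable/break helpers are plain methods,
// so the closure no longer captures a non-serializable Breaks instance.
val disTwoRddA = disOneRdd.map { case (k, (d, neigh)) =>
  var found = false
  breakable {
    for (str <- neigh)
      if (lst.value.contains(str)) { found = true; break() }
  }
  if (found && d > 2) (k, (2, neigh)) else (k, (d, neigh))
}

// Step 3, version B: no Breaks at all; exists short-circuits the same way.
val disTwoRddB = disOneRdd.map { case (k, (d, neigh)) =>
  if (d > 2 && neigh.exists(lst.value.contains)) (k, (2, neigh)) else (k, (d, neigh))
}

Version B is usually the more idiomatic choice in Scala, since it expresses the "any element has distance 1" check directly instead of driving a mutable flag with a break.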