我不确定spark是如何将对象传递给执行器的,所以我写了这个测试(在1个执行器中运行)
在遗嘱执行人中, z
的init值是100,尽管我在driver中将它设置为0。在这四个月里 map
函数,其值保持增长而不重置为100。变更的修改 z
到1000也被执行者忽略。
为什么会这样?Spark如何传递到物体上 map
变压器?
object main extends App {
val a = sc.parallelize((0 until 10).toList)
A.z = 0
println(a.map(x=>A.kk()).collect().mkString(","))
println(a.map(x=>A.kk()).collect().mkString(","))
println(s"driver z: ${A.z}")
A.z=1000
println("change z to 1000")
println(a.map(x=>A.kk()).collect().mkString(","))
println(a.map(x=>A.kk()).collect().mkString(","))
}
object A{
var z =100
def kk(): Int ={
z+=1
z
}
}
输出为
104,105,101,102,103,106,107,108,109,110
114,115,111,112,113,116,117,118,119,120
driver z: 0
change z to 1000
121,122,123,124,125,126,127,128,129,130
131,133,132,134,135,136,137,138,139,140
1条答案
按热度按时间jxct1oxe1#
如果您想操作从驱动程序传递到执行器的对象(不管您有1个执行器还是n个执行器),您必须使用累加器。
spark不会传递在驱动程序中操作的对象,它会序列化原始对象(即“z=100”,无论更改如何)。而且,在执行器中对对象所做的所有更改在驱动程序中都不可见。
尝试:
请注意,每个执行人都有自己的累加器副本。对数据集执行操作后,将调用“merge”函数。
https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#accumulators