spark如何将scala对象传递给map转换

m0rkklqb  于 2021-05-18  发布在  Spark
关注(0)|答案(1)|浏览(658)

我不确定spark是如何将对象传递给执行器的,所以我写了这个测试(在1个执行器中运行)
在遗嘱执行人中, z 的init值是100,尽管我在driver中将它设置为0。在这四个月里 map 函数,其值保持增长而不重置为100。变更的修改 z 到1000也被执行者忽略。
为什么会这样?Spark如何传递到物体上 map 变压器?

object main extends App {
  val a = sc.parallelize((0 until 10).toList)
  A.z = 0
  println(a.map(x=>A.kk()).collect().mkString(","))
  println(a.map(x=>A.kk()).collect().mkString(","))
  println(s"driver z: ${A.z}")
  A.z=1000
  println("change z to 1000")
  println(a.map(x=>A.kk()).collect().mkString(","))
  println(a.map(x=>A.kk()).collect().mkString(","))
}

object A{
  var z =100
  def kk(): Int ={
    z+=1
    z
  }
}

输出为

104,105,101,102,103,106,107,108,109,110
114,115,111,112,113,116,117,118,119,120
driver z: 0
change z to 1000
121,122,123,124,125,126,127,128,129,130
131,133,132,134,135,136,137,138,139,140
jxct1oxe

jxct1oxe1#

如果您想操作从驱动程序传递到执行器的对象(不管您有1个执行器还是n个执行器),您必须使用累加器。
spark不会传递在驱动程序中操作的对象,它会序列化原始对象(即“z=100”,无论更改如何)。而且,在执行器中对对象所做的所有更改在驱动程序中都不可见。
尝试:

// Driver
val acc = sparkSession.sparkContext.longAccumulator("foo")

// Executor/s
acc.add(...)
acc.reset()

请注意,每个执行人都有自己的累加器副本。对数据集执行操作后,将调用“merge”函数。
https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#accumulators

相关问题