spark闭包

jjjwad0x  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(332)

这个问题在这里已经有了答案

linkedhashmap变量在foreach循环外不可访问(1个答案)
5年前关门了。
我有一个数组,当它在闭包内(它有一些值)但在循环外时,数组大小是0。我想知道是什么导致这种行为?
我需要harr可以在外面进行批处理。

val hArr = new ArrayBuffer[Put]()

rdd.foreach(row => {
  val hConf = HBaseConfiguration.create()
  val hTable = new HTable(hConf, tablename)
  val hRow = new Put(Bytes.toBytes(row._1.toString))
  hRow.add(...)
  hArr += hRow
  println("hArr: " + hArr.toArray.mkString(","))
})

println("hArr.size: " + hArr.size)
6bc51xsx

6bc51xsx1#

我发现相当多的新spark用户对mapper和reducer函数如何运行以及它们如何与驱动程序中定义的内容相关感到困惑。通常,由map、foreach、reducebykey或许多其他变体定义和注册的所有mapper/reducer函数都不会在驱动程序中执行。在您的驱动程序中,您只需将它们注册为spark即可远程和分布式地运行它们。当这些函数引用您在驱动程序中示例化的一些对象时,您实际上创建了一个“闭包”,它在大多数情况下都可以编译。但通常情况下,这并不是您想要的,您通常会在运行时遇到问题,通过看到notserializable或classnotfound异常。
您可以通过foreach()变量远程执行所有输出工作,也可以通过调用collect()尝试将所有数据收集回驱动程序进行输出。但是要小心collect(),因为它会将所有数据从分布式节点收集到驱动程序中。只有当您完全确定最终聚合的数据很小时,才可以这样做。

tzxcd3kk

tzxcd3kk2#

问题是rdd闭包中的任何项都会被复制并使用本地版本。 foreach 应该只用于保存到磁盘或类似的东西。
如果你想把它放在一个数组中,那么你可以 map 然后 collect ```
rdd.map(row=> {
val hConf = HBaseConfiguration.create()
val hTable = new HTable(hConf, tablename)
val hRow = new Put(Bytes.toBytes(row._1.toString))
hRow.add(...)
hRow
}).collect()

相关问题