在foreachrdd中使用变量时是否需要使用广播变量?

rbpvctlc  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(498)

在我的代码中,我订阅了一个kafka流,并在一个函数中处理每个rdd:

val myStream = KafkaUtils.createDirectStream[K, V](
      streamingContext,
      PreferConsistent,
      Subscribe[K, V](topics, consumerConfig)
    )

  val myMap: Map[ObjA, ObjB] = getMyMap() // This is the variable I want to access in 'process'

  def process(record: (RDD[ConsumerRecord[String, String]], Time)): Unit = record match {

     // Code that uses myMap.get("key")

  }

  myStream.foreachRDD((x, y) => process((x, y)))

我读了这篇关于spark和kafka整合模式的文章。据我所知, foreachRDD 在驱动程序上本地执行,但随后任何内部循环都会分布到集群节点。那是不是意味着我应该广播 myMap 出于性能原因?

qjp7pelc

qjp7pelc1#

但是任何内部循环都被分配到集群节点
分布式的不是“任何内部循环”,而是rdd上的操作。
所以这取决于 myMap.get("key") 在内部使用 process . e、 g.这里有一个广播完全没有意义的例子:

def process(record: (RDD[ConsumerRecord[String, String]], Time)): Unit = record match {
  case (rdd, _) => rdd.take(10).filter(/* do something using myMap */)
}
fhg3lkii

fhg3lkii2#

这是否意味着出于性能原因我应该广播mymap?
好:
如果不这样做,则以序列化形式缓存以这种方式广播的数据,并在运行每个任务之前进行反序列化。
如果你这样做了,spark会在每个执行者身上保留一份副本,并在需要时重用。
因为变量可以在多个任务之间重用,所以广播是有价值的,特别是当数据大到足以增加大量开销时。
如果不是这样的话,最好还是坚持闭包序列化以获得更好的可读性(我承认这是一个优先考虑的问题)。

相关问题