在我的代码中,我订阅了一个kafka流,并在一个函数中处理每个rdd:
val myStream = KafkaUtils.createDirectStream[K, V](
streamingContext,
PreferConsistent,
Subscribe[K, V](topics, consumerConfig)
)
val myMap: Map[ObjA, ObjB] = getMyMap() // This is the variable I want to access in 'process'
def process(record: (RDD[ConsumerRecord[String, String]], Time)): Unit = record match {
// Code that uses myMap.get("key")
}
myStream.foreachRDD((x, y) => process((x, y)))
我读了这篇关于spark和kafka整合模式的文章。据我所知, foreachRDD
在驱动程序上本地执行,但随后任何内部循环都会分布到集群节点。那是不是意味着我应该广播 myMap
出于性能原因?
2条答案
按热度按时间qjp7pelc1#
但是任何内部循环都被分配到集群节点
分布式的不是“任何内部循环”,而是rdd上的操作。
所以这取决于
myMap.get("key")
在内部使用process
. e、 g.这里有一个广播完全没有意义的例子:fhg3lkii2#
这是否意味着出于性能原因我应该广播mymap?
好:
如果不这样做,则以序列化形式缓存以这种方式广播的数据,并在运行每个任务之前进行反序列化。
如果你这样做了,spark会在每个执行者身上保留一份副本,并在需要时重用。
因为变量可以在多个任务之间重用,所以广播是有价值的,特别是当数据大到足以增加大量开销时。
如果不是这样的话,最好还是坚持闭包序列化以获得更好的可读性(我承认这是一个优先考虑的问题)。