pyspark 为什么广播连接采集数据到驱动程序以打乱数据?

wkyowqbh  于 12个月前  发布在  Spark
关注(0)|答案(1)|浏览(133)

我理解广播优化的概念。
当连接中的一方有小数据时,最好只对小的一方进行 Shuffle 。
但是为什么不可能只使用执行器来进行 Shuffle 呢?为什么我们需要使用驱动程序呢?
如果每个执行者持有哈希表来Map执行者之间的记录,我认为它应该工作。
在spark broadcast的当前实现中,它收集数据到驱动程序,然后将其 Shuffle ,收集驱动程序的操作是我想避免的瓶颈。
有没有什么想法,如何实现类似的优化,而没有瓶颈的驱动程序内存?

sigwle7e

sigwle7e1#

你是对的,当前的实现需要在将数据发送到Executors之前将数据收集到驱动程序。
已经有一个JIRA ticket SPARK-17556解决了您的建议:
“目前在Spark SQL中,为了执行广播连接,驱动程序必须收集RDD的结果,然后广播它。这会引入一些额外的延迟。可能直接从执行器广播。”
我从attached document中复制了这个解决方案,使这个答案自我描述:
“要在RDD中添加一个broadcast方法来执行executor的broadcast,我们需要一些支持工作,如下所示:
1.从驱动程序中构造BroadCastId,BroadCastManager将提供一个方法来执行此操作。

// Called from driver to create new broadcast id
def newBroadcastId: Long = nextBroadcastId.getAndIncrement()

字符串

  1. BroadCastManager可以创建一个具有指定ID和持久标记的广播,以推断此广播是执行器广播,并且其数据将备份在hdfs上。
    1.在TorrentBroadcast中,writeBlocks将块写入hdfs,readBlocks按优先级从本地,远程,hdfs读取块。
    1.在构造Broadcast时,可以控制是否上传广播数据块
  2. BroadCastManager发布API以将广播数据放入块管理器

相关问题