我理解广播优化的概念。当连接中的一方有小数据时,最好只对小的一方进行 Shuffle 。但是为什么不可能只使用执行器来进行 Shuffle 呢?为什么我们需要使用驱动程序呢?如果每个执行者持有哈希表来Map执行者之间的记录,我认为它应该工作。在spark broadcast的当前实现中,它收集数据到驱动程序,然后将其 Shuffle ,收集驱动程序的操作是我想避免的瓶颈。有没有什么想法,如何实现类似的优化,而没有瓶颈的驱动程序内存?
sigwle7e1#
你是对的,当前的实现需要在将数据发送到Executors之前将数据收集到驱动程序。已经有一个JIRA ticket SPARK-17556解决了您的建议:“目前在Spark SQL中,为了执行广播连接,驱动程序必须收集RDD的结果,然后广播它。这会引入一些额外的延迟。可能直接从执行器广播。”我从attached document中复制了这个解决方案,使这个答案自我描述:“要在RDD中添加一个broadcast方法来执行executor的broadcast,我们需要一些支持工作,如下所示:1.从驱动程序中构造BroadCastId,BroadCastManager将提供一个方法来执行此操作。
// Called from driver to create new broadcast id def newBroadcastId: Long = nextBroadcastId.getAndIncrement()
字符串
1条答案
按热度按时间sigwle7e1#
你是对的,当前的实现需要在将数据发送到Executors之前将数据收集到驱动程序。
已经有一个JIRA ticket SPARK-17556解决了您的建议:
“目前在Spark SQL中,为了执行广播连接,驱动程序必须收集RDD的结果,然后广播它。这会引入一些额外的延迟。可能直接从执行器广播。”
我从attached document中复制了这个解决方案,使这个答案自我描述:
“要在RDD中添加一个broadcast方法来执行executor的broadcast,我们需要一些支持工作,如下所示:
1.从驱动程序中构造BroadCastId,BroadCastManager将提供一个方法来执行此操作。
字符串
1.在TorrentBroadcast中,writeBlocks将块写入hdfs,readBlocks按优先级从本地,远程,hdfs读取块。
1.在构造Broadcast时,可以控制是否上传广播数据块