根据我对spark工作原理的有限理解,当.collect()
操作被调用时,列column
中的数据将被分区,在执行器之间拆分,.distinct()
转换将应用于这些分区中的每一个,并且重复数据消除的结果将发送到驱动程序。但是,在驱动程序中是否存在记录重复的可能性(因为重复数据删除是在每个执行器上独立进行的)?我们是否需要在收集的结果上再次应用.distinct()
来消除重复数据?
根据我对spark工作原理的有限理解,当.collect()
操作被调用时,列column
中的数据将被分区,在执行器之间拆分,.distinct()
转换将应用于这些分区中的每一个,并且重复数据消除的结果将发送到驱动程序。但是,在驱动程序中是否存在记录重复的可能性(因为重复数据删除是在每个执行器上独立进行的)?我们是否需要在收集的结果上再次应用.distinct()
来消除重复数据?
1条答案
按热度按时间jm2pwxwz1#
你的想法是正确的,但你错过了一个步骤,那就是reduce阶段。Spark执行聚合非常类似于MapReduce。在MapReduce中,不同的聚合有3个步骤
1.数据由每个Map器读取(如您所说,在执行器之间拆分)
1.合并器执行不同的(在Map器进程中的stll)在所有Map器完成后,我们处理:
1.(缺少步骤)启动一个新的reducer进程(仍然在集群中),该进程聚合来自每个mapper的distinct列表并再次执行distinct。
1.将结果发送给客户端。
Spark做同样的事情,但与MapReduce不同的是,Spark使用执行器来执行所有的部分(Map器/组合器/缩减器)。
为了理解Spark的作用,让我们看看df.distinct()的物理计划:
围绕计划的这三条线(顺序是自下而上)
DataFrame/RDD已经分区并驻留在执行器中
(1)HashAggregate -这一步在分区级别执行第一个distinct
(2)Exchange hashpartitioning -此阶段将数据混洗并将其获取到单个执行器
(3)HashAggregate -这一步在不同列表的列表上执行第二个不同的操作
然后,collect()函数将非重复列表返回给驱动程序。