Apache Spark 使用`df.select(column).distinct().collect()`获取嵌套中的唯一值

inn6fuwd  于 2023-10-23  发布在  Apache
关注(0)|答案(1)|浏览(118)

根据我对spark工作原理的有限理解,当.collect()操作被调用时,列column中的数据将被分区,在执行器之间拆分,.distinct()转换将应用于这些分区中的每一个,并且重复数据消除的结果将发送到驱动程序。但是,在驱动程序中是否存在记录重复的可能性(因为重复数据删除是在每个执行器上独立进行的)?我们是否需要在收集的结果上再次应用.distinct()来消除重复数据?

jm2pwxwz

jm2pwxwz1#

你的想法是正确的,但你错过了一个步骤,那就是reduce阶段。Spark执行聚合非常类似于MapReduce。在MapReduce中,不同的聚合有3个步骤
1.数据由每个Map器读取(如您所说,在执行器之间拆分)
1.合并器执行不同的(在Map器进程中的stll)在所有Map器完成后,我们处理:
1.(缺少步骤)启动一个新的reducer进程(仍然在集群中),该进程聚合来自每个mapper的distinct列表并再次执行distinct。
1.将结果发送给客户端。
Spark做同样的事情,但与MapReduce不同的是,Spark使用执行器来执行所有的部分(Map器/组合器/缩减器)。

df = spark.createDataFrame([[i] for i in [1,2,2,3,3,3,1,4,5]], ['n'])
df.show()
+---+                                                                           
|  n|
+---+
|  1|
|  2|
|  2|
|  3|
|  3|
|  3|
|  1|
|  4|
|  5|
+---+
df_distinct = df.distinct()
df_distinct.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[n#0L], functions=[])
   +- Exchange hashpartitioning(n#0L, 200), ENSURE_REQUIREMENTS, [id=#109]
      +- HashAggregate(keys=[n#0L], functions=[])
         +- InMemoryTableScan [n#0L]
               +- InMemoryRelation [n#0L], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *(1) Scan ExistingRDD[n#0L]
print(df_distinct.collect())
[Row(n=5), Row(n=1), Row(n=3), Row(n=2), Row(n=4)]

为了理解Spark的作用,让我们看看df.distinct()的物理计划:
围绕计划的这三条线(顺序是自下而上)

(3) +- HashAggregate(keys=[n#0L], functions=[])
(2)   +- Exchange hashpartitioning(n#0L, 200), ENSURE_REQUIREMENTS, [id=#109]
(1)      +- HashAggregate(keys=[n#0L], functions=[])

DataFrame/RDD已经分区并驻留在执行器中
(1)HashAggregate -这一步在分区级别执行第一个distinct
(2)Exchange hashpartitioning -此阶段将数据混洗并将其获取到单个执行器
(3)HashAggregate -这一步在不同列表的列表上执行第二个不同的操作
然后,collect()函数将非重复列表返回给驱动程序。

相关问题