据我所知,每人将有一份工作 action
在Spark里。
但我经常看到,一个动作触发的工作不止一个。我试图通过在数据集上进行简单的聚合来测试这一点,以从每个类别(这里是“subject”字段)中获得最大值
在检查spark ui时,我可以看到有3个“作业”为 groupBy
手术,而我只期待一个。
有人能帮我理解为什么只有3而不是1吗?
students.show(5)
+----------+--------------+----------+----+-------+-----+-----+
|student_id|exam_center_id| subject|year|quarter|score|grade|
+----------+--------------+----------+----+-------+-----+-----+
| 1| 1| Math|2005| 1| 41| D|
| 1| 1| Spanish|2005| 1| 51| C|
| 1| 1| German|2005| 1| 39| D|
| 1| 1| Physics|2005| 1| 35| D|
| 1| 1| Biology|2005| 1| 53| C|
| 1| 1|Philosophy|2005| 1| 73| B|
// Task : Find Highest Score in each subject
val highestScores = students.groupBy("subject").max("score")
highestScores.show(10)
+----------+----------+
| subject|max(score)|
+----------+----------+
| Spanish| 98|
|Modern Art| 98|
| French| 98|
| Physics| 98|
| Geography| 98|
| History| 98|
| English| 98|
| Classics| 98|
| Math| 98|
|Philosophy| 98|
+----------+----------+
only showing top 10 rows
在检查spark ui时,我可以看到有3个“作业”为 groupBy
手术,而我只期待一个。
有人能帮我理解为什么只有3而不是1吗?
== Physical Plan ==
* (2) HashAggregate(keys=[subject#12], functions=[max(score#15)])
+- Exchange hashpartitioning(subject#12, 1)
+- *(1) HashAggregate(keys=[subject#12], functions=[partial_max(score#15)])
+- *(1) FileScan csv [subject#12,score#15] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/lab/SparkLab/files/exams/students.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<subject:string,score:int>
2条答案
按热度按时间m2xkgtsf1#
我建议你检查一下身体计划-
你可能会看到-
[Map阶段]阶段#1是实现局部聚合(部分聚合),然后使用
hashpartitioning(subject)
. 注意hashpartitioner使用group by
柱[减少阶段]阶段#2是合并阶段#1的输出以获得最终结果
max(score)
这实际上是用来打印前10条记录的show(10)
vh0rcniy2#
我认为只有#3完成了实际的“工作”(执行一个计划,如果您打开sql选项卡上的查询详细信息,您将看到该计划)。另外两个是准备步骤--
1正在查询要生成的namenode
InMemoryFileIndex
读取csv,以及2正在对要执行的数据集进行采样
.groupBy("subject").max("score")
内部需要sortByKey
(这里有更多的细节)。