Spark之RDD算子

x33g5p2x  于2021-09-19 转载在 Spark  
字(5.0k)|赞(0)|评价(0)|浏览(630)

🐱今天出一期spark系列的硬货,即RDD算子,所谓算子,就是对某些事物的操作,或者说是方法。本期主要介绍几十个RDD算子,根据他们的特点,逐一进行介绍,有关spark的往期内容大家可以查看下面的内容👇:

  • 链接: Spark之处理布尔、数值和字符串类型的数据.
  • 链接: Spark之Dataframe基本操作.
  • 链接: Spark之处理布尔、数值和字符串类型的数据.
  • 链接: Spark之核心架构.

❤️记得我们前面说过,spark存在着惰性评估的机制,所谓惰性评估,就是等到绝对需要时才执行计算。当用户表达一些对数据的操作时,不是立即修改数据,而是建立一个作用到原始数据的转换计划,直到最后才开始执行代码。这里我们将RDD分为2种,一种是转换算子,一种是行动算子。

1. RDD 转换算子

转换算子,故名思义,就是对数据进行转换的算子,并不不能立马执行,而是定义逻辑,根据数据处理方式的不同将算子整体上分为Value 类型、双 Value 类型和 Key-Value类型。

1.1 Value 类型

  1. map

将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。(一个一个执行,效率不高)

  1. mapPartitions

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。(效率较高,得到一个分区后的数据才开始计算,但是对内存需求较高)

map 和 mapPartitions 的区别?

  • Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。
  • Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
  • Map 算子因为类似于串行操作,所以性能比较低,而是mapPartitions 算子类似于批处理,所以性能较高。但是mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用 map 操作。
  1. mapPartitionsWithIndex

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。

  1. flatMap

将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射。(将整体映射成一个一个个体,如: List(List(1,2),3,List(4,5))转换为 List(List(1),List(2),List(3),List(4),List(5))

  1. glom

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变。(比如将int类型的【1,2】【3,4】这两个分区内的数据转化为array类型的【1,2】【3,4】每个分区内的数据转化为了数组类型)

  1. groupby

将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中。(分组和分区没有本质的关系!)

解释一下:(1,2)一个分区,(3,4)一个分区,但是经过groupby之后,我们发现(1,3)一个分区,(2,4)一个分区,但总体上还是两个分区。

  1. filter

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

  1. sample

根据指定的规则从数据集中抽取数据。

//sample的三个参数
withReplacement: Boolean,#是否又放回抽样
fraction: Double,#抽取的几率
seed: Long = Utils.random.nextLong#随机数种子
  1. distinct

将数据集中重复的数据去重,去重的方式是通过将数值map成键值对的形式然后通过reducebykey聚合,最后选出聚合结果。

  1. coalesce

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率,当 spark 程序中,存在过多的小任务的时候,可以通过coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本。(该方法不会打乱数据,可能会导致数据倾斜。也可以设置成shuffle,也可以扩大分区,但是需要shuffle,扩大分区时等于repartition)

  1. repartition

该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition操作都可以完成,因为无论如何都会经 shuffle 过程。(将分区数由少变多)

  1. sortBy

该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程。
(例如:有一组数据1,2,3,4,1,2 其中123时一个分区,412是一个分区,排序后的结果是112,234这两个分区。)

1.2 双Value 类型

双Value 类型故名思义,就是传递两个数据源的算子,这里就会涉及到交集并集差集的概念。(交,并,差集都需要两个rdd数据类型一样)

  1. intersection

对源 RDD 和参数 RDD 求交集后返回一个新的 RDD

  1. union

对源 RDD 和参数 RDD 求并集后返回一个新的 RDD

  1. subtract

以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集

  1. zip

将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD中的元素,Value 为第 2 个 RDD 中的相同位置的元素。(要求分区数量一样,每个分区中的数据也一样)

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.zip(dataRDD2)
结果为(1,3)(2,4)(3,5)(4,6)

1.3 Key - Value 类型

  1. partitionBy

将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是HashPartitioner(数据类型一定需要是Key - Value类型的数据,是将数据进行重新的分区,分区数量不变。)

  1. reduceByKey

可以将数据按照相同的 Key 对 Value 进行聚合(相同的key分在一个组里面进行聚合,原理是两两聚合,如果key的值只有一个,那就不会进行聚合)

  1. groupByKey

将数据源的数据根据 key 对 value 进行分组,将相同的key放在一个组中,形成一个对偶元组(什么是对偶元组,即元组的第一个值是key值,元组的第二个值是相同key的value集合。)

reduceByKey 和 groupByKey 的区别?

我们今天就来从深层次来讲讲groupByKey和reduceByKey的相同与不同点。
先说一下groupByKey的原理:

解释:在groupByKey会将分区内的数据打乱,因此存在着shuffle操作,spark中的shuffle操作必须落盘处理,也就是写进磁盘中进行存储,否则很容易造成内存溢出,shuffle性能不够高,如果后续需要实现reducebykey一样的聚合操作,可以使用map函数来实现。

reduceByKey的原理:

解释:reducebykey可以将数据在分区内就进行聚合操作,使得shuffle落盘的数据大大减少,增强shuffle效率。

总结:

  • shuffle角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey 可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的 数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
  • 从功能的角度:reduceByKey 其实包含分组和聚合的功能,GroupByKey只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey。
  1. aggregateByKey

将数据根据不同的规则进行分区内计算和分区间计算,啥叫分区内和分区间呢?我给大家解释一下:其实在前面的reducebykey中,分区内指的就是一个分区内部的数据可以进行聚合操作(不仅仅限于聚合),分区外,指的是不同分区之间的数据也可以进行聚合操作(不仅仅限于聚合)。
aggregateByKey就是这样一个函数,可以将分区内和分区外的逻辑操作分开来计算,例如分区内进行取最大值,分区外求和,这时就可以使用该函数。

  1. foldByKey

当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey(计算规则相同时,简化aggregateByKey操作)

  1. combineByKey

最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于 aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

  1. join

在两个数据源上在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的 (K,(V,W))的 RDD,如果两个数据源中没有相同的K,则结果中不会出现该(K,W)。

  1. leftOuterJoin

类似于 SQL 语句的左外连接

  1. cogroup

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD (可以理解为先连接后分组)

2. RDD 行动算子

前面终于把转换算子讲完了,收获就是对shuffle过程有了更深层次的认识。这一部分我们来讲行动算子,所谓行动算子,就是使用了该算子后,将会触发整个流程的执行。

  1. reduce

聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据

  1. collect

在驱动程序中,以数组 Array 的形式返回数据集的所有元素(会将不同分区内的数据按照分区顺序采集到driver端的内存中形成数组。)

  1. count

返回 RDD(数据源) 中元素的个数

  1. first

返回 RDD(数据源) 中的第一个元素

  1. take

返回一个由 RDD 的前 n 个元素组成的数组

  1. takeOrdered

返回该 RDD 排序后的前 n 个元素组成的数组(先排序,再取数)

  1. aggregate

分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合(例如;【1,2】,【3,4】两个分区,初始值为10,那么该函数就会 (1+2+10)+(3+4+10)+10计算)

  1. fold

折叠操作,aggregate 的简化版操作(aggregate当分区内和分区间的计算规则相同时可以简化。)

  1. countByKey

统计每种 key 的个数

  1. save 相关算子

将数据保存到不同格式的文件中

// 保存成 Text 文件
rdd.saveAsTextFile("output")
// 序列化成对象保存到文件
rdd.saveAsObjectFile("output1")
// 保存成 Sequencefile 文件
rdd.map((_,1)).saveAsSequenceFile("output2")
  1. foreach

分布式遍历 RDD 中的每一个元素,调用指定函数

// 收集后打印
rdd.map(num=>num).collect().foreach(println)
// 分布式打印
rdd.foreach(println)

3. 参考文献

这些spark函数真的是一个一个学习的,因为不太会Scala,所以只能从分布式的角度来理解他,这些函数对学会分布式的原理太有帮助了。

  • 《Spark权威指南》
  • 《Hadoop权威指南》
  • 《尚硅谷spark教材》
  • 《大数据hadoop3.X分布式处理实战》
  • 《Pyspark实战》

相关文章