1、Spark 基础
1.1 Spark 概述
Spark于2009年诞生于美国加州大学伯克利分校的AMP实验室,它是一个可应用于大规模数据处理的统一分析引擎。Spark不仅计算速度快,而且内置了丰富的API,使得我们能够更加容易编写程序。
Spark是基于内存计算的大数据并行计算框架,适用于各种各样的分布式平台的系统。在Spark生态圈中主要包含了 Spark Core、Spark SQL、Spark Streaming、GraphX、 MLlib、独立调度器 等组件。
- Spark Core: Spark核心组件,实现了Spark的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。Spark Core中还包含对弹性分布式数据集的API定义。
- Spark SQL: 用来操作结构化数据的核心组件,通过Spark SQL可直接查询Hive、HBase等多种外部数据源中的数据。Spark SQL的重要特点是能够统一处理关系表RDD。
- Spark Streaming: Spark提供的流式计算框架,支持高吞吐量、可容错处理的实时流式数据处理,其核心原理是将流数据分解成一系列短小的批处理作业。
- MLlib: Spark提供的关于机器学习功能的算法程序库,包括分类、回归、聚类、协同过滤算法等,还提供了模型评估、数据导入等额外的功能。
- GraphX: Spark提供的分布式图处理框架,拥有对图计算和图挖掘算法的API接口及丰富的功能和运算符,便于对分布式图处理的需求,能在海量数据上运行复杂的图算法。
- 独立调度器、Yarn、Mesos: 集群管理器,负责Spark框架高效地在一个到数千个节点之间进行伸缩计算的资源管理。
1.2 Spark 特点
Spark计算框架在处理数据时,所有的中间数据都保存在内存中,从而减少磁盘读写操作,提高框架计算效率。同时Spark还兼容HDFS、Hive,可以很好地与Hadoop系统融合,从而弥补MapReduce高延迟的性能缺点。所以说,Spark是一个更加快速、高效的大数据计算平台。
- 速度快: 与Hadoop相比,基于内存运算效率100倍+,基于硬盘运算效率10倍+。
- 易用性: 支持多种语言scala、python、R、java,支持交互式Shell。
- 通用性: Spark SQL、Spark Streaming、Spark MLlib、GraphX,它们在同一应用程序中可以无缝拼接,较少开发成本与精力。
- 兼容性: 可以运行在Hadoop模式、Mesos模式、Standalone独立模式、Cloud,且 可以访问多种数据源,本地、HDFS、Hbase、Hive。
1.3 Spark 应用场景
- 数据科学: 数据工程师可以利用Spark进行数据分析与建模,由于Spark具有良好的易用性,数据工程师只需要具备一定的SQL语言基础、统计学、机器学习等方面的经验,以及使用Python、Matlab或者 R语言的基础编程能力,就可以使用Spark进行上述工作。
- 数据处理: 大数据工程师将Spark技术应用于广告、报表、推荐系统等业务中,在广告业务中,利用Spark系统进行应用分析、效果分析、定向优化等业务,在推荐系统业务中,利用Spark内置机器学习算法训练模型数据,进行个性化推荐及热点点击分析等业务。
1.4 Spark 与 Hadoop 对比
- 编程方式: Hadoop的MapReduce计算数据时,要转化为Map和Reduce两个过程,从而难以描述复杂的数据处理过程;而Spark的计算模型不局限于Map和Reduce操作,还提供了多种数据集的操作类型,编程模型比MapReduce更加灵活。
- 数据存储: Hadoop的MapReduce进行计算时,每次产生的中间结果都存储在本地磁盘中;而Spark在计算时产生的中间结果存储在内存中。
- 数据处理: Hadoop在每次执行数据处理时,都要从磁盘中加载数据,导致磁盘IO开销较大;而Spark在执行数据处理时,要将数据加载到内存中,直接在内存中加载中间结果数据集,减少了磁盘的IO开销。
- 数据容错: MapReduce计算的中间结果数据,保存在磁盘中, Hadoop底层实现了备份机制,从而保证了数据容错;Spark RDD实现了基于Lineage(血缘)的容错机制和设置检查点方式的容错机制,弥补数据在内存处理时,因断电导致数据丢失的问题。
Spark仅是一种计算框架,不负责数据的存储和管理,因此,通常都会将Spark和Hadoop进行统一部署,由Hadoop中的HDFS、HBase等组件负责数据的存储管理,Spark负责数据计算。
关于Spark的安装可以参考下面这篇博文:Hadoop集群搭建及配置⑦—— Spark&Scala安装配置。
2、Spark运行架构与原理
2.1 Spark运行架构
Spark运行架构主要由 SparkContext、Cluster Manager和Worker 组成,其中ClusterManager负责整个集群的统一资源管理,Worker节点中的Executor是应用执行的主要进程,内部含有多个Task线程以及内存空间.
2.2 Spark运行基本流程。
通过下图深入了解Spark运行基本流程。
左框图:当一个Spark应用被提交时,根据提交参数创建Driver
进程, Driver进程初始化SparkContext
对象,由SparkContext负责和Cluster Manager
的通信以及资源的申请、任务的分配和监控等。
- 注册并分配资源:Driver进程向Cluster Manager申请资源(图1弧) , Cluster Manager接收到Application的注册请求后,会使用自己的资源调度算法,在Spark集群的Worker节点上 (图2弧), 通知
Worker
为应用启动多个Executor
。 - 资源反馈: Executor(执行器) 创建后,会向Cluster Manager进行资源及状态的反馈 (图3弧) , 便于Cluster Manager对Executor进行状态监控,如果监控到Executor失败,则会立刻重新创建。
- 反向注册: Executor 会向 SparkContext 反向注册申请Task (图4弧) 。
- 提交Task: Task Scheduler将Task发送给Worker进程中的Executor运行 (图5弧),并提供应用程序代码 。
- 资源注销:当程序执行完毕后写入数据, Driver向Cluster Manager注销申请的资源 (图6弧)。
3、Spark和的MapReduce作业运行模型
Spark是多线程运行, MapReduce多进程运行。
3.1 线程优缺点
线程优点:
1、任务启动速度快;
2、多个线程共享进程中的内存->(适合于内存密集型的任务);
3、通过多个线程多个任务可以同时执行。
线程缺点:
1、同一个节点上所有任务都运行在JVM进程中,出现严重的资源竞争,很难控制每个占用资源的情况,而MapReduce可以单独给map或者reduce设置资源,可以很方便控制资源的占用情况,因此有利于大作业的稳定运行;
2、MapReduce任务是进程级别的,所以挂掉无所谓,但是spark某一个进程挂掉,里面所有线程的任务都会受到影响。因此Spark不利于大作业的稳定运行,所以spark的稳定性没有MapReduce好,Spark作业适合于低延迟类型的作业。
多进程:方便控制资源,进程是一个独享的空间,但是启动比较费时间,因此不适合低延迟类型的作业,导致MapReduce时效性差。
小知识点:
内存:10G,8个map 1G,2个reduce 1G。
MapReduce : 如果map 执行完之后,yarn去看任务运行的资源情况,这个任务占用多少资源?
2G,因为map,reduce是两个独立的进程,map处理完之后释放了。
spark:1个executor 10G,8个map 1G,2个reduce 1G。map执行完之后,资源占用情况?
10G,只有任务执行完毕才释放;只是map执行完,reduce没有执行完毕。
3.2 线程和进程的区别
1、线程是进程的子集,简单理解 教室(进程),桌椅(线程);
2、线程启动快于进程,线程能够快速切换。
Spark的每一个进程包含Executor对象,其内部有一个线程池,每一个线程执行一个task;线程池作用:省去进程频繁启停的开销。
task并发度的理解: 一个节点表示一个机器,每个节点可以启动一个或者多个Executor;每个Executor,由若干个core组成,每一个core执行一个task,这里的core理解为线程,每次执行一个线程。每个task最终执行的结果,生成RDD,RDD内部有多个partition组成,每个partition由一个task进行处理。
节点 >> Executor >> core – task(线程) – RDD >> partition。
4、Spark 核心
4.1 RDD介绍
RDD: 弹性分布式数据集(相当于集合)
弹性(内存和磁盘切换):RDD的数据是默认存放在内存中,但是内存资源存在不足的情况,spark会将RDD数据写入磁盘。
容错性:如果任务执行失败,可以自动从失败节点进行恢复,由于某个节点宕机了导致数据丢失,RDD会根据自己的数据来源重新计算一遍,计算失败的partition的数据。
Spark针对RDD提供两类操作:transformations和action
transformations是RDD之间的变换,action会对数据执行一定的操作;transformations采用懒策略,仅在对相关RDD进行action提交时才触发计算。
详细可以参考:Spark RDD弹性分布式数据集——理论
4.2 RDD 窄依赖和宽依赖
- 窄依赖: 每一个父RDD的Partition最多被子RDD的一个Partition使用 (一对一的关系);
常见算子:map、flatmap、filter、union、sample 等等。 - 宽依赖: 指的是多个子RDD的Partition会依赖同一个父RDD的Partition (一对多的关系);
常见算子:groupByKey、reduceByKey、sortByKey、join等等。
DAG: 表示整个Spark的执行流程。
stage:spark中划分stage是通过宽依赖进行的,遇见宽依赖就切分,每个stage内部能多地包含一组具有窄依赖关系的转换,并将它们流水线并行化。
宽依赖和窄依赖的作用:
1、stage划分;
2、容错:针对复杂业务逻辑,当执行到宽依赖的时候,进行适当的cache,减少因为任务异常导致数据重跑;
3、代码优化。
4.3 spark内存模型
executor 分为三个部分:
- storage内存(60%):存储cache , persist;broadcast数据(将数据广播到所有节点上面,但是有一个前提,该数据在内存中)。
- executor内存(20%): 执行内存。join、agg算子,在这块内存中进行执行,或者缓存shuffle的数据,如果内存满了写到磁盘。
- other内存(20%):executor留给自己的内存。
4.4 Spark 资源参数调优
- driver-memory: 通常不用设置,一般1G就够了,若出现使用collect算子将RDD数据全部拉取到Driver上处理,就必须确保该值足够大,否则OOM内存溢出。
- spark.default.parallelism:每个stage的默认task数量。 建议:设置500~1000较合适,默认一个HDFS的block对应一个task,Spark默认值偏少,这样导致不能充分利用资源、
- spark.storage.memoryFraction:设置RDD持久化数据在executor内存中能占的 比例, 默认0.6,即默认executor 60%的内存可以保存持久化RDD数据。 建议:若有较多的持久化操作,可以设置高些,超出内存的会频繁gc导致运行缓慢。
- spark.shuffle.memoryFraction:聚合操作占executor内存的比例,默认0.2。建议:若持久化操作较少,但shuffle较多时,可以降低持久化内存占比,提高shuffle操作内存占比。
4.5 Spark 开发调优
- 避免创建重复的RDD。对同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据,极大浪费内存。
- 尽可能复用同一个RDD。比如:一个RDD数据格式是key-value,另一个是单独value类型,这两个RDD的value部分完全一样,这样可以复用达到减少算子执行次数。
- 对多次使用的RDD进行持久化处理。每次对一个RDD执行一个算子操作时,都会重新从源头处理计算一遍,计算出那个RDD出来, 然后进一步操作,这种方式性能很差。对多次使用的RDD进行持久化,将RDD的数据保存在内存或磁盘中,避免重复劳动借助cache()和persist()方法。
- 避免使用shuffle类算子。在spark作业运行过程中,最消耗性能的地方就是shuffle过程,比如 groupByKey、reduceByKey、join等算子,都会触发shuffle。