Spark之Spark Core

x33g5p2x  于2020-10-30 发布在 SparkCore  
字(19.4k)|赞(0)|评价(0)|浏览(1099)

1**、Spark概述**

进入Spark的官方网站,可以看到Spark的定义,如下图所示:

Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,Spark是基于内存计算的大数据并行计算框架,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。Spark得到了众多大数据公司的支持,这些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、腾讯、京东、携程、优酷土豆等等。当前,百度的Spark已应用于凤巢、大搜索、直达号、百度大数据等业务;阿里利用GraphX构建了大规模的图计算和图挖掘系统,实现了很多生产系统的推荐算法;腾讯Spark集群已达到8000台的规模,是当前已知的世界上最大的Spark集群。

为什么要学习Spark呢?对比Apache Hadoop(MapReduce)、Apache Storm以及Spark的一些特性,可见一斑,如下表:
特性

Apache Spark

Apache Hadoop

Apache Storm****数据存储

内存和可配置的分布式文件系统备份,可以是HDFS或本地磁盘上的多个文件夹或Tachyon。

HDFS——Hadoop分布式文件系统。

内存。用例

批处理(微小批处理),实时数据处理,迭代和交互式分析。

批处理。

实时(每个消息到达即被处理)。容错

捕获应用于原始数据的计算以实现当前状态,并且在发生任何故障的情况下,它将同样的计算集合应用于原始数据,也被称为数据沿袭。

在不同节点上维护相同数据集的多个副本。

Storm支持事务拓扑,这允许完全容错的一次性消息传递语义。编程语言

Java、Scala、Python以及R语言。

Java。

定义Thrift接口,能够用任何语言实现以定义和提交拓扑。硬件

通用商业硬件

通用商业硬件

通用商业硬件管理

易于管理,因为所有实时或批处理用例都可以部署在单个框架中。

仅适用于批处理用例。

仅适用于实时处理用例。部署

Spark是一个通用的集群计算框架,可以在独立模式下部署,也可以在各种其他框架(如Yarn或Mesos)上部署。

Apache Hadoop有自己的部署模型,不能部署在其他分布式集群计算框架上。

Storm有自己的部署模型,不能部署在任何其他分布式集群计算框架上。效率

比Hadoop快10倍,因为数据在内存本身进行读/写。

从HDFS读取/写入数据时速度较慢。

能够于数据在内存本身读/写的同时,在几秒和几毫秒内处理消息。分布式

缓存

通过在分布式工作者的内存中缓存部分结果了,确保更低的延迟计算。

完全取决于磁盘。

Storm没有任何分布式缓存功能,但可以使用Memcached或Redis等框架。易用性

支持如Scala和Python这样的功能性语言,生成精简代码。

仅支持Java,因此MapReduce显得冗长和复杂。

支持多种语言。高级操作

提供如map、reduce、flatmap、group、sort、union、intersection、aggregation、Cartesian等常见操作。

不提供除map和reduce之外的任何预定义操作。

Storm核心框架不提供任何操作,但Trident(扩展到Storm)可用于执行廉洁、聚合、分组等操作。API及扩展性

提供对核心API的定义清晰的抽象,可以扩展到在Spark Core上开发扩展/库,例如Streaming、GraphXM等。

提供严格的API,不允许多样性扩展。

Storm是一个支持扩展开发的近实时处理框架,例如Trident就是在Storm上开发的此类框架。安全性

通过共享密钥提供基本安全性身份验证。

Apache Hadoop有自己强大且成熟的安全框架,使用Kerberos和LDAP进行身份验证和授权。

Storm 0.10+提供了可插拔认证和授权框架。

2、Spark的特点

A**、快**

Hadoop的MapReduce相比,Spark基于内存的运算速度要快100倍以上,即使Spark基于硬盘运算也要快10倍。Spark实现了高效的DAG执行引擎,从而可以通过内存来高效处理数据流。官网介绍如下图所示:

B**、易用**

Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。官网介绍如下图所示:

C**、通用**

Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物理成本。官网介绍如下图所示:

此外,Spark还可以很好地融入Hadoop的体系结构中,可以直接操作HDFS,并提供Hive on Spark、Pig on Spark的框架集成Hadoop。

D**、兼容性**

Spark可以非常方便地与其他的开源产品进行融合,比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。此外,Spark还提供了在EC2上部署Standalone的Spark集群的工具。官网介绍如下图所示:

3**、Spark应用场景**

Apache Spark是作为一个通用的集群计算框架开发的,适用于各种用例。在一些使用情况下,它可以工作到最好,对于其他一些情况,它仍可以工作,但不一定是一个理想的选择。下面是一些Apache Spark工作效果最佳的用例,仅供参考:

批处理:Spark是一个非常适合大多数批处理用例的通用集群计算框架,如日志分析、定价分析和理赔处理这些用例都是能够很好地采用Apache Spark实现的范例;
*
流式处理:处理流数据类似于实时或接近实时的数据处理,Spark Streaming是一个通过核心Spark框架开发的扩展,可以用于实现流用例,例如IoT、在线发布等实时或接近实时的用例,是可以采用Spark和Spark Streaming实现的几个范例;
*
数据挖掘:数据挖掘是计算机科学中的一个专门的分支,主要是识别所提供数据中的隐藏模式,涉及聚类、分类、推荐等多种相关迭代和机器学习算法的实现,Spark为此提供了更多的扩展方案;
*
MLlib:MLlib是通过核心Spark框架开发的,其中提供了各种机器学习算法的分布式实现,Spark机器学习库可以用于开发提供预测智能、营销目标的客户细分、推荐引擎和情绪分析这些功能的应用程序;
*
图形计算:这是计算机科学中的另一个专业领域,其重点是不同实体之间的关系。该结构是顶点和边的形式,其中顶点是实体本身,边是这些实体之间的关系。图形计算对于模拟各种现实世界中数据不断发展的用例非常有用,这些用例包含社交网络、网络管理、公共交通链接和路线图等,Spark为此提供了更多的扩展实现方案选择;
*
GraphXM:GraphXM可用来对顶点和边缘形式组成的数据进行建模和结构化,并提供图形并行计算支持了,还提供了多种图形算法的实现;
*
交互式分析:在此过程中,要跨行业收集历史数据,并且工程师对用于交互式分析的数据唾手可得。特定查询变得越来越重要,这不仅是存储更便宜的缘故,还源于敏捷、迅速和定性的决策对更快响应时间的实时需求。大规模交互式数据分析的执行需要高度的并行性,Spark提供了额外的扩展——Spark SQL,其有助于行和列中数据的结构化处理,并且还提供了用于自组织和交互式数据分析的分布式查询引擎。

4**、Spark的体系结构**

Apache Spark由SparkCore、Spark SQL、Spark Streaming、GraphX、MLlib等模块组成,模块间的整体关系如下图所示:

SparkCore

其中,SparkCore是Apache Spark的核心,是其他扩展模块的基础运行时环境。SparkCore的功能主要包括以下几个:

基础设施;
*
SparkConf:用于管理Spark应用程序的各种配置信息;
*
内置的基于Netty的RPC框架,包括同步和异步的多种实现,RPC框架是Spark各组件之间进行通信的基础;
*
事件总线:SparkContext内部各组件之间使用事件——监听器模式异步调用的实现;
*
度量系统:由Spark中的多种度量源(Source)和多种度量输出(Sink)构成,完成对整个Spark集群中各组件运行期状态的监控;
*
SparkContext:通常而言,用户开发的Spark应用程序的提交与执行都离不开SparkContex的支持。在正式提交应用程序之前,首先需要初始化SparkContext。SparkContext隐藏了网络通信、分布式部署、消息通信、存储体系、计算引擎、度量系统、文件服务、Web UI等内容,应用程序开发者只需要使用SparkContext提供的API完成功能开发;
*
SparkEnv:Spark执行环境SparkEnv,是Spark中的Task运行所必需的组件。SparkEnv内部封装了RPC环境(RpcEnv)、序列化管理器、广播管理器(BroadcastManager)、map任务输出跟踪器(MapOutputTracker)、存储体系、度量系统(MetricsSystem)、输出提交协调器(OutputCommitCoordinator)等Task运行所需的各种组件;
*
存储体系:Spark优先考虑使用各节点的内存作为存储,当内存不足时才会考虑使用磁盘,这极大地减少了磁盘I/O,提升了任务执行的效率,使得Spark适用于实时计算、迭代计算、流式计算等场景。在实际场景中,有些Task是存储密集型的,有些则是计算密集型的,所以有时候会造成存储空间很空闲,而计算空间的资源又很紧张。Spark的内存存储空间与执行存储空间之间的边界可以是“软”边界,因此资源紧张的一方可以借用另一方的空间,这既可以有效利用资源,又可以提高Task的执行效率。此外,Spark的内存空间还提供了Tungsten的实现,直接操作操作系统的内存。由于Tungsten省去了在堆内分配Java对象,因此能更加有效地利用系统的内存资源,并且因为直接操作系统内存,空间的分配和释放也更迅速;
*
调度系统:调度系统主要由DAGScheduler和TaskScheduler组成,它们都内置在SparkContext中。DAGScheduler负责创建Job、将DAG中的RDD划分到不同的Stage、给Stage创建对应的Task、批量提交Task等功能。TaskScheduler负责按照FIFO或者FAIR等调度算法对批量Task进行调度;为Task分配资源;将Task发送到集群管理器的当前应用的Executor上,由Executor负责执行等工作。即使现在Spark增加了SparkSession和DataFrame等新的API,但这些新API的底层依然依赖于SparkContext;
*
计算引擎:计算引擎由内存管理器(MemoryManager)、Tungsten、任务内存管理器(TaskMemory-Manager)、Task、外部排序器(ExternalSorter)、Shuffle管理器(ShuffleManager)等组成。

SparkSQL

由于SQL具有普及率高、学习成本低等特点,为了扩大Spark的应用面,还增加了对SQL及Hive的支持。SparkSQL的过程可以总结为:首先使用SQL语句解析器(SqlParser)将SQL语句转换为语法树(Tree),并且使用规则执行器(RuleExecutor)将一系列规则(Rule)应用到语法树,最终生成物理执行计划并执行的过程。其中,规则包括语法分析器(Analyzer)和优化器(Optimizer)。Hive的执行过程与SQL非常类似。

Spark Streaming

Spark Streaming与Apache Storm类似,也用于流式计算。Spark Streaming支持Kafka、Flume、Kinesis和简单的TCP套接字等多种数据输入源。输入流接收器(Receiver)负责接入数据,是接入数据流的接口规范。Dstream是Spark Streaming中所有数据流的抽象,Dstream可以被组织为DStream Graph,Dstream本质上由一系列连续的RDD组成。

Spark GraphX

Spark提供的分布式图形计算框架,GraphX主要遵循整体同步并行计算模式(Bulk SynchronousParallell,BSP)下的Pregel模型。GraphX提供了对图Graph的抽象,Graph由顶点(Vertex)、边(Edge)及继承了Edge的EdgeTriplet(添加了srcAttr和dstAttr,用来保存源顶点和目的顶点的属性)三种结构组成。GraphX目前已经封装了最短路径、网页排名、连接组件、三角关系统计等算法的实现,用户可以选择使用。

Spark MLlib

Spark提供的机器学习框架, MLlib目前已经提供了基础统计、分类、回归、决策树、随机森林、朴素贝叶斯、保序回归、协同过滤、聚类、维数缩减、特征提取与转型、频繁模式挖掘、预言模型标记语言、管道等多种数理统计、概率论、数据挖掘方面的数学算法。

从集群部署的角度看,Spark集群由集群管理器(Cluster Manager)、工作节点(Worker)、执行器(Executor)、驱动器(Driver)等部分组成,其整体关系如下图所示:

Cluster Manager

Spark的集群管理器,主要负责对整个集群资源的分配与管理。Cluster Manager在YARN部署模式下为ResourceManager;在Mesos部署模式下为Mesos Master;在Standalone部署模式下为Master。Cluster Manager分配的资源属于一级分配,它将各个Worker上的内存、CPU等资源分配给Application,但是并不负责对Executor的资源分配。Standalone部署模式下的Master会直接给Application分配内存、CPU及Executor等资源。目前,Standalone、YARN、Mesos、EC2等都可以作为Spark的集群管理器。

Worker

Spark的工作节点。在YARN部署模式下实际由NodeManager替代。Worker节点主要负责以下工作:将自己的内存、CPU等资源通过注册机制告知Cluster Manager;创建Executor;将资源和任务进一步分配给Executor;同步资源信息、Executor状态信息给Cluster Manager等。在Standalone部署模式下,Master将Worker上的内存、CPU及Executor等资源分配给Application后,将命令Worker启动CoarseGrainedExecutorBackend进程(此进程会创建Executor实例)。

Executor

主要负责任务的执行及与Worker、Driver的信息同步。

Driver

Application的驱动程序,Application通过Driver与Cluster Manager、Executor进行通信。Driver可以运行在Application中,也可以由Application提交给Cluster Manager并由Cluster Manager安排Worker运行。

Application

用户使用Spark提供的API编写的应用程序,Application通过Spark API将进行RDD的转换和DAG的构建,并通过Driver将Application注册到Cluster Manager。Cluster Manager将会根据Application的资源需求,通过一级分配将Executor、内存、CPU等资源分配给Application。Driver通过二级分配将Executor等资源分配给每一个任务,Application最后通过Driver通知Executor运行任务。

5**、弹性分布式数据集(RDD)**

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark框架中最基本的数据抽象,它代表一个不可变、可分区、里面元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显示地将工作集缓存在内存中,后续的查询能够重用工作集,从而极大地提升查询速度。

A**、RDD的特性**

A list ofpartitions:一组分片,即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度,用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值,默认值就是程序所分配到的CPU Core的数目;
*
A functionfor computing each split:一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每个计算的结果;
*
A list ofdependencies on other RDDs:RDD相互之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区重新进行计算;
*
Optionally,aPartitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned):一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另一个是基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量;
*
Optionally,alist of preferred locations to compute each split of (e.g. block locations foran HDFS file): 一个列表,存/取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其他要处理数据块的存储位置。

B**、RDD的Transformation算子**

RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark能够更加有效率地运行。RDD的Transformation算子简单总结如下表:
转换

含义****map(func)

返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成filter(func)

返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成flatMap(func)

类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)mapPartitions(func)

类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]mapPartitionsWithIndex(func)

类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]sample(withReplacement, fraction, seed)

根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子union(otherDataset)

对源RDD和参数RDD求并集后返回一个新的RDDintersection(otherDataset)

对源RDD和参数RDD求交集后返回一个新的RDDdistinct([numTasks]))

对源RDD进行去重后返回一个新的RDDgroupByKey([numTasks])****

在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDDreduceByKey(func, [numTasks])

在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置aggregateByKey(zeroValue)(seqOp,combOp,[numTasks])

聚合操作,先对局部聚合,再对全局聚合sortByKey([ascending], [numTasks])

在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDDsortBy(func,[ascending], [numTasks])

与sortByKey类似,但是更灵活join(otherDataset, [numTasks])

在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDDcogroup(otherDataset, [numTasks])

在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDDcartesian(otherDataset)

笛卡尔积pipe(command, [envVars])

通过管道的方式对RDD的每个分区使用shell命令进行操作,返回对应的结果coalesce(numPartitions**)**

对RDD中的分区减少指定的数目,通常在过滤完一个大的数据集之后进行此操作,默认不会进行shuffle(false),即默认不会进行重分区repartition(numPartitions)

将RDD中所有数据平均划分到numparitions个partition中,默认会进行shuffle,即默认会进行重分区

C**、RDD的Action算子**

RDD中还有另一类算子,即Action算子,这类算子在执行时会触发SparkContext提交作业。RDD的Action算子简单总结如下表:
动作

含义****reduce(func)

通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的collect()

在驱动程序中,以数组的形式返回数据集的所有元素count()

返回RDD中的元素个数first()

返回RDD中的第一个元素(类似于take(1))take(n)

返回一个由数据集的前n个元素组成的数组takeSample(withReplacement,num, [seed])

返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子takeOrdered(n, [ordering])

排序后的limit(n)saveAsTextFile(path)

将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本saveAsSequenceFile(path)

将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。saveAsObjectFile(path)

使用java的序列化方法保存到本地文件,可以被sparkContext.objectFile()加载countByKey()

针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。foreach(func)

在数据集的每一个元素上,运行函数func进行更新。

D**、RDD的缓存机制**

RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时就立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

通过查看源码发现cache最终也是调用了persist方法,如上图所示。默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中进行定义,如下图所示:

缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

E**、RDD的CheckPoint(检查点)机制——容错机制**

检查点,其本质是通过将RDD写入Disk做检查点,目的是为了通过lineage(血统)做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就能减少开销。

设置checkpoint的目录,可以是本地的文件夹(这种模式,需要将spark-shell运行在本地模式上)、也可以是HDFS(这种模式,需要将spark-shell运行在集群模式上)。一般是在具有容错能力,高可靠的文件系统上(比如HDFS, S3等)设置一个检查点路径,用于保存检查点数据。源码中关于checkpoint的定义如下图所示,注意用红框标记的描述内容。

F**、RDD的依赖关系及Spark任务的Stage**

RDD****的依赖关系

RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency),如下图所示:

窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用,可以形象地比喻为独生子女;

宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,可以形象地比喻为超生子女。

Spark****任务的Stage

DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。划分Stage的过程如下图所示:

6**、Spark的安装和配置**

A**、Spark伪分布模式**

废话不多说,直接进入Spark伪分布模式安装和部署的主题。首先,将Spark安装包上传到主机hadoop221的/root/tools目录下,进入到该目录下,运行命令tar -zxvf spark-2.1.0-bin-hadoop2.7.tgz -C /root/training/

,将Spark安装包解压到/root/training目录下,这里需要注意的是,由于Spark的命令跟Hadoop的命令有冲突(比如start-all.sh,Hadoop和Spark都有,先前已经给Hadoop设置过环境变量了,这里就不能再给Spark设置),这里就不再设置环境,避免出现问题。

然后,配置Spark的核心配置文件spark-env.sh,进入到/root/training/spark-2.1.0-bin-hadoop2.7/conf目录下,运行命令cp spark-env.sh.template spark-env.sh,根据Spark提供的配置文件模板复制得到其配置文件,运行命令vi spark-env.sh,进行编辑,在文件末尾添加如下内容:

exportJAVA_HOME=/root/training/jdk1.8.0_144

export SPARK_MASTER_HOST=hadoop221

export SPARK_MASTER_PORT=7077

保存退出,接着配置Spark从节点的配置文件,运行命令cp slaves.template slaves,根据Spark提供的从节点的配置文件模板,复制得到其从节点配置文件,运行命令vi slaves,进行编辑,将localhost替换为hadoop221即可。至此,Spark的伪分布模式安装配置完成。

最后,启动Spark。由于这里没有配置Spark的环境变量,因此需要进入到/root/training/spark-2.1.0-bin-hadoop2.7目录下,运行命令sbin/start-all.sh,便在主机hadoop221上启动了一个主节点master和一个从节点worker,输出的log信息如下图所示:

跟Hadoop类似,Spark也提供自己的Web Console访问途径,可以理解为Spark内置了一个Tomcat服务器,端口号为8080,在浏览器的地址栏中输入http://192.168.12.221:8080/打开,看到的内容如下图所示。

B**、Spark全分布模式**

Spark全分布模式的安装需要使用到三台Linux系统,下面在主机hadoop222(主节点)、hadoop223(从节点)以及hadoop224(从节点)上进行安装和配置。按照先前一贯的手法,先在hadoop222上进行安装和配置,然后将配置好的Spark通过网络复制命令拷贝到主机hadoop223和hadoop224上。同样地,先将Spark安装包上传到主机hadoop222上,并解压到相应的目录下,这跟前面完全一致,这里就不再细说了。

然后,配置Spark的核心配置文件spark-env.sh及其从节点配置文件,同样地,需要从Spark提供的模板文件进行拷贝得到,按照先前一样操作即可。编辑spark-env.sh文件,在文件末尾添加如下内容:

exportJAVA_HOME=/root/training/jdk1.8.0_144

export SPARK_MASTER_HOST=hadoop222

export SPARK_MASTER_PORT=7077

保存退出,接着配置Spark从节点的配置文件,编辑slaves文件,将原来的localhost删除,添加如下两行代码即可。

hadoop223

hadoop224

保存退出,接下来将安装好的hadoop222上的Spark安装目录拷贝到主机hadoop223和hadoop224上,分别运行如下两条命令即可:

scp -r/root/training/spark-2.1.0-bin-hadoop2.7/ root@hadoop223:/root/training/

scp -r/root/training/spark-2.1.0-bin-hadoop2.7/ root@hadoop224:/root/training/

至此,Spark全分布模式的安装和配置完成。进入到/root/training/spark-2.1.0-bin-hadoop2.7目录下,运行命令sbin/start-all.sh,便在主机hadoop222上启动了一个主节点,在主机hadoop223和hadoop224上分别启动了一个从节点,输出的log信息如下图所示:

在浏览器的地址栏中输入http://192.168.12.222:8080/打开,看到的内容如下图所示。

C**、Spark HA**

基于文件系统的HA

基于文件系统的HA,主要用于开发或测试环境。Spark提供目录保存Spark Application和worker的注册信息,并将他们的恢复状态写入该目录中,这时,一旦Master发生故障,就可以通过重新启动Master进程(sbin/start-master.sh),恢复已运行的SparkApplication和worker的注册信息。下面在主机hadoop221上,基于前面安装的Spark伪分布模式,配置基于文件系统的HA,在正是开始之前,需要停止Spark的运行。

进入到/root/training/spark-2.1.0-bin-hadoop2.7目录下,运行命令mkdir recovery,创建恢复目录,编辑Spark核心配置文件spark-env.sh,在文件末尾添加如下内容(spark.deploy.recoveryMode表示:设置为FILESYSTEM开启单点恢复功能,默认值为NONE;spark.deploy.recoveryDirectory表示Spark保存恢复状态的目录)。

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM-Dspark.deploy.recoveryDirectory=/root/training/spark-2.1.0-bin-hadoop2.7/recovery"

保存退出即可。运行命令sbin/start-all.sh,启动Spark,可以看到在recovery目录下生成了一个名称为worker_worker-20180817213453-192.168.12.221-40130的文件,该文件中就保存了Spark当前运行的状态信息。这里可以来模拟下Spark从异常退出到恢复运行的过程,运行命令bin/spark-shell --master spark://hadoop221:7077,启动Spark客户端并连接到Spark集群上,看到的内容如下图所示。

新开一个窗口连接到主机hadoop221上,杀掉主节点master进程,可看到如下图所示的log信息,提示连接主节点master失败。

最后,在新开的窗口中运行命令sbin/start-master.sh,重新启动master节点,这时会发现 Spark客户端又重新连接上了Spark集群。

基于Zookeeper的HA

Zookeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master,但是只有一个是Active的,其他的都是Standby。当Active的Master出现故障时,另外的一个Standby Master会被选举出来。由于集群的信息,包括Worker, Driver和Application的信息都已经持久化到Zookeeper中,因此在切换的过程中只会影响新Job的提交,对于正在进行的Job没有任何的影响。加入Zookeeper的Spark集群,整体架构如下图所示:

接下来基于前面安装的Spark全分布模式,配置基于Zookeeper的Spark HA集群。在正式配置之前,先停掉运行中的Spark,并在三台主机上启动Zookeeper。编辑Spark的核心配置文件spark-env.sh,注释掉exportSPARK_MASTER_HOST=hadoop222和exportSPARK_MASTER_PORT=7077(这里需要注意的是:由于主节点的信息交由Zookeeper来维护了,因此该配置不再需要;三个主机上进行同样的配置),并在文件末尾添加如下内容:

exportSPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER-Dspark.deploy.zookeeper.url=hadoop222:2181,hadoop223:2181,hadoop224:2181-Dspark.deploy.zookeeper.dir=/spark"

保存退出,至此基于Zookeeper的Spark HA配置完成。进入到主机hadoop222的/root/training/spark-2.1.0-bin-hadoop2.7目录下,运行命令sbin/start-all.sh,启动Spark集群。这里需要注意的是,由于这里运行的是Spark HA集群,因此需要额外再启动一个master节点,进入到主机hadoop223的/root/training/spark-2.1.0-bin-hadoop2.7目录下,运行命令sbin/start-master.sh即可。

在浏览器的地址中输入http://192.168.12.222:8080打开,看到的内容如下图所示:

在浏览器的地址中输入http://192.168.12.223:8080打开,看到的内容如下图所示:

7**、RDD编程实例**

A**、parallelize、map、sortby、filter算子的综合使用**

//通过并行化算子parallelize生成rdd

val rdd1=sc.parallelize(List(50, 6, 40,7, 32, 80, 22, 69, 12, 10))

//对rdd1里的每一个元素乘2,然后进行排序

val rdd2=rdd1.map(_/*2).sortBy(x=>x,true)

//过滤出大于等于八十的元素

val rdd3=rdd2.filter(_>=80)

//将元素以数组的方式在客户端显示

rdd3.collect

运行结果如下图所示:

B**、flatMap算子的使用**

val rdd1 = sc.parallelize(Array("helloworld", "hello China", "hello Beijing"))

//将rdd1里面的每一个元素先切分在压平

var rdd2 = rdd1.flatMap(_.split(' '))

rdd2.collect

运行结果如下图所示:

C**、union、intersection、distinct算子的使用**

val rdd1=sc.parallelize(List(2, 1, 5,6, 4, 3))

val rdd2=sc.parallelize(List(1, 2, 3, 4,5, 6))

//求并集

val rdd3=rdd1.union(rdd2)

//求交集

val rdd4=rdd1.intersection(rdd2)

//去重

rdd3.distinct.collect

rdd4.collect

运行结果如下图所示:

D**、join、groupByKey算子的使用**

val rdd1=sc.parallelize(List(("Iris",1), ("Mary", 3), ("Mike", 2)))

val rdd2=sc.parallelize(List(("Mary",2), ("Iris", 1), ("Kate", 2)))

//求jion

val rdd3=rdd1.join(rdd2)

//求并集

val rdd4=rdd1 union rdd2

//按key进行分组

val rdd5=rdd4.groupByKey

rdd3.collect

rdd5.collect

运行结果如下图所示:

E**、cogroup、reduce算子的使用**

val rdd1=sc.parallelize(List(("Iris",1),("Iris",2), ("Mary",3), ("Kate",2)))

val rdd2=sc.parallelize(List(("Mary",2),("Kite",1), ("Iris",2)))

val rdd3=sc.parallelize(List(1,2,3,4,5))

//注意cogroup与groupByKey的区别

rdd1.cogroup(rdd2).collect

//reduce聚合

rdd3.reduce(+).collect

运行结果如下图所示:

F**、union、reduceByKey、map、sortByKey算子的综合使用**

val rdd1=sc.parallelize(List(("Iris",1),("Cherry",3),("Kate",2),("Mike",1)))

val rdd2=sc.parallelize(List(("Cherry",2),("Iris",3),("Mike",2),("Kate",5)))

val rdd3=rdd1.union(rdd2)

//按key进行聚合

val rdd4=rdd3.reduceByKey(+)

rdd4.collect

//按value的降序排序

val rdd5=rdd4.map(t=>(t._2,t._1)).sortByKey(false).map(t=>(t._2,t._1))

rdd5.collect

运行结果如下图所示:

8**、Spark RDD高级算子**

A**、mapPartitionsWithIndex算子**

该算子的功能是:把每个partition中的分区号和对应的值拿出来,源码定义如下图:

接收一个函数参数,该函数有两个参数,第一个参数代表分区号,第二个参数代表分区中的元素。举例如下(该例子的功能是将每个分区中的元素和分区号打印出来):

//创建一个RDD,其具有两个partition

valrdd1=sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)

//创建一个函数,返回RDD中的每个分区号和元素

def myFunc(partitionIndex:Int,iter:Iterator[Int]):Iterator[String]={

iter.toList.map(x=>"[PartitionId:"+partitionIndex+",value="+x+"]").iterator}

//调用mapPartitionsWithIndex

rdd1.mapPartitionsWithIndex(myFunc).collect

运行结果如下图所示:

B**、aggregate算子**

该算子的功能是:先对局部进行聚合操作,然后再对全局进行聚合操作,源码定义如下图所示:

zeroValue: U:初始值,它同时作用于局部操作和全局操作;

seqOp: (U,T) => U:局部操作;

combOp: (U,U) => U:全局操作

举例如下:

//创建一个RDD,其包含两个分区

val rdd1 =sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)

//将每个分区中的最大值进行求和,初始值为0

rdd1.aggregate(0)(math.max(,), +)

//将每个分区中的最大值进行求和,初始值为0

rdd1.aggregate(10)(math.max(,), +)

//直接进行求和,初始值为0

rdd1.aggregate(0)(+, +)

//直接进行求和,初始值为10

rdd1.aggregate(10)(+, +)

运行结果如下图所示:

C**、aggregateByKey算子**

举例如下:

//准备数据,创建pairRDD,其具有两个分区

val pairRDD = sc.parallelize(List(("cat",2), ("cat", 5), ("mouse",4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)

//定义函数

def func3(index: Int, iter:Iterator[(String, Int)]) : Iterator[String] = {

iter.toList.map(x => "[partID:" + index + ", val: " + x +"]").iterator

}

//将每个分区中动物最多的数目进行求和

pairRDD.aggregateByKey(0)(math.max(,), _ + _).collect

//将每种动物的数目进行求和

pairRDD.aggregateByKey(0)(+, _ +_).collect

//该操作同reduceByKey,使用reduceByKey将每种动物的数目进行求和

pairRDD.reduceByKey(+).collect

运行结果如下图所示:

限于篇幅,这篇文章暂且就介绍到这里,部分内容放到下一章节里进行讲述,欢迎关注。

参考文献:

——《实时大数据分析 基于Storm、Spark技术的实时应用》

——《百度百科》

——《CSDN博客》

——《潭州大数据课程课件》

转自https://mp.weixin.qq.com/s/mbD9NTVCVPqVNeiKff4voQ

相关文章