1**、Spark概述**
进入Spark的官方网站,可以看到Spark的定义,如下图所示:
Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,Spark是基于内存计算的大数据并行计算框架,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。Spark得到了众多大数据公司的支持,这些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、腾讯、京东、携程、优酷土豆等等。当前,百度的Spark已应用于凤巢、大搜索、直达号、百度大数据等业务;阿里利用GraphX构建了大规模的图计算和图挖掘系统,实现了很多生产系统的推荐算法;腾讯Spark集群已达到8000台的规模,是当前已知的世界上最大的Spark集群。
为什么要学习Spark呢?对比Apache Hadoop(MapReduce)、Apache Storm以及Spark的一些特性,可见一斑,如下表:
特性
Apache Spark
Apache Hadoop
Apache Storm****数据存储
内存和可配置的分布式文件系统备份,可以是HDFS或本地磁盘上的多个文件夹或Tachyon。
HDFS——Hadoop分布式文件系统。
内存。用例
批处理(微小批处理),实时数据处理,迭代和交互式分析。
批处理。
实时(每个消息到达即被处理)。容错
捕获应用于原始数据的计算以实现当前状态,并且在发生任何故障的情况下,它将同样的计算集合应用于原始数据,也被称为数据沿袭。
在不同节点上维护相同数据集的多个副本。
Storm支持事务拓扑,这允许完全容错的一次性消息传递语义。编程语言
Java、Scala、Python以及R语言。
Java。
定义Thrift接口,能够用任何语言实现以定义和提交拓扑。硬件
通用商业硬件
通用商业硬件
通用商业硬件管理
易于管理,因为所有实时或批处理用例都可以部署在单个框架中。
仅适用于批处理用例。
仅适用于实时处理用例。部署
Spark是一个通用的集群计算框架,可以在独立模式下部署,也可以在各种其他框架(如Yarn或Mesos)上部署。
Apache Hadoop有自己的部署模型,不能部署在其他分布式集群计算框架上。
Storm有自己的部署模型,不能部署在任何其他分布式集群计算框架上。效率
比Hadoop快10倍,因为数据在内存本身进行读/写。
从HDFS读取/写入数据时速度较慢。
能够于数据在内存本身读/写的同时,在几秒和几毫秒内处理消息。分布式
缓存
通过在分布式工作者的内存中缓存部分结果了,确保更低的延迟计算。
完全取决于磁盘。
Storm没有任何分布式缓存功能,但可以使用Memcached或Redis等框架。易用性
支持如Scala和Python这样的功能性语言,生成精简代码。
仅支持Java,因此MapReduce显得冗长和复杂。
支持多种语言。高级操作
提供如map、reduce、flatmap、group、sort、union、intersection、aggregation、Cartesian等常见操作。
不提供除map和reduce之外的任何预定义操作。
Storm核心框架不提供任何操作,但Trident(扩展到Storm)可用于执行廉洁、聚合、分组等操作。API及扩展性
提供对核心API的定义清晰的抽象,可以扩展到在Spark Core上开发扩展/库,例如Streaming、GraphXM等。
提供严格的API,不允许多样性扩展。
Storm是一个支持扩展开发的近实时处理框架,例如Trident就是在Storm上开发的此类框架。安全性
通过共享密钥提供基本安全性身份验证。
Apache Hadoop有自己强大且成熟的安全框架,使用Kerberos和LDAP进行身份验证和授权。
Storm 0.10+提供了可插拔认证和授权框架。
2、Spark的特点
A**、快**
Hadoop的MapReduce相比,Spark基于内存的运算速度要快100倍以上,即使Spark基于硬盘运算也要快10倍。Spark实现了高效的DAG执行引擎,从而可以通过内存来高效处理数据流。官网介绍如下图所示:
B**、易用**
Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。官网介绍如下图所示:
C**、通用**
Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物理成本。官网介绍如下图所示:
此外,Spark还可以很好地融入Hadoop的体系结构中,可以直接操作HDFS,并提供Hive on Spark、Pig on Spark的框架集成Hadoop。
D**、兼容性**
Spark可以非常方便地与其他的开源产品进行融合,比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。此外,Spark还提供了在EC2上部署Standalone的Spark集群的工具。官网介绍如下图所示:
3**、Spark应用场景**
Apache Spark是作为一个通用的集群计算框架开发的,适用于各种用例。在一些使用情况下,它可以工作到最好,对于其他一些情况,它仍可以工作,但不一定是一个理想的选择。下面是一些Apache Spark工作效果最佳的用例,仅供参考:
批处理:Spark是一个非常适合大多数批处理用例的通用集群计算框架,如日志分析、定价分析和理赔处理这些用例都是能够很好地采用Apache Spark实现的范例;
*
流式处理:处理流数据类似于实时或接近实时的数据处理,Spark Streaming是一个通过核心Spark框架开发的扩展,可以用于实现流用例,例如IoT、在线发布等实时或接近实时的用例,是可以采用Spark和Spark Streaming实现的几个范例;
*
数据挖掘:数据挖掘是计算机科学中的一个专门的分支,主要是识别所提供数据中的隐藏模式,涉及聚类、分类、推荐等多种相关迭代和机器学习算法的实现,Spark为此提供了更多的扩展方案;
*
MLlib:MLlib是通过核心Spark框架开发的,其中提供了各种机器学习算法的分布式实现,Spark机器学习库可以用于开发提供预测智能、营销目标的客户细分、推荐引擎和情绪分析这些功能的应用程序;
*
图形计算:这是计算机科学中的另一个专业领域,其重点是不同实体之间的关系。该结构是顶点和边的形式,其中顶点是实体本身,边是这些实体之间的关系。图形计算对于模拟各种现实世界中数据不断发展的用例非常有用,这些用例包含社交网络、网络管理、公共交通链接和路线图等,Spark为此提供了更多的扩展实现方案选择;
*
GraphXM:GraphXM可用来对顶点和边缘形式组成的数据进行建模和结构化,并提供图形并行计算支持了,还提供了多种图形算法的实现;
*
交互式分析:在此过程中,要跨行业收集历史数据,并且工程师对用于交互式分析的数据唾手可得。特定查询变得越来越重要,这不仅是存储更便宜的缘故,还源于敏捷、迅速和定性的决策对更快响应时间的实时需求。大规模交互式数据分析的执行需要高度的并行性,Spark提供了额外的扩展——Spark SQL,其有助于行和列中数据的结构化处理,并且还提供了用于自组织和交互式数据分析的分布式查询引擎。
4**、Spark的体系结构**
Apache Spark由SparkCore、Spark SQL、Spark Streaming、GraphX、MLlib等模块组成,模块间的整体关系如下图所示:
SparkCore
其中,SparkCore是Apache Spark的核心,是其他扩展模块的基础运行时环境。SparkCore的功能主要包括以下几个:
基础设施;
*
SparkConf:用于管理Spark应用程序的各种配置信息;
*
内置的基于Netty的RPC框架,包括同步和异步的多种实现,RPC框架是Spark各组件之间进行通信的基础;
*
事件总线:SparkContext内部各组件之间使用事件——监听器模式异步调用的实现;
*
度量系统:由Spark中的多种度量源(Source)和多种度量输出(Sink)构成,完成对整个Spark集群中各组件运行期状态的监控;
*
SparkContext:通常而言,用户开发的Spark应用程序的提交与执行都离不开SparkContex的支持。在正式提交应用程序之前,首先需要初始化SparkContext。SparkContext隐藏了网络通信、分布式部署、消息通信、存储体系、计算引擎、度量系统、文件服务、Web UI等内容,应用程序开发者只需要使用SparkContext提供的API完成功能开发;
*
SparkEnv:Spark执行环境SparkEnv,是Spark中的Task运行所必需的组件。SparkEnv内部封装了RPC环境(RpcEnv)、序列化管理器、广播管理器(BroadcastManager)、map任务输出跟踪器(MapOutputTracker)、存储体系、度量系统(MetricsSystem)、输出提交协调器(OutputCommitCoordinator)等Task运行所需的各种组件;
*
存储体系:Spark优先考虑使用各节点的内存作为存储,当内存不足时才会考虑使用磁盘,这极大地减少了磁盘I/O,提升了任务执行的效率,使得Spark适用于实时计算、迭代计算、流式计算等场景。在实际场景中,有些Task是存储密集型的,有些则是计算密集型的,所以有时候会造成存储空间很空闲,而计算空间的资源又很紧张。Spark的内存存储空间与执行存储空间之间的边界可以是“软”边界,因此资源紧张的一方可以借用另一方的空间,这既可以有效利用资源,又可以提高Task的执行效率。此外,Spark的内存空间还提供了Tungsten的实现,直接操作操作系统的内存。由于Tungsten省去了在堆内分配Java对象,因此能更加有效地利用系统的内存资源,并且因为直接操作系统内存,空间的分配和释放也更迅速;
*
调度系统:调度系统主要由DAGScheduler和TaskScheduler组成,它们都内置在SparkContext中。DAGScheduler负责创建Job、将DAG中的RDD划分到不同的Stage、给Stage创建对应的Task、批量提交Task等功能。TaskScheduler负责按照FIFO或者FAIR等调度算法对批量Task进行调度;为Task分配资源;将Task发送到集群管理器的当前应用的Executor上,由Executor负责执行等工作。即使现在Spark增加了SparkSession和DataFrame等新的API,但这些新API的底层依然依赖于SparkContext;
*
计算引擎:计算引擎由内存管理器(MemoryManager)、Tungsten、任务内存管理器(TaskMemory-Manager)、Task、外部排序器(ExternalSorter)、Shuffle管理器(ShuffleManager)等组成。
SparkSQL
由于SQL具有普及率高、学习成本低等特点,为了扩大Spark的应用面,还增加了对SQL及Hive的支持。SparkSQL的过程可以总结为:首先使用SQL语句解析器(SqlParser)将SQL语句转换为语法树(Tree),并且使用规则执行器(RuleExecutor)将一系列规则(Rule)应用到语法树,最终生成物理执行计划并执行的过程。其中,规则包括语法分析器(Analyzer)和优化器(Optimizer)。Hive的执行过程与SQL非常类似。
Spark Streaming
Spark Streaming与Apache Storm类似,也用于流式计算。Spark Streaming支持Kafka、Flume、Kinesis和简单的TCP套接字等多种数据输入源。输入流接收器(Receiver)负责接入数据,是接入数据流的接口规范。Dstream是Spark Streaming中所有数据流的抽象,Dstream可以被组织为DStream Graph,Dstream本质上由一系列连续的RDD组成。
Spark GraphX
Spark提供的分布式图形计算框架,GraphX主要遵循整体同步并行计算模式(Bulk SynchronousParallell,BSP)下的Pregel模型。GraphX提供了对图Graph的抽象,Graph由顶点(Vertex)、边(Edge)及继承了Edge的EdgeTriplet(添加了srcAttr和dstAttr,用来保存源顶点和目的顶点的属性)三种结构组成。GraphX目前已经封装了最短路径、网页排名、连接组件、三角关系统计等算法的实现,用户可以选择使用。
Spark MLlib
Spark提供的机器学习框架, MLlib目前已经提供了基础统计、分类、回归、决策树、随机森林、朴素贝叶斯、保序回归、协同过滤、聚类、维数缩减、特征提取与转型、频繁模式挖掘、预言模型标记语言、管道等多种数理统计、概率论、数据挖掘方面的数学算法。
从集群部署的角度看,Spark集群由集群管理器(Cluster Manager)、工作节点(Worker)、执行器(Executor)、驱动器(Driver)等部分组成,其整体关系如下图所示:
Cluster Manager
Spark的集群管理器,主要负责对整个集群资源的分配与管理。Cluster Manager在YARN部署模式下为ResourceManager;在Mesos部署模式下为Mesos Master;在Standalone部署模式下为Master。Cluster Manager分配的资源属于一级分配,它将各个Worker上的内存、CPU等资源分配给Application,但是并不负责对Executor的资源分配。Standalone部署模式下的Master会直接给Application分配内存、CPU及Executor等资源。目前,Standalone、YARN、Mesos、EC2等都可以作为Spark的集群管理器。
Worker
Spark的工作节点。在YARN部署模式下实际由NodeManager替代。Worker节点主要负责以下工作:将自己的内存、CPU等资源通过注册机制告知Cluster Manager;创建Executor;将资源和任务进一步分配给Executor;同步资源信息、Executor状态信息给Cluster Manager等。在Standalone部署模式下,Master将Worker上的内存、CPU及Executor等资源分配给Application后,将命令Worker启动CoarseGrainedExecutorBackend进程(此进程会创建Executor实例)。
Executor
主要负责任务的执行及与Worker、Driver的信息同步。
Driver
Application的驱动程序,Application通过Driver与Cluster Manager、Executor进行通信。Driver可以运行在Application中,也可以由Application提交给Cluster Manager并由Cluster Manager安排Worker运行。
Application
用户使用Spark提供的API编写的应用程序,Application通过Spark API将进行RDD的转换和DAG的构建,并通过Driver将Application注册到Cluster Manager。Cluster Manager将会根据Application的资源需求,通过一级分配将Executor、内存、CPU等资源分配给Application。Driver通过二级分配将Executor等资源分配给每一个任务,Application最后通过Driver通知Executor运行任务。
5**、弹性分布式数据集(RDD)**
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark框架中最基本的数据抽象,它代表一个不可变、可分区、里面元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显示地将工作集缓存在内存中,后续的查询能够重用工作集,从而极大地提升查询速度。
A**、RDD的特性**
A list ofpartitions:一组分片,即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度,用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值,默认值就是程序所分配到的CPU Core的数目;
*
A functionfor computing each split:一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每个计算的结果;
*
A list ofdependencies on other RDDs:RDD相互之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区重新进行计算;
*
Optionally,aPartitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned):一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另一个是基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量;
*
Optionally,alist of preferred locations to compute each split of (e.g. block locations foran HDFS file): 一个列表,存/取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其他要处理数据块的存储位置。
B**、RDD的Transformation算子**
RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark能够更加有效率地运行。RDD的Transformation算子简单总结如下表:
转换
含义****map(func)
返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成filter(func)
返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成flatMap(func)
类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)mapPartitions(func)
类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]mapPartitionsWithIndex(func)
类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]sample(withReplacement, fraction, seed)
根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子union(otherDataset)
对源RDD和参数RDD求并集后返回一个新的RDDintersection(otherDataset)
对源RDD和参数RDD求交集后返回一个新的RDDdistinct([numTasks]))
对源RDD进行去重后返回一个新的RDDgroupByKey([numTasks])****
在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDDreduceByKey(func, [numTasks])
在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置aggregateByKey(zeroValue)(seqOp,combOp,[numTasks])
聚合操作,先对局部聚合,再对全局聚合sortByKey([ascending], [numTasks])
在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDDsortBy(func,[ascending], [numTasks])
与sortByKey类似,但是更灵活join(otherDataset, [numTasks])
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDDcogroup(otherDataset, [numTasks])
在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDDcartesian(otherDataset)
笛卡尔积pipe(command, [envVars])
通过管道的方式对RDD的每个分区使用shell命令进行操作,返回对应的结果coalesce(numPartitions**)**
对RDD中的分区减少指定的数目,通常在过滤完一个大的数据集之后进行此操作,默认不会进行shuffle(false),即默认不会进行重分区repartition(numPartitions)
将RDD中所有数据平均划分到numparitions个partition中,默认会进行shuffle,即默认会进行重分区
C**、RDD的Action算子**
RDD中还有另一类算子,即Action算子,这类算子在执行时会触发SparkContext提交作业。RDD的Action算子简单总结如下表:
动作
含义****reduce(func)
通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的collect()
在驱动程序中,以数组的形式返回数据集的所有元素count()
返回RDD中的元素个数first()
返回RDD中的第一个元素(类似于take(1))take(n)
返回一个由数据集的前n个元素组成的数组takeSample(withReplacement,num, [seed])
返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子takeOrdered(n, [ordering])
排序后的limit(n)saveAsTextFile(path)
将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本saveAsSequenceFile(path)
将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。saveAsObjectFile(path)
使用java的序列化方法保存到本地文件,可以被sparkContext.objectFile()加载countByKey()
针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。foreach(func)
在数据集的每一个元素上,运行函数func进行更新。
D**、RDD的缓存机制**
RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时就立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
通过查看源码发现cache最终也是调用了persist方法,如上图所示。默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中进行定义,如下图所示:
缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
E**、RDD的CheckPoint(检查点)机制——容错机制**
检查点,其本质是通过将RDD写入Disk做检查点,目的是为了通过lineage(血统)做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就能减少开销。
设置checkpoint的目录,可以是本地的文件夹(这种模式,需要将spark-shell运行在本地模式上)、也可以是HDFS(这种模式,需要将spark-shell运行在集群模式上)。一般是在具有容错能力,高可靠的文件系统上(比如HDFS, S3等)设置一个检查点路径,用于保存检查点数据。源码中关于checkpoint的定义如下图所示,注意用红框标记的描述内容。
F**、RDD的依赖关系及Spark任务的Stage**
RDD****的依赖关系
RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency),如下图所示:
窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用,可以形象地比喻为独生子女;
宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,可以形象地比喻为超生子女。
Spark****任务的Stage
DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。划分Stage的过程如下图所示:
6**、Spark的安装和配置**
A**、Spark伪分布模式**
废话不多说,直接进入Spark伪分布模式安装和部署的主题。首先,将Spark安装包上传到主机hadoop221的/root/tools目录下,进入到该目录下,运行命令tar -zxvf spark-2.1.0-bin-hadoop2.7.tgz -C /root/training/
,将Spark安装包解压到/root/training目录下,这里需要注意的是,由于Spark的命令跟Hadoop的命令有冲突(比如start-all.sh,Hadoop和Spark都有,先前已经给Hadoop设置过环境变量了,这里就不能再给Spark设置),这里就不再设置环境,避免出现问题。
然后,配置Spark的核心配置文件spark-env.sh,进入到/root/training/spark-2.1.0-bin-hadoop2.7/conf目录下,运行命令cp spark-env.sh.template spark-env.sh,根据Spark提供的配置文件模板复制得到其配置文件,运行命令vi spark-env.sh,进行编辑,在文件末尾添加如下内容:
exportJAVA_HOME=/root/training/jdk1.8.0_144
export SPARK_MASTER_HOST=hadoop221
export SPARK_MASTER_PORT=7077
保存退出,接着配置Spark从节点的配置文件,运行命令cp slaves.template slaves,根据Spark提供的从节点的配置文件模板,复制得到其从节点配置文件,运行命令vi slaves,进行编辑,将localhost替换为hadoop221即可。至此,Spark的伪分布模式安装配置完成。
最后,启动Spark。由于这里没有配置Spark的环境变量,因此需要进入到/root/training/spark-2.1.0-bin-hadoop2.7目录下,运行命令sbin/start-all.sh,便在主机hadoop221上启动了一个主节点master和一个从节点worker,输出的log信息如下图所示:
跟Hadoop类似,Spark也提供自己的Web Console访问途径,可以理解为Spark内置了一个Tomcat服务器,端口号为8080,在浏览器的地址栏中输入http://192.168.12.221:8080/打开,看到的内容如下图所示。
B**、Spark全分布模式**
Spark全分布模式的安装需要使用到三台Linux系统,下面在主机hadoop222(主节点)、hadoop223(从节点)以及hadoop224(从节点)上进行安装和配置。按照先前一贯的手法,先在hadoop222上进行安装和配置,然后将配置好的Spark通过网络复制命令拷贝到主机hadoop223和hadoop224上。同样地,先将Spark安装包上传到主机hadoop222上,并解压到相应的目录下,这跟前面完全一致,这里就不再细说了。
然后,配置Spark的核心配置文件spark-env.sh及其从节点配置文件,同样地,需要从Spark提供的模板文件进行拷贝得到,按照先前一样操作即可。编辑spark-env.sh文件,在文件末尾添加如下内容:
exportJAVA_HOME=/root/training/jdk1.8.0_144
export SPARK_MASTER_HOST=hadoop222
export SPARK_MASTER_PORT=7077
保存退出,接着配置Spark从节点的配置文件,编辑slaves文件,将原来的localhost删除,添加如下两行代码即可。
hadoop223
hadoop224
保存退出,接下来将安装好的hadoop222上的Spark安装目录拷贝到主机hadoop223和hadoop224上,分别运行如下两条命令即可:
scp -r/root/training/spark-2.1.0-bin-hadoop2.7/ root@hadoop223:/root/training/
scp -r/root/training/spark-2.1.0-bin-hadoop2.7/ root@hadoop224:/root/training/
至此,Spark全分布模式的安装和配置完成。进入到/root/training/spark-2.1.0-bin-hadoop2.7目录下,运行命令sbin/start-all.sh,便在主机hadoop222上启动了一个主节点,在主机hadoop223和hadoop224上分别启动了一个从节点,输出的log信息如下图所示:
在浏览器的地址栏中输入http://192.168.12.222:8080/打开,看到的内容如下图所示。
C**、Spark HA**
基于文件系统的HA
基于文件系统的HA,主要用于开发或测试环境。Spark提供目录保存Spark Application和worker的注册信息,并将他们的恢复状态写入该目录中,这时,一旦Master发生故障,就可以通过重新启动Master进程(sbin/start-master.sh),恢复已运行的SparkApplication和worker的注册信息。下面在主机hadoop221上,基于前面安装的Spark伪分布模式,配置基于文件系统的HA,在正是开始之前,需要停止Spark的运行。
进入到/root/training/spark-2.1.0-bin-hadoop2.7目录下,运行命令mkdir recovery,创建恢复目录,编辑Spark核心配置文件spark-env.sh,在文件末尾添加如下内容(spark.deploy.recoveryMode表示:设置为FILESYSTEM开启单点恢复功能,默认值为NONE;spark.deploy.recoveryDirectory表示Spark保存恢复状态的目录)。
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM-Dspark.deploy.recoveryDirectory=/root/training/spark-2.1.0-bin-hadoop2.7/recovery"
保存退出即可。运行命令sbin/start-all.sh,启动Spark,可以看到在recovery目录下生成了一个名称为worker_worker-20180817213453-192.168.12.221-40130的文件,该文件中就保存了Spark当前运行的状态信息。这里可以来模拟下Spark从异常退出到恢复运行的过程,运行命令bin/spark-shell --master spark://hadoop221:7077,启动Spark客户端并连接到Spark集群上,看到的内容如下图所示。
新开一个窗口连接到主机hadoop221上,杀掉主节点master进程,可看到如下图所示的log信息,提示连接主节点master失败。
最后,在新开的窗口中运行命令sbin/start-master.sh,重新启动master节点,这时会发现 Spark客户端又重新连接上了Spark集群。
基于Zookeeper的HA
Zookeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master,但是只有一个是Active的,其他的都是Standby。当Active的Master出现故障时,另外的一个Standby Master会被选举出来。由于集群的信息,包括Worker, Driver和Application的信息都已经持久化到Zookeeper中,因此在切换的过程中只会影响新Job的提交,对于正在进行的Job没有任何的影响。加入Zookeeper的Spark集群,整体架构如下图所示:
接下来基于前面安装的Spark全分布模式,配置基于Zookeeper的Spark HA集群。在正式配置之前,先停掉运行中的Spark,并在三台主机上启动Zookeeper。编辑Spark的核心配置文件spark-env.sh,注释掉exportSPARK_MASTER_HOST=hadoop222和exportSPARK_MASTER_PORT=7077(这里需要注意的是:由于主节点的信息交由Zookeeper来维护了,因此该配置不再需要;三个主机上进行同样的配置),并在文件末尾添加如下内容:
exportSPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER-Dspark.deploy.zookeeper.url=hadoop222:2181,hadoop223:2181,hadoop224:2181-Dspark.deploy.zookeeper.dir=/spark"
保存退出,至此基于Zookeeper的Spark HA配置完成。进入到主机hadoop222的/root/training/spark-2.1.0-bin-hadoop2.7目录下,运行命令sbin/start-all.sh,启动Spark集群。这里需要注意的是,由于这里运行的是Spark HA集群,因此需要额外再启动一个master节点,进入到主机hadoop223的/root/training/spark-2.1.0-bin-hadoop2.7目录下,运行命令sbin/start-master.sh即可。
在浏览器的地址中输入http://192.168.12.222:8080打开,看到的内容如下图所示:
在浏览器的地址中输入http://192.168.12.223:8080打开,看到的内容如下图所示:
7**、RDD编程实例**
A**、parallelize、map、sortby、filter算子的综合使用**
//通过并行化算子parallelize生成rdd
val rdd1=sc.parallelize(List(50, 6, 40,7, 32, 80, 22, 69, 12, 10))
//对rdd1里的每一个元素乘2,然后进行排序
val rdd2=rdd1.map(_/*2).sortBy(x=>x,true)
//过滤出大于等于八十的元素
val rdd3=rdd2.filter(_>=80)
//将元素以数组的方式在客户端显示
rdd3.collect
运行结果如下图所示:
B**、flatMap算子的使用**
val rdd1 = sc.parallelize(Array("helloworld", "hello China", "hello Beijing"))
//将rdd1里面的每一个元素先切分在压平
var rdd2 = rdd1.flatMap(_.split(' '))
rdd2.collect
运行结果如下图所示:
C**、union、intersection、distinct算子的使用**
val rdd1=sc.parallelize(List(2, 1, 5,6, 4, 3))
val rdd2=sc.parallelize(List(1, 2, 3, 4,5, 6))
//求并集
val rdd3=rdd1.union(rdd2)
//求交集
val rdd4=rdd1.intersection(rdd2)
//去重
rdd3.distinct.collect
rdd4.collect
运行结果如下图所示:
D**、join、groupByKey算子的使用**
val rdd1=sc.parallelize(List(("Iris",1), ("Mary", 3), ("Mike", 2)))
val rdd2=sc.parallelize(List(("Mary",2), ("Iris", 1), ("Kate", 2)))
//求jion
val rdd3=rdd1.join(rdd2)
//求并集
val rdd4=rdd1 union rdd2
//按key进行分组
val rdd5=rdd4.groupByKey
rdd3.collect
rdd5.collect
运行结果如下图所示:
E**、cogroup、reduce算子的使用**
val rdd1=sc.parallelize(List(("Iris",1),("Iris",2), ("Mary",3), ("Kate",2)))
val rdd2=sc.parallelize(List(("Mary",2),("Kite",1), ("Iris",2)))
val rdd3=sc.parallelize(List(1,2,3,4,5))
//注意cogroup与groupByKey的区别
rdd1.cogroup(rdd2).collect
//reduce聚合
rdd3.reduce(+).collect
运行结果如下图所示:
F**、union、reduceByKey、map、sortByKey算子的综合使用**
val rdd1=sc.parallelize(List(("Iris",1),("Cherry",3),("Kate",2),("Mike",1)))
val rdd2=sc.parallelize(List(("Cherry",2),("Iris",3),("Mike",2),("Kate",5)))
val rdd3=rdd1.union(rdd2)
//按key进行聚合
val rdd4=rdd3.reduceByKey(+)
rdd4.collect
//按value的降序排序
val rdd5=rdd4.map(t=>(t._2,t._1)).sortByKey(false).map(t=>(t._2,t._1))
rdd5.collect
运行结果如下图所示:
8**、Spark RDD高级算子**
A**、mapPartitionsWithIndex算子**
该算子的功能是:把每个partition中的分区号和对应的值拿出来,源码定义如下图:
接收一个函数参数,该函数有两个参数,第一个参数代表分区号,第二个参数代表分区中的元素。举例如下(该例子的功能是将每个分区中的元素和分区号打印出来):
//创建一个RDD,其具有两个partition
valrdd1=sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
//创建一个函数,返回RDD中的每个分区号和元素
def myFunc(partitionIndex:Int,iter:Iterator[Int]):Iterator[String]={
iter.toList.map(x=>"[PartitionId:"+partitionIndex+",value="+x+"]").iterator}
//调用mapPartitionsWithIndex
rdd1.mapPartitionsWithIndex(myFunc).collect
运行结果如下图所示:
B**、aggregate算子**
该算子的功能是:先对局部进行聚合操作,然后再对全局进行聚合操作,源码定义如下图所示:
zeroValue: U:初始值,它同时作用于局部操作和全局操作;
seqOp: (U,T) => U:局部操作;
combOp: (U,U) => U:全局操作
举例如下:
//创建一个RDD,其包含两个分区
val rdd1 =sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
//将每个分区中的最大值进行求和,初始值为0
rdd1.aggregate(0)(math.max(,), +)
//将每个分区中的最大值进行求和,初始值为0
rdd1.aggregate(10)(math.max(,), +)
//直接进行求和,初始值为0
rdd1.aggregate(0)(+, +)
//直接进行求和,初始值为10
rdd1.aggregate(10)(+, +)
运行结果如下图所示:
C**、aggregateByKey算子**
举例如下:
//准备数据,创建pairRDD,其具有两个分区
val pairRDD = sc.parallelize(List(("cat",2), ("cat", 5), ("mouse",4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
//定义函数
def func3(index: Int, iter:Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x +"]").iterator
}
//将每个分区中动物最多的数目进行求和
pairRDD.aggregateByKey(0)(math.max(,), _ + _).collect
//将每种动物的数目进行求和
pairRDD.aggregateByKey(0)(+, _ +_).collect
//该操作同reduceByKey,使用reduceByKey将每种动物的数目进行求和
pairRDD.reduceByKey(+).collect
运行结果如下图所示:
限于篇幅,这篇文章暂且就介绍到这里,部分内容放到下一章节里进行讲述,欢迎关注。
参考文献:
——《实时大数据分析 基于Storm、Spark技术的实时应用》
——《百度百科》
——《CSDN博客》
——《潭州大数据课程课件》
转自https://mp.weixin.qq.com/s/mbD9NTVCVPqVNeiKff4voQ
内容来源于网络,如有侵权,请联系作者删除!