MapReduce On YARN与MRv1在编程模型和数据处理引擎方面的实现是一样的,唯一不同的是运行时环境。不同于MRv1中由JobTracker和TaskTracker构成的运行时环境,MapReduce On YARN的运行时环境由YARN与ApplicationMaster构成,这种新颖的运行时环境使得MapReduce可以与其他计算框架运行在一个集群中,从而达到共享集群资源、提高资源利用率的目的。随着YARN的成熟与完善,MRv1的独立运行模式将被MapReduce On YARN所取代。
MRAppMaster是MapReduce的ApplicationMaster实现,它使得MapReduce应用程序可以直接运行于YARN之上。在YARN中,MRAppMaster负责管理MapReduce作业的生命周期,包括作业管理、资源申请与再分配、Container启动与释放、作业恢复等。下面将介绍MRAppMaster的基本构成。
1**、MRAppMaster的基本构成**
如下图所示,MRAppMaster主要由以下几种组件/服务构成。
ContainerAllocator。同ResourceManager通信,为MapReduce作业申请资源。作业的每个任务资源需求可描述为5元组<priority,hostname,capability,containers,relax_locality>,分别表示作业优先级、期望资源所在的host、资源量(当前仅支持内存和CPU两种资源)、Container数目、是否松弛本地性。ContainerAllocator周期性通过RPC协议与ResourceManager通信,而ResourceManager则通过心跳应答的方式为之返回已经分配的Container列表、完成的Container列表等信息;
*
ClientService。ClientService是一个接口,由MRClientService实现。MRClientService实现了MRClientProtocol协议,客户端可通过该协议获取作业的执行状态(不必通过ResourceManager)和控制作业(比如杀死作业、改变作业优先级等);
*
Job。Job表示一个MapReduce作业,与MRv1的JobInProgress功能一样,负责监控作业的运行状态。它维护了一个作业状态机,以实现异步执行与各种作业相关的操作;
*
Task。Task表示一个MapReduce作业中的某个任务,与MRv1中的TaskInProgress功能类似,负责监控一个任务的运行状态。它维护了一个任务状态机,以实现异步执行与各种任务相关的操作;
*
TaskAttempt。TaskAttempt表示一个任务运行实例,它的执行逻辑与MRv1中的MapTask和ReduceTask运行实例完全一致,实际上,它直接使用了MRv1的数据处理引擎,但经过了一些优化。正是由于它与MRv1的数据处理引擎一样,对外提供的编程接口也与MRv1完全一致,这意味着MRv1的应用程序可直接运行于YARN之上;
*
TaskCleaner。TaskCleaner负责清理失败任务或者被杀死任务使用的目录和产生的临时结果(可统称为垃圾数据),它维护了一个线程池和一个共享队列,异步删除任务产生的垃圾数据;
*
Speculator。Speculator完成推测执行功能。当同一个作业的某个任务运行速度明显慢于其他任务时,Speculator会为该任务启动一个备份任务,让它与原任务同时处理同一份数据,谁先计算完成则将谁的结果作为最终结果,并将另一个任务杀掉。该机制可有效防止那些“拖后腿”任务拖慢整个作业的执行进度;
*
ContainerLauncher。ContainerLauncher负责与NodeManager通信,以启动一个Container。当ResourceManager为作业分配资源后,ContainerLauncher会将任务执行相关信息填充到Container中,包括任务运行所需资源、任务运行命令、任务运行环境、任务依赖的外部文件等,然后与对应的NodeManager通信,要求它启动Container;
*
TaskAttemptListener。TaskAttemptListener负责管理各个任务的心跳信息,如果一个任务一段时间内未汇报心跳,则认为它死掉了,会将其从系统中移除。同MRv1中的TaskTracker类似,它实现了TaskUmbilicalProtocol协议,任务会通过该协议汇报心跳,并询问是否能够提交最终结果;
*
JobHistoryEventHandler。JobHistoryEventHandler负责对作业的各个事件记录日志,比如作业创建、作业开始运行、一个任务开始运行等,这些日志会被写到HDFS的某个目录下,这对于作业恢复非常有用。当MRAppMaster出现故障时,YARN会将其重新调度到另外一个节点上。为了避免重新计算,MRAppMaster首先会从HDFS上读取上次运行产生的日志,以恢复已经运行完成的任务,进而能够只运行尚未运行完成的任务。
2**、MapReduce客户端**
MapReduce客户端是MapReduce用户与YARN以及MRAppMaster进行通信的唯一途径,通过该客户端,用户可以向YARN提交作业,获取作业的运行状态和控制作业(比如杀死作业、杀死任务等)。MapReduce客户端涉及两个RPC通信协议:
ApplicationClientProtocol。在YARN中,ResourceManager实现了ApplicationClientProtocol协议,任何客户端需使用该协议完成提交作业、杀死作业、改变作业优先级等操作;
*
MRClientProtocol。当作业的ApplicationMaster成功启动后,它会启动MRClientService服务,该服务实现了MRClientProtocol协议,从而允许客户端直接通过该协议与ApplicationMaster通信以控制作业和查询作业运行状态,以减轻ResourceManager的负载。
为了与MRv1的客户端兼容,MRAppMaster混合使用了继承和组合的设计模式。通过使用继承设计模式,MRAppMaster可保证它的对外接口与MRv1的一致,通过使用组合设计模式,MapReduce客户端使用RPC通信协议ApplicationClientProtocol和MRClientProtocol分别与ResourceManager和ApplicationMaster通信,以完成作业提交和后续的作业运行状态查询和作业控制。
A**、ApplicationClientProtocol协议**
MapReduce客户端通过该协议与ResourceManager通信,以提交应用程序和查询集群信息。ResourceManager用Application表示用户提交的作业,并提供了以下接口供用户使用:
B**、MRClientProtocol协议**
MRAppMaster实现了MRClientProtocol协议为客户端提供服务,该协议提供了作业和任务的查询以及控制接口,主要如下:
由于客户端方法中的所有参数对象仍采用了MRv1中基于Writable的序列化和反序列化机制,为了与YARN中基于Protocol Buffers的机制相兼容,YARN提供了一系列方便它们两者相互转换的接口,大家如果对这些内容有兴趣,可以自行去了解,这里不做过多介绍。
3**、MRAppMaster工作流程**
按照作业大小不同,MRAppMaster提供了三种作业运行模式:本地模式(通常用于作业调试,和MRv1一样)、Uber模式和Non-Uber模式。对于小作业,为了降低其延迟,可采用Uber模式,在该模式下,所有Map Task和Reduce Task在同一个Container(MRAppMaster所在Container)中依次执行;对于大作业,则采用Non-Uber模式,在该模式下,MRAppMaster先为Map Task申请资源,当Map Task运行完成数目达到一定比例后再为Reduce Task申请资源。
A**、Uber运行模式**
为了降低小作业延时,YARN专门对小作业运行方式进行了优化。对于小作业而言,MRAppMaster无须再为每个任务分别申请资源,而是让其重用一个Container,并按照先Map Task后Reduce Task的运行方式串行执行每个任务。在YARN中,如果一个MapReduce作业同时满足以下条件,则认为是小作业,可运行在Uber模式下:
Map Task数目不超过mapreduce.job.ubertask.maxmaps(默认是9);
*
Reduce Task数目不超过mapreduce.job.ubertask.maxmaps(默认是1);
*
输入文件大小不超过mapreduce.job.ubertask.maxbytes(默认是一个Block大小);
*
Map Task和Reduce Task需要的资源量不超过MRAppMaster可使用的资源量。
另外,由于链式作业会并发执行不同资源需求的Map Task和Reduce Task,因此不允许运行在Uber模式下。
B**、Non-Uber运行模式**
在大数据环境下,Uber运行模式通常只能覆盖到一小部分作业,而对于其他大多数作业,仍将运行在Non-Uber模式下。在Non-Uber模式下,MRAppMaster将一个作业的Map Task和Reduce Task分为四种状态:
Pending:刚启动但尚未向ResourceManager发送资源请求;
*
scheduled:已经向ResourceManager发送资源请求但尚未分配到资源;
*
assigned:已经分配到了资源且正在运行;
*
completed:已经运行完成。
对于Map Task而言,它的生命周期为scheduled->assigned->completed;而对于Reduce Task而言,它的生命周期为pending->scheduled->assigned-> completed。由于Reduce Task的执行依赖于Map Task的输出结果,因此,为避免Reduce Task过早启动造成资源利用率低下,MRAppMaster让刚启动的Reduce Task处于pending状态,以便能够根据Map Task运行情况决定是否对其进行调度。在YARN之上运行MapReduce作业需要解决两个关键问题:如何确定Reduce Task启动时机以及如何完成Shuffle功能。
由于YARN中不再有Map Slot和Reduce Slot的概念,且ResourceManager也不知道Map Task与Reduce Task之间存在依赖关系,因此,MRAppMaster自己需设计资源申请策略以防止因Reduce Task过早启动造成资源利用率低下和Map Task因分配不到资源而“饿死”。MRAppMaster在MRv1原有策略(Map Task完成数目达到一定比例后才允许启动Reduce Task)基础上添加了更为严厉的资源控制策略和抢占策略。总结起来,Reduce Task启动时机由以下三个参数控制:
mapreduce.job.reduce.slowstart.completedmaps:当Map Task完成的比例达到该值后才会为Reduce Task申请资源,默认是0.05;
*
yarn.app.mapreduce.am.job.reduce.rampup.limit:在Map Task完成前,最多启动的Reduce Task比例,默认为0.5;
*
yarn.app.mapreduce.am.job.reduce.preemption.limit:当Map Task需要资源但暂时无法获取资源(比如Reduce Task运行过程中,部分Map Task因结果丢失需重算)时,为了保证至少一个Map Task可以得到资源,最多可以抢占的Reduce Task比例,默认为0.5。
按照MapReduce的执行逻辑,Shuffle HTTP Server应该分布到各个节点上,以便能够支持各个Reduce Task远程复制数据。然而,由于Shuffle是MapReduce框架中特有的一个处理流程,从设计上讲,不应该将它直接嵌到YARN的某个组件(比如NodeManager)中。
当用户向YARN中提交一个MapReduce应用程序后,YARN将分两个阶段运行该应用程序:第一个阶段是由ResourceManager启动MRAppMaster;第二个阶段是由MRAppMaster创建应用程序,为它申请资源,并监控它的整个运行过程,直到运行完成。如下图所示,YARN的工作流程分为以下几个步骤:
步骤1:用户向YARN中提交应用程序,其中包括MRAppMaster程序、启动MRAppMaster的命令、用户程序等;
*
步骤2:ResourceManager为该应用程序分配第一个Container,并与对应的NodeManager通信,要求它在这个Container中启动应用程序的MRAppMaster;
*
步骤3:MRAppMaster启动后,首先向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,之后,它将为内部任务申请资源,并监控它们的运行状态,直到运行结束,即重复步骤4~7;
*
步骤4:MRAppMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源;
*
步骤5:一旦MRAppMaster申请到资源后,则与对应的NodeManager通信,要求它启动任务;
*
步骤6:NodeManager为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务;
*
步骤7:各个任务通过RPC协议向MRAppMaster汇报自己的状态和进度,以让MRAppMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
*
步骤8:应用程序运行完成后,MRAppMaster向ResourceManager注销并关闭自己。
4**、资源申请与再分配**
ContainerAllocate是MRAppMaster中负责资源申请和分配的模块。用户提交的作业被分解成Map Task和Reduce Task后,这些Task所需的资源统一由ContainerAllocator模块负责从ResourceManager中申请,而一旦ContainerAllocator得到资源后,需采用一定的策略进一步分配给作业的各个任务。
在YARN中,作业的资源需求可描述为5元组:<priority,hostname,capability,containers,relax_locality>,分别表示作业优先级、期望资源所在的host、资源量、Container数目、是否松弛本地性,比如:
<10,“node1”,”memory:1G,CPU:1”,3,true> //优先级是一个正整数,优先级值越小,优先级越高
<10,“node2”,”memory:2G,CPU:1”,1,false> //1个必须来自node2上大小为2GB内存、1个CPU的Container(不能来自node2所在的机架或者其他节点)
<2,“/*”,”memory:1G,CPU:1”,20,false> ///*表示这样的资源可来自任意一个节点,即不考虑数据本地性
ContainerAllocator周期性通过心跳与ResourceManager通信,以获取已分配的Container列表、完成的Container列表、最近更新的节点列表等信息,而ContainerAllocator根据这些信息完成相应的操作。
A**、资源申请**
当用户提交作业之后,MRAppMaster会为其初始化,并创建一系列Map Task和Reduce Task,由于Reduce Task依赖于Map Task的中间结果,所以Reduce Task会延后调度。在ContainerAllocator中,当Map Task数目完成一定比例(由mapreduce.job.reduce.slowstart.completedmaps指定,默认是0.05,即5%)且Reduce Task可允许占用的资源(Reduce Task可占用资源比由yarn.app.mapreduce.am.job. reduce.rampup.limit指定)能够折合成整数个任务时,才会调度Reduce Task。
考虑到Map Task和Reduce Task之间的依赖关系,因此,它们之间的状态转移也是不一样的,对于Map Task而言,会依次转移到以下几个任务集合中:
scheduled->assigned->completed
对于Reduce Task而言,则按照以下流程进行:
pending->scheduled->assigned->completed
其中,pending表示等待ContainerAllocator发送资源请求的任务集合;scheduled表示已经将资源请求发送给RM,但还没有收到分配的资源的任务集合;assigned是已经收到RM分配的资源的任务集合;completed表示已运行完成的任务集合。
Reduce Task之所以会多出一个pending状态,主要是为了根据Map Task情况调整Reduce Task状态(在pending和scheduled中相互转移)。进一步说,这主要是为了防止Map Task饿死,因为在YARN中不再有Map Slot和Reduce Slot的概念(这两个概念从一定程度上减少了作业饿死的可能性),只有内存、CPU等真实的资源,需要由ApplicationMaster控制资源申请的顺序,以防止可能产生的作业饿死。
此外,ContainerAllocator将所有任务划分成三类,分别是Failed Map Task、Map Task和Reduce Task,并分别赋予它们优先级5、20和10,也就是说,当三种任务同时有资源需求时,会优先分配给Failed Map Task,然后是Reduce Task,最后是Map Task。
总结下,ContainerAllocator工作流程如下:
步骤1:将Map Task的资源需求发送给RM;
*
如果达到了Reduce Task的调度条件,则开始为Reduce Task申请资源;
*
如果为某个Task申请到了资源,则取消其他重复资源的申请。由于在HDFS中,任何一份数据通常有三个备份,而对于一个任务而言,考虑到rack和any级别的本地性,它可能会对应7个资源请求,分别是:
<20,”node1”,”memory:1G”,1,true>
<20,”node2”,”memory:1G”,1,true>
<20,”node3”,”memory:1G”,1,true>
<20,”rack1”,”memory:1G”,1,true>
<20,”rack2”,”memory:1G”,1,true>
<20,”rack3”,”memory:1G”,1,true>
<20,”/*”,”memory:1G”,1,true>
一旦该任务获取了以上任意一种资源,都会取消其他6个的资源申请。
在作业运行过程中,会出现资源重新申请和资源取消的行为,具体如下:
如果任务运行失败,则会重新为该任务申请资源;
*
如果一个任务运行速度过慢,则会为其额外申请资源以启动备份任务(如果启动了推测执行功能);
*
如果一个节点失败的任务数目过多,则会撤销对该节点的所有资源的申请请求。
B**、资源再分配**
一旦MRAppMaster收到新分配的Container后,会将这些Container进一步分配给各个任务,Container分配过程如下:
步骤1:判断收到的Container包含的资源是否满足要求,如果不满足,则通过下次心跳通知ResourceManager释放该Container;
*
判断收到的Container所在节点是否被加入黑名单中,如果是,则寻找一个与该Container匹配的任务,并重新为该任务申请资源,同时通过下次心跳通知ResourceManager释放该Container;
*
步骤3:根据Container的优先级,将它分配给对应类型的任务。如果优先级为PRIORITY_FAST_MAP(对应数值为5),则分配给失败的Map Task;如果优先级为PRIORITY_REDUCE,则分配给Reduce Task;否则,分配给正常的Map Task。对于前两种情况,ContainerAllocator直接从对应队列中取出第一个任务即可,对于最后一种情况,则依次尝试从node-local(输入数据与Container在同一个节点)、rack-local(输入数据与Container在同一个机架)和no-local(输入数据与Container不再同一个机架)几个任务列表中查找Map Task。
这里需要注意的是:当作业在一个节点上失败的任务实例数目超过一定上限(通过参数mapreduce.job.maxtaskfailures.per.tracker配置,默认值是3,管理员可通过参数yarn.app.mapreduce.am.job.node-blacklisting.enable配置是否启用该功能,默认情况下为true,表示启动该功能)后,该节点将被加入到黑名单中;为了防止大量节点被加入黑名单导致作业无法运行完成或者运行效率低下,YARN允许管理员通过参数yarn.app.mapreduce.am.job.node-blacklisting.ignore-threshold-node-percent设置最多被加入黑名单的节点比例,默认是33,即最多33%的集群节点可被加入黑名单。
5**、Container启动与释放**
ContainerLauncher负责与各个NodeManager通信,以启动或者释放Container。在YARN中,运行Task所需的全部信息被封装到Container中,包括所需资源、依赖的外部文件、JAR包、运行时环境变量、运行命令等。ContainerLauncher通过RPC协议ContainerManager与NodeManager通信,以控制Container的启动和释放,进而控制任务的执行(比如启动任务、杀死任务等),ContainerManager协议定义了三个RPC接口,具体如下图所示:
ContainerLauncher是一个Java接口,它定义了两种事件:
CONTAINER_REMOTE_LAUNCH。启动一个Container。当ContainerAllocator为某个任务申请到资源后,会将运行该任务相关的所有信息封装到Container中,并要求对应的节点启动该Container。需要注意的是,Container中运行的任务对应的数据处理引擎与MRv1中完全一致,仍为Map Task和Reduce Task,正因如此,MRv1中的程序与YARN中的MapReduce程序完全兼容;
*
CONTAINER_REMOTE_CLEANUP。停止/杀死一个Container。存在多种可能触发该事件的行为,常见的有:
推测执行时,一个任务运行完成,需杀死另一个相同输入数据的任务;
*
用户发送一个杀死任务的请求;
*
任意一个任务运行结束时,YARN会触发一个杀死任务的命令,以释放对应Container占用的资源。
这里需要特别注意的是第三种情况,YARN作为资源管理系统,应确保任何一个任务运行结束后资源得到释放,否则会造成资源泄露。而实现这一要求的可行方法是,任何一个任务结束后,不管它对应的资源是否得到释放,YARN均会主动显式检查和回收资源(container)。
6**、推测执行机制**
在分布式集群环境下,因软件Bug、负载不均衡或者资源分布不均等原因,造成同一个作业的多个任务之间运行速度不一致,有的任务运行速度明显慢于其他任务(比如某个时刻,一个作业的某个任务进度只有10%,而其他所有Task已经运行完毕),则这些任务将拖慢作业的整体执行进度。为了避免这种情况发生,运用推测执行(Speculative Execution)机制,Hadoop会为该任务启动一个备份任务,让该备份任务与原始任务同时处理一份数据,谁先运行完成,则将谁的结果作为最终结果。
推测执行算法的核心思想是:某一时刻,判断一个任务是否拖后腿或者是否是值得为其启动备份任务,采用的方法为,先假设为其启动一个备份任务,则可估算出备份任务的完成时间estimatedEndTime2;同样地,如果按照此刻该任务的计算速度,可估算出该任务最有可能的完成时间estimatedEndTime1,这样estimatedEndTime1与estimatedEndTime2之差越大,表明为该任务启动备份任务的价值越大,则倾向于为这样的任务启动备份任务。
这种算法的最大优点是,可最大化备份任务的有效率,其中有效率是有效备份任务数与所有备份任务数的比值,有效备份任务是指完成时间早于原始任务完成时间的备份任务(即带来实际收益的备份任务)。备份任务的有效率越高、推测执行算法就越优秀,带来的收益也就越大。
推测执行机制实际上采用了经典的算法优化方法:以空间换时间,它同时启动多个相同任务处理同一份数据,并让这些任务竞争以缩短数据处理时间,显然这种方法需要占用更多的计算资源,在集群资源紧缺的情况下,应合理使用该机制,争取在多用少量资源情况下,减少大作业的计算时间。
7**、作业恢复**
当一个作业的MRAppMaster运行失败时,ResourceManager会将其迁移到另外一个节点上运行。为了避免作业重新计算带来的资源浪费,MRAppMaster在运行过程中会(在HDFS上)记录一些运行时日志以便重启时能够恢复作业之前的运行状态。
从作业恢复粒度角度来看,当前存在三种不同级别的恢复机制,级别由低到高依次是作业级别、任务级别以及记录级别,其中级别越低实现越简单,但造成的资源浪费越严重。当前MRAppMaster采用了任务级别的恢复机制,即以任务为基本单位进行恢复,这种机制是基于事务型日志完成作业恢复的,它只关注两种任务:运行完成的任务和未运行完成的任务。作业执行过程中,MRAppMaster会以日志的形式将作业以及任务状态记录下来,一旦MRAppMaster重启,则可从日志中恢复作业的运行状态,其中已经运行完成的任务,无须再运行,而未开始运行或者运行中的任务需重新运行。
8**、数据处理引擎**
MRAppMaster仍采用了MRv1中的数据处理引擎,分别由数据处理引擎Map Task和Reduce Task完成Map任务和Reduce任务的处理。但相比于MRv1,MRAppMaster对这两个引擎进行了优化,这些优化主要体现在Shuffle阶段,具体如下:
Map端——用Netty代替Jetty
MRv1版本中,TaskTracker采用了Jetty服务器处理来自各个Reduce Task的数据读取请求。由于Jetty采用了非常简单的网络模型,因此性能比较低。在Hadoop 2.0中,MRAppMaster改用Netty——另一种开源的客户/服务器端编程框架,由于它内部采用了Java NIO技术,故其相比Jetty更加高效。Netty社区也比Jetty的更加活跃,且稳定性更好。
Reduce****端——批拷贝
****MRv1版本中,在Shuffle过程中,Reduce Task会为每个数据分片建立一个专门的HTTP连接(One-connection-per-map),即使多个分片同时出现在一个TaskTracker上也是如此。为了提高数据复制效率,Hadoop 2.0尝试采用批拷贝技术:不再为每个Map Task建立一个HTTP连接,而是为同一个TaskTracker上的多个Map Task建立一个HTTP连接,进而能够一次读取多个数据分片。
9**、历史作业管理器**
为了方便用户查看MapReduce历史作业信息,MRAppMaster提供了一个JobHistoryServer服务,该服务主要由四个子服务组成,见下图。
对于这四个子服务的描述,具体如下:
HistoryClientService。为Web界面展示历史作业信息提供后端实现,它通过调用JobHistory中的相关API获取作业信息,比如提交时间、提交用户、Counter值等,并将这些信息发送到前端;
*
HSAddminService。管理员可使用”bin/mapred hsadmin”命令动态更新JobHistoryServer访问和管理权限信息。用户输入该命令后,后端的HSAdminService服务将执行该命令;
*
AggregatedLogDeletionService。该服务周期性扫描所有历史作业日志目录,如果一个日志目录存放时间超过yarn.nodemanager.log.retain-seconds(单位是秒,默认为3/*60/*60,即3小时),则直接将其从HDFS上删除;
*
JobHistory。该服务从HDFS上读取MapReduce历史作业日志,并将其解析成格式化信息,供前端查看。MRAppMaster将执行完成的作业信息保存到mapreduce.jobhistory.intermediate-done-dir指定的目录中,如果用户未设置该属性,则保存到${yarn.app.mapreduce.am.staging-dir}/history/done_intermediate目录中,其中每个作业保存作业配置文件(后缀为.xml)和作业执行日志(后缀为.jhist)两个文件,通过解析作业执行日志,JobHistory能获取作业执行时的所有信息,包括作业提交时间、作业Counter、每个任务提交时间和运行时间、每个任务的Counter等。
参考文献:
——《Hadoop技术内幕深入解析YARN架构设计与实现原理》
——《CSDN博客》
转自https://mp.weixin.qq.com/s/-rk_Z6oKE0v-d3G-_UPZWQ
内容来源于网络,如有侵权,请联系作者删除!