hadoop之mapreduce详解(基础篇)
本篇文章主要从mapreduce运行作业的过程,shuffle,以及mapreduce作业失败的容错几个方面进行详解。
一、mapreduce作业运行过程
1.1、mapreduce介绍
MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念”Map(映射)”和”Reduce(归约)”,是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。 —来源于百度百科
MapReduce是一个基于集群的高性能并行计算平台(Cluster Infrastructure)
MapReduce是一个并行计算与运行软件框架(Software Framework)
MapReduce是一个并行程序设计模型与方法(Programming Model & Methodology)
mapreduce是hadoop中一个批量计算的框架,在整个mapreduce作业的过程中,包括从数据的输入,数据的处理,数据的数据输入这些部分,而其中数据的处理部分就要map,reduce,combiner等操作组成。在一个mapreduce的作业中必定会涉及到如下一些组件:
1、客户端,提交mapreduce作业
2、yarn资源管理器,负责集群上计算资源的协调
3、yarn节点管理器,负责启动和监控集群中机器上的计算容器(container)
4、mapreduce的application master,负责协调运行mapreduce的作业
5、hdfs,分布式文件系统,负责与其他实体共享作业文件
1.2、作业运行过程
作业的运行过程主要包括如下几个步骤:
1、作业的提交 2、作业的初始化 3、作业任务的分配 4、作业任务的执行 5、作业执行状态更新 6、作业完成
具体作业执行过程的流程图如下图所示:
1.2.1、作业的提交
作业提交源码分析详情见:hadoop2.7之作业提交详解(上) hadoop2.7之作业提交详解(下)
在MR的代码中调用waitForCompletion()方法,里面封装了Job.submit()方法,而Job.submit()方法里面会创建一个JobSubmmiter对象。当我们在waitForCompletion(true)时,则waitForCompletion方法会每秒轮询作业的执行进度,如果发现与上次查询到的状态有差别,则将详情打印到控制台。如果作业执行成功,就显示作业计数器,否则将导致作业失败的记录输出到控制台。
其中JobSubmmiter实现的大概过程如下:
1、向资源管理器resourcemanager提交申请,用于一个mapreduce作业ID,如图步骤2所示
2、检查作业的输出配置,判断目录是否已经存在等信息
3、计算作业的输入分片的大小
4、将运行作业的jar,配置文件,输入分片的计算资源复制到一个以作业ID命名的hdfs临时目录下,作业jar的复本比较多,默认为10个(通过参数mapreduce.client.submit.file.replication控制),
5、通过资源管理器的submitApplication方法提交作业
1.2.2、作业的初始化
1、当资源管理器通过方法submitApplication方法被调用后,便将请求传给了yarn的调度器,然后调度器在一个节点管理器上分配一个容器(container0)用来启动application master(主类是MRAppMaster)进程。该进程一旦启动就会向resourcemanager注册并报告自己的信息,application master并且可以监控map和reduce的运行状态。因此application master对作业的初始化是通过创建多个薄记对象以保持对作业进度的跟踪。
2、application master接收作业提交时的hdfs临时共享目录中的资源文件,jar,分片信息,配置信息等。并对每一个分片创建一个map对象,以及通过mapreduce.job.reduces参数(作业通过setNumReduceTasks()方法设定)确定reduce的数量。
3、application master会判断是否使用uber(作业与application master在同一个jvm运行,也就是maptask和reducetask运行在同一个节点上)模式运行作业,uber模式运行条件:map数量小于10个,1个reduce,且输入数据小于一个hdfs块
可以通过参数:
mapreduce.job.ubertask.enable #是否启用uber模式
mapreduce.job.ubertask.maxmaps #ubertask的最大map数
mapreduce.job.ubertask.maxreduces #ubertask的最大reduce数
mapreduce.job.ubertask.maxbytes #ubertask最大作业大小
4、application master调用setupJob方法设置OutputCommiter,FileOutputCommiter为默认值,表示建立做的最终输出目录和任务输出的临时工作空间
1.2.3、作业任务的分配
1、在application master判断作业不符合uber模式的情况下,那么application master则会向资源管理器为map和reduce任务申请资源容器。
2、首先就是为map任务发出资源申请请求,直到有5%的map任务完成时,才会为reduce任务所需资源申请发出请求。
3、在任务的分配过程中,reduce任务可以在任何的datanode节点运行,但是map任务执行的时候需要考虑到数据本地化的机制,在给任务指定资源的时候每个map和reduce默认为1G内存,可以通过如下参数配置:
mapreduce.map.memory.mb
mapreduce.map.cpu.vcores
mapreduce.reduce.memory.mb
mapreduce.reduce.cpu.vcores
1.2.4、作业任务的执行
application master提交申请后,资源管理器为其按需分配资源,这时,application master就与节点管理器通信来启动容器。该任务由主类YarnChild的一个java应用程序执行。在运行任务之前,首先将所需的资源进行本地化,包括作业的配置,jar文件等。接下来就是运行map和reduce任务。YarnChild在单独的JVM中运行。
1.2.5、作业任务的状态更新
每个作业和它的每个任务都有一个状态:作业或者任务的状态(运行中,成功,失败等),map和reduce的进度,作业计数器的值,状态消息或描述当作业处于正在运行中的时候,客户端可以直接与application master通信,每秒(可以通过参数mapreduce.client.progressmonitor.pollinterval设置)轮询作业的执行状态,进度等信息。
1.2.6、作业的完成
当application master收到最后一个任务已完成的通知,便把作业的状态设置为成功。
在job轮询作业状态时,知道任务已经完成,然后打印消息告知用户,并从waitForCompletion()方法返回。
当作业完成时,application master和container会清理中间数据结果等临时问题。OutputCommiter的commitJob()方法被调用,作业信息由作业历史服务存档,以便用户日后查询。
二、shuffle
mapreduce确保每个reduce的输入都是按照键值排序的,系统执行排序,将map的输入作为reduce的输入过程称之为shuffle过程。shuffle也是我们优化的重点部分。shuffle流程图如下图所示:
2.1、map端
在生成map之前,会计算文件分片的大小:计算源码详见:hadoop2.7作业提交详解之文件分片
然后会根据分片的大小计算map的个数,对每一个分片都会产生一个map作业,或者是一个文件(小于分片大小*1.1)生成一个map作业,然后通过自定的map方法进行自定义的逻辑计算,计算完毕后会写到本地磁盘。
在这里不是直接写入磁盘,为了保证IO效率,采用了先写入内存的环形缓冲区,并做一次预排序(快速排序)。缓冲区的大小默认为100MB(可通过修改配置项mpareduce.task.io.sort.mb进行修改),当写入内存缓冲区的大小到达一定比例时,默认为80%(可通过mapreduce.map.sort.spill.percent配置项修改),将启动一个溢写线程将内存缓冲区的内容溢写到磁盘(spill to disk),这个溢写线程是独立的,不影响map向缓冲区写结果的线程,在溢写到磁盘的过程中,map继续输入到缓冲中,如果期间缓冲区被填满,则map写会被阻塞到溢写磁盘过程完成。溢写是通过轮询的方式将缓冲区中的内存写入到本地mapreduce.cluster.local.dir目录下。在溢写到磁盘之前,我们会知道reduce的数量,然后会根据reduce的数量划分分区,默认根据hashpartition对溢写的数据写入到相对应的分区。在每个分区中,后台线程会根据key进行排序,所以溢写到磁盘的文件是分区且排序的。如果有combiner函数,它在排序后的输出运行,使得map输出更紧凑。减少写到磁盘的数据和传输给reduce的数据。
每次环形换冲区的内存达到阈值时,就会溢写到一个新的文件,因此当一个map溢写完之后,本地会存在多个分区切排序的文件。在map完成之前会把这些文件合并成一个分区且排序(归并排序)的文件,可以通过参数mapreduce.task.io.sort.factor控制每次可以合并多少个文件。
在map溢写磁盘的过程中,对数据进行压缩可以提交速度的传输,减少磁盘io,减少存储。默认情况下不压缩,使用参数mapreduce.map.output.compress控制,压缩算法使用mapreduce.map.output.compress.codec参数控制。
2.2、reduce端
map任务完成后,监控作业状态的application master便知道map的执行情况,并启动reduce任务,application master并且知道map输出和主机之间的对应映射关系,reduce轮询application master便知道主机所要复制的数据。
一个Map任务的输出,可能被多个Reduce任务抓取。每个Reduce任务可能需要多个Map任务的输出作为其特殊的输入文件,而每个Map任务的完成时间可能不同,当有一个Map任务完成时,Reduce任务就开始运行。Reduce任务根据分区号在多个Map输出中抓取(fetch)对应分区的数据,这个过程也就是Shuffle的copy过程。。reduce有少量的复制线程,因此能够并行的复制map的输出,默认为5个线程。可以通过参数mapreduce.reduce.shuffle.parallelcopies控制。
这个复制过程和map写入磁盘过程类似,也有阀值和内存大小,阀值一样可以在配置文件里配置,而内存大小是直接使用reduce的tasktracker的内存大小,复制时候reduce还会进行排序操作和合并文件操作。
如果map输出很小,则会被复制到Reducer所在节点的内存缓冲区,缓冲区的大小可以通过mapred-site.xml文件中的mapreduce.reduce.shuffle.input.buffer.percent指定。一旦Reducer所在节点的内存缓冲区达到阀值,或者缓冲区中的文件数达到阀值,则合并溢写到磁盘。
如果map输出较大,则直接被复制到Reducer所在节点的磁盘中。随着Reducer所在节点的磁盘中溢写文件增多,后台线程会将它们合并为更大且有序的文件。当完成复制map输出,进入sort阶段。这个阶段通过归并排序逐步将多个map输出小文件合并成大文件。最后几个通过归并合并成的大文件作为reduce的输出
2.3、总结
当Reducer的输入文件确定后,整个Shuffle操作才最终结束。之后就是Reducer的执行了,最后Reducer会把结果存到HDFS上。
在Hadoop集群环境中,大部分map 任务与reduce任务的执行是在不同的节点上。当然很多情况下Reduce执行时需要跨节点去拉取其它节点上的map任务结果。如果集群正在运行的job有很多,那么task的正常执行对集群内部的网络资源消耗会很严重。这种网络消耗是正常的,我们不能限制,能做的就是最大化地减少不必要的消耗。还有在节点内,相比于内存,磁盘IO对job完成时间的影响也是可观的。从最基本的要求来说,我们对Shuffle过程的期望可以有:
1、完整地从map task端拉取数据到reduce 端。
2、在跨节点拉取数据时,尽可能地减少对带宽的不必要消耗。
3、减少磁盘IO对task执行的影响。
在MapReduce计算框架中,主要用到两种排序算法:快速排序和归并排序。在Map任务发生了2次排序,Reduce任务发生一次排序:
1、第1次排序发生在Map输出的内存环形缓冲区,使用快速排序。当缓冲区达到阀值时,在溢写到磁盘之前,后台线程会将缓冲区的数据划分成相应分区,在每个分区中按照键值进行内排序。
2、第2次排序是在Map任务输出的磁盘空间上将多个溢写文件归并成一个已分区且有序的输出文件。由于溢写文件已经经过一次排序,所以合并溢写文件时只需一次归并排序即可使输出文件整体有序。
3、第3次排序发生在Shuffle阶段,将多个复制过来的Map输出文件进行归并,同样经过一次归并排序即可得到有序文件。
三、作业失败和容错
既然有作业的运行,肯定会有作业的失败,作业的失败(不考虑硬件,平台原因引起的失败)可能会存在不同的问题,如下:
3.1、任务运行失败
用户代码抛出异常(代码没写好):这种情况任务JVM会在退出之前向application master发送错误报告,并记录进用户日志,application master对该作业标记为failed,并释放掉占有的资源容器。
另一种就是JVM突然退出,这种情况节点管理器会注意到进程已经退出,并通知application master将此任务标记为失败,如果是因为推测执行而导致任务被终止,则不会被被标记为失败。而任务挂起又不同,一旦application master注意到有一段时间没有收到进度更新,便会把任务标记为失败,默认为10分钟,参数mapreduce.task.timeout控制application master被告知一个任务失败,将会重新调度该任务执行(会在与之前失败的不同节点上运行),默认重试4次,如果四次都失败,则作业判定为失败,参数控制为:
mapreduce.map.maxattempts
mapreduce.reduce.maxattempts
3.2、application master运行失败
AM也可能由于各种原因(如网络问题或者硬件故障)失效,Yarn同样会尝试重启AM
可以为每个作业单独配置AM的尝试重启次数:mapreduce.am.max-attempts,默认值为2
Yarn中的上限一起提高:yarn.resourcemanager.am.nax-attempts,默认为2,单个应用程序不可以超过这个限制,除非同时修改这两个参数。
恢复过程:application master向资源管理器发送周期性的心跳。当application master失败时,资源管理器会检测到该失败,并在一个新的容器中启动application master,并使用作业历史来恢复失败的应用程序中的运行任务状态,使其不必重新运行,默认情况下恢复功能是开启的,yarn.app.mapreduce.am.job.recovery.enable控制客户端向application master轮询作业状态时,如果application master运行失败了,则客户端会向资源管理器resourcemanager询问和缓存application master地址。
3.3、节点管理器运行失败
如果节点管理器崩溃或者运行非常缓慢,则就会停止向资源管理器发送心跳信息,如果10分钟(可以通过参数yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms设置)资源管理器没有收到一条心跳信息,则资源管理器将会通知停止发送心跳的节点管理器,并将其从自己的资源池中移除该节点管理器,在该节点上的application master和任务的失败,都通过如上两种恢复机制进行恢复。
3.4、资源管理器运行失败
资源管理器失败时一个很严重的问题,所有的任务将不能被分配资源,作业和容器都无法启动,那么整个通过yarn控制资源的集群都处于瘫痪状态。
容错机制:resourcemanager HA 详情见:hadoop高可用安装和原理详解
更多hadoop生态文章见: hadoop生态系列
参考:
《Hadoop权威指南 大数据的存储与分析 第四版》