深入理解hadoop之mapreduce
本文系原创,若有转载需要,请注明出处。https://www.cnblogs.com/bigdata-stone/
1.mapReduce简介
MapReduce是面向大数据并行处理的计算模型、框架和平台。
- 映射(Mapping) :对集合里的每个目标应用同一个操作。即,如果你想把表单里每个单元格乘以二,那么把这个函数单独地应用在每个单元格上的操作就属于mapping(这里体现了移动计算而不是移动数据)。
- 化简(Reducing):遍历集合中的元素来返回一个综合的结果。即,输出表单里一列数字的和这个任务属于reducing。
- MapReduce将复杂的、运行于大规模集群上的并行计算过程高度地抽象到了两个函数:Map和Reduce。简单来讲,map就是分,而reduce 就是合。
- 数据从map端进入(以key,value的形式),同样以(key,value)的形式传出mapper,进过shuffle过程,所有相同的key都会分配进入同一个reduce。而对于整个作业的map数目取决于split切片的个数,经过inputformat进行切割,切了多少块就有几个map,切完之后再在每个节点上并发执行,并行计算,所以可以有多个map,而reduce的个数是可以进行设置的,所以reduce的个数和map的个数是多对多的关系。
- 对于map端而言,产生的数据需要在map端进行分区,而分区partitions的数量取决于reduce的数量,而默认的分区函数就是hash(),所以在map端的分区相当于将数据进行预先的分组,经过shuffle之后,相应的分区就会进入相应的reduce里面去
2.图解计算框架:(画图不易,请勿挑剔)
2.1. inputformat工作机制
-
在 MapReduce 程序的开发过程中,往往需要用到 FileInputFormat与 TextInputFormat,我们会发现 TextInputFormat 这个类继承自FileInputFormat , FileInputFormat 这 个 类 继 承 自 InputFormat ,InputFormat 这个类会将文件 file 按照逻辑进行划分,划分成的每一个split 切片将会被分配给一个 Mapper 任务,文件先被切分成 split 块,而后每一个 split 切片对应一个 Mapper 任务 。
- TextInputFormat这个类继承自FileInputFormat这个抽象类,文本文件被切成行,使用回车换行符作为行结束的标志,key是这一行在文件中的位置,value是这一行文本。
- 关于切片split。hdfs切片的计算法则是:Math.max(minSize, Math.min(maxSize, blockSize));三值中取中间值。 简单地按照文件的内容长度进行切片切片大小,默认等于 block 大小。切片时不考虑数据集整体,而是逐个针对每一个文件单独切片默认情况下, split size =block size,在 hadoop 2.x 中为 128M
2.2 MapTask 端的工作机制
input File 通过 split 被逻辑切分为多个 split 文件,通过 Record按行读取内容给 map(用户自己实现的)进行处理,数据被 map 处理结束之后交给 OutputCollector 收集器,对其结果 key 进行分区(默认使用 hash 分区),然后写入 buffer,每个 map task 都有一个内存缓冲区,存储着 map 的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个 map task 结束后再对磁盘中这个 maptask 产生的所有临时文件做合并,生成最终的正式输出文件,然后等待 reduce task 来拉数据。 Map 端的输入的(k,v)分别是该行的起始偏移量,以及每一行的数据内容,map 端的输出(k,v)可以根据需求进行自定义,但是如果输出的是 javabean 对象,需要对javabean 继承 writable 。
1)partitioner
分区函数partitioner 的作用是将 mapper输出的 key/value通过给定的分区函数来拆分为分片(shard),每个 reducer 对应一个分片 默认情况下, partitioner 先计算 key 的散列值(通常为 md5值)。然后通reducer 个数执行取模运算: key.hashCode%(reducer 个数)。这种方式不仅能够随机地将整个key空间平均分发给每个reducer,同时也能确保不同mapper产生的相同key能被分发到同一个reducer。也可以自定义分区去继承 partition<key,value>把不同的结果写入不同的文件中分区 Partitioner 主要作用在于以下两点 (1)根据业务需要,产生多个输出文件;(2)多个 reduce 任务并发运行,提高整体 job 的运行效率 map 端的 combine 组件。
2)Combiner
每一个 map 都可能会产生大量的本地输出, Combiner 的作用就是对 map 端的输出先做一次合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络 IO 性能,是 MapReduce 的一种优化手段之一combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件combiner 组件的父类就是 Reducercombiner 和 reducer 的区别在于运行的位置:combiner 是在每一个 maptask 所在的节点运行reducer 是接收全局所有 Mapper 的输出结果;combiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量具体实现步骤:
1)自定义一个 combiner 继承 Reducer,重写 reduce 方法
2)中设置: job.setCombinerClass(CustomCombiner.class)combiner 能够应用的前提是不能影响最终的业务逻辑,而且,combine输出 kv 应该跟 reducer 的输入 kv 类型要对应起来
Combiner 使用需要注意的是:
1.有很多人认为这个 combiner 和 map 输出的数据合并是一个过程,其实不然, map 输出的数据合并只会产生在有数据 spill 出的时候,即进行 merge 操作。
2.与 mapper 与 reducer 不同的是, combiner 没有默认的实现,需要显式的设置在 conf 中才有作用。
3.并不是所有的 job 都适用 combiner,只有操作满足结合律的才可设置 combiner。 combine 操作类似于: opt(opt(1, 2, 3), opt(4, 5,6))。如果 opt 为求和、求最大值的话,可以使用,但是如果是求中值的话,不适用。
4.一般来说, combiner 和 reducer 它们俩进行同样的操作。
2.3shuffle 的过程
shuffle 的过程是:Map 产生输出开始到 Reduc 取得数据作为输入之前的过程称作 shuffle.1).Collect 阶段:将 MapTask 的结果输出到默认大小为100M 的环形缓冲区,保存的是 key/value, Partition 分区信息等。2).Spill 阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了 combiner,还会将有相同分区号和 key 的数据进行排序。3).Merge 段把所有溢出的临时文件进行一次合并操作,以确保一个MapTask 最终只产生一个中间数据文件。4).Copy 阶段: ReduceTask 启动 Fetcher 线程到已经完成MapTask 的节点上复制一份属于自己的数据,这些数据默认会存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。5).Merge 阶段:在 ReduceTask 远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。6).Sort 阶段:在对数据进行合并的同时,会进行排序操作,由于 MapTask 阶段已经对数据进行了局部的排序,ReduceTask 只需保证 Copy 的数据的最终整体有效性即可。Shuffle 中的缓冲区大小会影响到 mapreduce 程序的执行效率,原则上说,缓冲区越大,磁盘 io 的次数越少,执行速度就越快缓冲区的大小可以通过参数调整, 参数:io.sort.mb 默认 100M
2.4reduceTask
reducer 将已经分好组的数据作为输入,并依次为每个键对应分组执行 reduce 函数。 reduce 函数的输入是键以及包含与该键对应的所有值的迭代器。reduce 端的输入是 map 端的输出,它的输出的(k,v)根据需求进行自定义reducetask 并行度同样影响整个 job 的执行并发度和执行效率,与maptask的并发数由切片数决定不同, Reducetask 数量的决定是可以直接手动设置:job.setNumReduceTasks(4);如果数据分布不均匀,就有可能在 reduce 阶段产生数据倾斜。默认的 reduceTask 的是 1 。
2.5 outputformat
OutputFormat 主要用于描述输出数据的格式,它能够将用户提供的 key/value对写入特定格式的文件中。 Hadoop 自带了很多 OutputFormat 的实现,它们与InputFormat 实现相对应,足够满足我们业务的需要。
3.wordcount实例讲解
Map端代码
public class WCMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
Text keyout = new Text();
IntWritable valueout = new IntWritable();
String[] arr = value.toString().split(" ");
for(String s:arr){
keyout.set(s);
valueout.set(1);
context.write(keyout,valueout);
}
}
}
reduce端代码:
public class WCReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0 ;
for(IntWritable iw:values){
count=iw.get()+count;
}
context.write(key,new IntWritable(count));
}
}
app端代码:
public class WCApp {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf= new Configuration();
// conf.set("fs.defaultFS","file:///");
Job job = Job.getInstance(conf);
job.setJobName("WCApp");
job.setJarByClass(WCApp.class);
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
job.setNumReduceTasks(1);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.waitForCompletion(true);
}
}
3.1各个角色实体
-
程序运行时过程设计到的一个角色实体
1.1. Client:编写mapreduce程序,配置作业,提交作业的客户端 ;
1.2. ResourceManager:集群中的资源分配管理 ;
1.3. NodeManager:启动和监管各自节点上的计算资源 ;
1.4. ApplicationMaster:每个程序对应一个AM,负责程序的任务调度,本身也是运行在NM的Container中 ;
1.5. HDFS:分布式文件系统,保存作业的数据、配置信息等等。 -
客户端提交Job
2.1. 客户端编写好Job后,调用Job实例的Submit()或者waitForCompletion()方法提交作业;
2.2. 客户端向ResourceManager请求分配一个Application ID,客户端会对程序的输出、输入路径进行检查,如果没有问题,进行作业输入分片的计算。 -
Job提交到ResourceManager
3.1. 将作业运行所需要的资源拷贝到HDFS中(jar包、配置文件和计算出来的输入分片信息等);
3.2. 调用ResourceManager的submitApplication方法将作业提交到ResourceManager。 -
给作业分配ApplicationMaster
4.1. ResourceManager收到submitApplication方法的调用之后会命令一个NodeManager启动一个Container ;
4.2. 在该NodeManager的Container上启动管理该作业的ApplicationMaster进程。 -
ApplicationMaster初始化作业
5.1. ApplicationMaster对作业进行初始化操作;
5.2. ApplicationMaster从HDFS中获得输入分片信息(map、reduce任务数) -
任务分配
6.1. ApplicationMaster为其每个map和reduce任务向RM请求计算资源;
6.2. map任务优先于reduce任,map数据优先考虑本地化的数据。任务执行,在 Container 上启动任务(通过YarnChild进程来运行),执行map/reduce任务。
3.2时间先后顺序
-
输入分片(input split)
每个输入分片会让一个map任务来处理,默认情况下,以HDFS的一个块的大小(默认为128M,可以设置)为一个分片。map输出的结果会暂且放在一个环形内存缓冲区中(mapreduce.task.io.sort.mb=100M
),当该缓冲区快要溢出时(默认mapreduce.map.sort.spill.percent=0.8
),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件; -
map阶段:由我们自己编写,最后调用 context.write(…);
-
partition分区阶段
3.1. 在map中调用 context.write(k2,v2)方法输出,该方法会立刻调用 Partitioner类对数据进行分区,一个分区对应一个 reduce task。
3.2. 默认的分区实现类是 HashPartitioner ,根据k2的哈希值 % numReduceTasks
,可能出现“数据倾斜”现象。
3.3. 可以自定义 partition ,调用 job.setPartitioner(…)自己定义分区函数。 -
combiner合并阶段:将属于同一个reduce处理的输出结果进行合并操作
4.1. 是可选的;
4.2. 目的有三个:1.减少Key-Value对;2.减少网络传输;3.减少Reduce的处理。 -
shuffle阶段:即Map和Reduce中间的这个过程
5.1. 首先 map 在做输出时候会在内存里开启一个环形内存缓冲区,专门用来做输出,同时map还会启动一个守护线程;
5.2. 如缓冲区的内存达到了阈值的80%,守护线程就会把内容写到磁盘上,这个过程叫spill,另外的20%内存可以继续写入要写进磁盘的数据;
5.3. 写入磁盘和写入内存操作是互不干扰的,如果缓存区被撑满了,那么map就会阻塞写入内存的操作,让写入磁盘操作完成后再继续执行写入内存操作;
5.4. 写入磁盘时会有个排序操作,如果定义了combiner函数,那么排序前还会执行combiner操作;
5.5. 每次spill操作也就是写入磁盘操作时候就会写一个溢出文件,也就是说在做map输出有几次spill就会产生多少个溢出文件,等map输出全部做完后,map会合并这些输出文件,这个过程里还会有一个Partitioner操作(如上)
5.6. 最后 reduce 就是合并map输出文件,Partitioner会找到对应的map输出文件,然后进行复制操作,复制操作时reduce会开启几个复制线程,这些线程默认个数是5个(可修改),这个复制过程和map写入磁盘过程类似,也有阈值和内存大小,阈值一样可以在配置文件里配置,而内存大小是直接使用reduce的tasktracker的内存大小,复制时候reduce还会进行排序操作和合并文件操作,这些操作完了就会进行reduce计算了。 -
reduce阶段:由我们自己编写,最终结果存储在hdfs上的。