Hadoop_Mapreduce
一、MapReduce
1.1、什么是MapReduce
MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念”Map(映射)”和”Reduce(归约)”,是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。
1.2、MapReduce的用途
1.3、Map函数
接受一个键值对(key-value pair),产生一组中间键值对。MapReduce框架会将map函数产生的中间键值对里键相同的值传递给一个reduce函数。
1.4、Reduce函数
接受一个键,以及相关的一组值,将这组值进行合并产生一组规模更小的值(通常只有一个或零个值
2、案例
统计文本的每个单词个数
首先先编写一个Maph函数(注意别倒错包了)
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * LongWritable 偏移量 long,表示该行在文件中的位置,而不是行号 * Text map阶段的输入数据 一行文本信息 字符串类型 String * Text map阶段的数据字符串类型 String * IntWritable map阶段输出的value类型,对应java中的int类型,表示行号 */ public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { //读取每行文本 String line = value.toString(); //split拆分每行 String[] words = line.split(" ");//分词 //取出每个单词 for (String word : words) { //将单词转为Text类型 Text wordText = new Text(word); //将1转变为IntWritable IntWritable intWritable = new IntWritable(1); //写出单词,跟对应的1 context.write(wordText, intWritable); } } }
然后在编写Reduce函数
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * Text输入类型,就是输入的字符串类型,序列化 * IntWritable 输入字符串1,序列化 * Text 输出的字符串类型,序列化 * IntWritable 输出的求和数组,序列化 */ import java.io.IOException; /** * key 输入单词名字 * values 输入一串1 * context 输出的工具 */ public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable number : values) { sum += number.get(); } context.write(key, new IntWritable(sum)); } }
最后编写测试类来进行测试
注意:map阶段输入的文件必须存在,然后reduce的输出文件如果存在的话就会报错
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCount { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { //定义配置对象 Configuration conf = new Configuration(); //定义一个工作任务对象 Job job = Job.getInstance(conf); //获取mapper的对象 job.setMapperClass(WordCountMap.class); //指定mapper阶段的一个输出的key job.setMapOutputKeyClass(Text.class); //指定mapper阶段输出的values类型 job.setMapOutputValueClass(IntWritable.class); //指定mapper阶段的输入文件 FileInputFormat.setInputPaths(job,new Path("C:\\Users\\wm020819\\Desktop\\123.txt")); //指定Reduce的类 job.setReducerClass(WordCountReduce.class); //指定Reduce阶段的一个输出的key job.setOutputKeyClass(Text.class); //指定Reduce阶段输出的values类型 job.setOutputValueClass(IntWritable.class); //指定Reduce阶段的出入文件,如果文件存在就会报文件已存在的错误 FileOutputFormat.setOutputPath(job,new Path("C:\\Users\\wm020819\\Desktop\\456")); //提交job boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }