1、 MapReduce是什么

  Hadoop MapReduce是一个软件框架,基于该框架能够容易地编写应用程序,这些应用程序能够运行在由上千个商用机器组成的大集群上,并以一种可靠的,具有容错能力的方式并行地处理上TB级别的海量数据集。这个定义里面有着这些关键词,

2、 MapReduce做什么

  MapReduce擅长处理大数据,它为什么具有这种能力呢?这可由MapReduce的设计思想发觉。MapReduce的思想就是“分而治之”。

  (1)Mapper负责“分”,即把复杂的任务分解为若干个“简单的任务”来处理。“简单的任务”包含三层含义:

一是数据或计算的规模相对原任务要大大缩小;二是就近计算原则,即任务会分配到存放着所需数据的节点上进行计算;三是这些小任务可以并行计算,彼此间几乎没有依赖关系。

  (2)Reducer负责对map阶段的结果进行汇总。至于需要多少个Reducer,用户可以根据具体问题,通过在mapred-site.xml配置文件里设置参数mapred.reduce.tasks的值,缺省值为1。

  一个比较形象的语言解释MapReduce:  

  我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。

  现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”。

3、第一个MapReduce程序:WordCount

  WordCount单词计数是最简单也是最能体现MapReduce思想的程序之一。

启动一个普通的maven工程。

1.先配置pom.xml

导入三个jar包

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.6.0</version>
</dependency>

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>2.6.0</version>
</dependency>

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.6.0</version>
</dependency>

2.创建一个Mapper类(注意导的包)

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
 * KEYIN:默认情况下,是mr框架所读到的一行文本的起始偏移量,类型:long
 * 但是在Hadoop中有更精简的序列化接口,所以不直接用long ,而是用Longwriterable
 * VALUEIN:默认情况下,是mr框架所读的一行文本的内容, 类型:String  ,同上,用Text(import org.apache.hadoop.io.Text)
 *
 * KEYOUT:    是用户自定义逻辑处理完成之后输出数据的key,在此处是单词,类型 String  同上,用Text
 * VALUEOUT:  是用户自定义逻辑处理完成之后输出数据的value,在此处是单词次数,类型 Integer 同上,用 Intwriterable
 */

public class WordCountMap extends Mapper<LongWritable ,Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //读取每行文本
        String line = value.toString();
        //splite拆分每行
        String[] words = line.split(" "); //分词
        //去除每个单词
        for (String  woed: words
             ) {
            //将每行数据转化为text类型
            Text wordText = new Text(woed);
            //将 1 转化为intwritab类型
            IntWritable outValue = new IntWritable(1);
            //写出单词跟对应的1
            context.write(wordText,outValue);
        }

    }
}

3.创建一个Reducer类(注意导的包)

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


import java.io.IOException;


public class WordCountReduce extends Reducer <Text, IntWritable,Text,IntWritable>{
    // key 输入单词的名字
    // values 输入一串 1
    // content 输出的工具

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable number:values
             ) {
            sum += number.get();
        }
        context.write(key,new IntWritable());
    }
}

4.创建driver类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCount {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //定义配置文件
        Configuration conf = new Configuration();
        //准备一个空任务
        Job job = Job.getInstance(conf,"wc");

        //设置任务的输入数据源
        FileInputFormat.addInputPath(job,new Path("D:\\桌面下载\\123.txt"));

        //设置你的Mapper任务类
        job.setMapperClass(WordCountMap.class);
        //设置mapper的key的输出数据类型
        job.setMapOutputKeyClass(Text.class);
        //设置mapper的valuer的输出数据类型
        job.setMapOutputValueClass(IntWritable.class);

        //设置你的Reducer任务类
        job.setReducerClass(WordCountReduce.class);
        //设置Reduce任务类输出的数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //设置任务的输出数据目标
        FileOutputFormat.setOutputPath(job,new Path("D:\\桌面下载\\456.txt"));
        //启动任务并执行
        job.waitForCompletion(true);
    }
}

如果运行没有报错那么就去任务输出路径查看生成的文件夹。

如果报错可能是环境变量问题博文:https://blog.csdn.net/tmh1995/article/details/106551092

以上为在Windows系统上实现MapReduce的Word Count

版权声明:本文为and脱发周大侠原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/zzc1102/p/16246608.html