Hadoop: Counting Words in a Text File with MapReduce
First, write the WordCountMap class:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    /*
     * LongWritable: input key, the byte offset of the line within the file (not the line number)
     * Text: input value, one line of text (the Hadoop counterpart of Java's String)
     * Text: output key, a single word
     * IntWritable: output value, the count 1 for each word occurrence (the Hadoop counterpart of Java's int)
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Read one line of text
        String line = value.toString();
        // Split the line into words
        String[] words = line.split(" ");
        // Emit each word
        for (String word : words) {
            // Wrap the word in a Text object
            Text wordText = new Text(word);
            // Wrap the count 1 in an IntWritable
            IntWritable outValue = new IntWritable(1);
            // Write out the (word, 1) pair
            context.write(wordText, outValue);
        }
    }
}
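One detail worth knowing: allocating a fresh Text and IntWritable for every word creates unnecessary garbage. Because context.write() serializes the pair immediately, a common Hadoop idiom is to reuse Writable instances across map() calls. Below is a minimal sketch of the same mapper using that idiom (the class name WordCountMapReuse is my own; behavior is identical to the version above):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapReuse extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reused across calls: safe because context.write() serializes the pair right away
    private final Text wordText = new Text();
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String word : value.toString().split(" ")) {
            wordText.set(word);
            context.write(wordText, ONE);
        }
    }
}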
Next, write the WordCountReduce class:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    /*
     * Text: input key, a word emitted by the map stage
     * IntWritable: input value, one of the 1s emitted for that word
     * Text: output key, the word
     * IntWritable: output value, the total count for the word
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        /*
         * key: the word
         * values: all the 1s collected for this word
         * context: used to write out the result
         */
        int sum = 0;
        for (IntWritable number : values) {
            sum += number.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
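Because this reducer only sums its inputs, the operation is associative and commutative, so the same class can also serve as a combiner that pre-aggregates counts on the map side and reduces shuffle traffic. If you want that optimization, a single extra line in the driver below is enough (a sketch; the rest of the job setup is unchanged):

// Optional: reuse the reducer as a map-side combiner to pre-sum counts
job.setCombinerClass(WordCountReduce.class);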
Finally, write the WordCount driver class that wires the two together:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // Create the Job instance for this MapReduce program
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Set the main class of this job
        job.setJarByClass(WordCount.class);
        // Set the concrete mapper and reducer implementations for this job
        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);
        // Set the output key/value types of the map stage
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Set the output key/value types of the reduce stage, i.e. the final output of the whole job
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set the input file to process and the directory where results are written
        // (the output directory must not already exist, or the job will fail)
        FileInputFormat.setInputPaths(job, "E:\\Demo\\hadoop\\input\\Wordcount.txt");
        FileOutputFormat.setOutputPath(job, new Path("E:\\Demo\\hadoop\\output"));
        // Submit the job and wait for it to finish
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
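The paths above are hard-coded Windows paths, which only work when running locally from the IDE. To run the same jar on a cluster against HDFS, a common pattern is to take the input and output locations from the command line instead. A sketch, assuming the jar is named wordcount.jar; only the two path lines of the driver change:

// Take input/output locations from the command line, e.g.:
//   hadoop jar wordcount.jar WordCount /input/Wordcount.txt /output
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));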
(You need to write some text into Wordcount.txt beforehand.)
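For example, with hypothetical input like this in Wordcount.txt:

hello world
hello hadoop

the job writes its result to a part-r-00000 file in the output directory, tab-separated and sorted by key:

hadoop	1
hello	2
world	1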