1. Counting the number of each word in a short English passage

The English passage is as follows:
Convolutional layers are an important part of distinguishing traditional neural networks and play an important role in extracting feature information. Convolutional layers use convolution operations to extract features from an image. Convolutional neural networks generally contain multiple convolutional layers to obtain low-level features and high-level features, so that a large amount of data in the image is converted into a relatively small number of features, thus playing a role in improving the convergence speed. Where convolution is the sum of the convolutional kernels multiplied by one region (the receptive field) in the image. This results in a local summary of information.

Main entry point
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDemo {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //Get the Hadoop cluster configuration
        Configuration conf = new Configuration();
        //Create a job
        Job job = Job.getInstance(conf);
        job.setNumReduceTasks(2);
        job.setJobName("bigdata testMapReduce");
        job.setJarByClass(WordCountDemo.class);

        //Set the mapper class
        job.setMapperClass(MyMapper.class);
        //Set the reducer class
        job.setReducerClass(MyReduce.class);

        //Set the output key/value types of the map task
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        //Specify the input and output paths on HDFS
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        job.waitForCompletion(true);
    }
}
The custom Mapper class

Each input line is split() on spaces; each word becomes the key and its value is set to 1. Emitting these key-value pairs is all the map() method needs to do.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        //First convert the Text value to a String
        String line = value.toString();

        //Split each line on spaces
        String[] strings = line.split(" ");

        for (String string : strings) {
            context.write(new Text(string),new LongWritable(1L));
        }
    }
}
The custom Reducer class

The reduce() method sums the values that share the same key and emits a new key-value pair: the key stays the same as the input key, and the value is the sum of the input values.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class MyReduce extends Reducer<Text, LongWritable,Text,LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        long sum = 0L;

        for (LongWritable value : values) {
            long l = value.get();
            sum += l;
        }

        context.write(key,new LongWritable(sum));
    }
}

Package the program as a jar and upload it to the virtual machine, then upload the English passage to HDFS as a txt file.

After uploading, run the command:

hadoop jar hadoop-mapreduce-1.0-SNAPSHOT-jar-with-dependencies.jar com.shijia.WordCountDemo /shujia/bigdata/words.txt /shujia/bigdata/res1/

The command structure is: hadoop jar [path to the jar] [main class] [input path on HDFS] [output path (the directory is created automatically)]

Since the number of reduce tasks is set to 2, the job produces two result files, part-r-00000 and part-r-00001; _SUCCESS is the marker file that indicates a successful run.
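
Which of the two files a given word ends up in is decided by Hadoop's default HashPartitioner, which assigns each map output key to a reduce task based on its hash code. The small sketch below mirrors that assignment logic for illustration only; PartitionDemo and partitionFor are my own names, not part of the job above.

import org.apache.hadoop.io.Text;

public class PartitionDemo {
    //Mirrors the default HashPartitioner: (hashCode & Integer.MAX_VALUE) % numReduceTasks
    static int partitionFor(Text key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        //With 2 reduce tasks, each word lands in either part-r-00000 or part-r-00001
        for (String word : new String[]{"the", "Convolutional", "features"}) {
            System.out.println(word + " -> reducer " + partitionFor(new Text(word), 2));
        }
    }
}
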
Part of the result file contents:

the	1
Convolutional	3
Where	1
a	4
an	3
and	2
by	1
convergence	1
convolutional	2
distinguishing	1
extract	1
extracting	1
features	2
features,	2
field	1

The results are saved as key-value pairs.
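
Note that "features" and "features," in the listing above are counted as two different keys, because the mapper splits only on single spaces and leaves punctuation attached to the words. If punctuation should be ignored, the map() method could normalize each token before emitting it. A minimal sketch of such a mapper variant (my own adaptation, not part of the original job):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyCleanMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //Split on any run of whitespace instead of a single space
        for (String token : value.toString().split("\\s+")) {
            //Strip leading and trailing punctuation such as "," and "."
            String word = token.replaceAll("^\\p{Punct}+|\\p{Punct}+$", "");
            if (!word.isEmpty()) {
                context.write(new Text(word), new LongWritable(1L));
            }
        }
    }
}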

2. IK Analyzer (counting the occurrences of specified words in Romance of the Three Kingdoms)

Simple use of the IK Analyzer (a small example)

Sample text: 第二种变化对古人而言更难察觉。在气候多变区的一年生植物,若是所有的种子同时快速发芽,就有绝种之虞。只要一阵干旱或霜害,幼苗就会全军覆没,物种也就无从延续了。因此,很多一年生植物必须演化出抑制发芽的规避风险方式,使种子得以休眠,在多年后仍可发芽。这样,即使大多数种子在一时之间遭天候摧残,逃过一劫的仍可在日后滋生、繁衍。

import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.IOException;
import java.io.StringReader;

public class WordIk {
    public static void main(String[] args) throws IOException {
        //IK word segmentation
        IKSegmenter ikSegmenter = new IKSegmenter(new StringReader("第二种变化对古人而言更难察觉。在气候多变区的一年生植物,若是所有的种子同时快速发芽,就有绝种之虞。只要一阵干旱或霜害,幼苗就会全军覆没,物种也就无从延续了。因此,很多一年生植物必须演化出抑制发芽的规避风险方式,使种子得以休眠,在多年后仍可发芽。这样,即使大多数种子在一时之间遭天候摧残,逃过一劫的仍可在日后滋生、繁衍。"),true);
        Lexeme next = null;
        while ((next = ikSegmenter.next())!=null){
            System.out.println(next.getLexemeText());
        }
    }
}

Run result (partial):

第二种
变化
对
古人
而言
更难
察觉
在

The IK Analyzer is used to split a string into individual words and output them one by one.
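
The second constructor argument (true in the example above) is the segmenter's smart-mode switch; passing false should produce a finer-grained segmentation with more, shorter terms. A small comparison sketch under that assumption (the class and helper method names are my own):

import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.IOException;
import java.io.StringReader;

public class IkModeDemo {
    //Print every lexeme produced for the given text in the given mode
    static void segment(String text, boolean useSmart) throws IOException {
        IKSegmenter seg = new IKSegmenter(new StringReader(text), useSmart);
        Lexeme lex;
        while ((lex = seg.next()) != null) {
            System.out.print(lex.getLexemeText() + " ");
        }
        System.out.println();
    }

    public static void main(String[] args) throws IOException {
        String text = "一年生植物必须演化出抑制发芽的规避风险方式";
        segment(text, true);   //smart mode, as used in WordIk above
        segment(text, false);  //fine-grained mode
    }
}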

Using the IK Analyzer in a MapReduce job for word counting

This example only counts the occurrences of the two words "植物" (plant) and "种子" (seed).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.IOException;
import java.io.StringReader;

/**
 * @author zhoufeng
 * @date 2022/5/27 19:32
 */
class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        StringReader sr = new StringReader(value.toString());
        IKSegmenter ikSegmenter = new IKSegmenter(sr, true);
        Lexeme next = null;
        while ((next = ikSegmenter.next()) != null) {
            String word = next.getLexemeText();
            if("植物".equals(word)||"种子".equals(word)){
                context.write(new Text(word), new LongWritable(1L));
            }
        }
    }
}

class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        long sum = 0L;

        for (LongWritable value : values) {
            long l = value.get();
            sum+=l;
        }

        context.write(key,new LongWritable(sum));
    }
}

public class SGYYDemo {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setNumReduceTasks(1);
        job.setJobName("SGYY");

        job.setJarByClass(SGYYDemo.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setMapOutputValueClass(LongWritable.class);
        job.setMapOutputKeyClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

The result:

植物	2
种子	3

3. Average PM2.5

date hour type 1001A 1002A 1003A 1004A
20180101 0 AQI 74 56 103 82
20180101 0 PM2.5 52 31 77 60
20180101 0 PM2.5_24h 34 22 37 34
20180101 0 PM10 97 62 139 86
20180101 0 PM10_24h 75 51 76 64
20180101 0 SO2 6 4 13 2
20180101 0 SO2_24h 3 7 7 3
20180101 0 NO2 82 36 93 74
20180101 0 NO2_24h 55 34 50 50
20180101 0 O3 25 3 2

Process the data in the table to obtain each city's daily PM2.5 average (the raw data file is comma-separated, which is why the mapper splits each line on ",").

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        //Convert the line to a String
        String line = value.toString();
        String[] strings = line.split(",");
        //Keep only the PM2.5 rows
        if (strings.length >= 4 && "PM2.5".equals(strings[2])) {
            for (int i = 3, j = 1001; i < strings.length; i++, j++) {
                //Simple cleaning: some hours have no PM2.5 reading, so treat missing values as 0
                if ("".equals(strings[i]) || strings[i] == null || " ".equals(strings[i])) {
                    strings[i] = "0";
                }

                context.write(new Text("date: " + strings[0] + "-城市编号:" + j), new LongWritable(Long.parseLong(strings[i])));
            }
        }
    }
}


class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        long sum = 0L;
        for (LongWritable value : values) {
            long l = value.get();
            sum += l;
        }
        //Divide by 24 to get the city's average PM2.5 for that day
        long avg = sum / 24;
        context.write(key, new LongWritable(avg));
    }
}



public class PM25Avg {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //Get the Hadoop cluster configuration
        Configuration conf = new Configuration();
        //Create a job
        Job job = Job.getInstance(conf);
        job.setNumReduceTasks(1);
        job.setJobName("计算每个城市每天PM2.5");
        job.setJarByClass(PM25Avg.class);

        //Set the mapper class
        job.setMapperClass(MyMapper.class);
        //Set the reducer class
        job.setReducerClass(MyReducer.class);

        //Set the output key/value types of the map task
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        //Set the output key/value types of the reduce task
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //Specify the input and output paths on HDFS
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

Result (partial):
date: 20180101-城市编号:1001 26
date: 20180101-城市编号:1002 19
date: 20180101-城市编号:1003 28
date: 20180101-城市编号:1004 28
date: 20180101-城市编号:1005 29
date: 20180101-城市编号:1006 25
date: 20180101-城市编号:1007 24
date: 20180101-城市编号:1008 25
date: 20180101-城市编号:1009 13
date: 20180101-城市编号:1010 17
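
One caveat about these averages: the reducer divides by a fixed 24, assuming one reading per hour, and the mapper writes 0 for hours with a missing reading, so days with gaps are averaged down. If the average should instead be taken over the readings that actually exist, the mapper could skip blank fields rather than emitting 0, and the reducer could count the values it receives. A sketch of that reducer variant (my own adaptation; it only makes sense if the mapper stops emitting placeholder zeros):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class AvgByCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long sum = 0L;
        long count = 0L;
        for (LongWritable value : values) {
            sum += value.get();
            count++;
        }
        //Average over the records actually received instead of assuming 24 per day
        if (count > 0) {
            context.write(key, new LongWritable(sum / count));
        }
    }
}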

Copyright notice: this is an original article by f-1000, released under the CC 4.0 BY-SA license. Please include a link to the original post and this notice when reposting.
Original post: https://www.cnblogs.com/f-1000/p/16406628.html