Big Data Learning (07): Setting Up a Hadoop 3.3 High-Availability Environment

The previous five posts introduced Hadoop's main modules. After learning the theory, you still need to get hands-on to really understand it, so this post takes its time: it starts from environment setup and works through how to actually use Hadoop, step by step.

This post covers the following topics:

  1. Planning the deployment nodes
  2. Passwordless SSH and network configuration
  3. Building the ZooKeeper distributed cluster
  4. Building the Hadoop distributed cluster
  5. Submitting a MapReduce job to the distributed cluster remotely from IDEA

Planning the deployment nodes

HDFS high availability needs at least two NameNodes (NN), and with a replication factor of three we need three DataNodes (DN). YARN high availability needs at least two ResourceManagers (RM), and because computation moves to the data, a NodeManager (NM) runs on every DataNode. ZooKeeper (ZK) needs three nodes for leader election, and JournalNode (JN) needs three nodes so that a majority quorum is possible.

Based on these considerations, the nodes are planned as follows:

Host      IP              NN  RM  ZKFC  DN  NM  JN  ZK
server01  192.168.0.111   ✓   ✓   ✓
server02  192.168.0.112   ✓   ✓   ✓
server03  192.168.0.113                 ✓   ✓   ✓   ✓
server04  192.168.0.114                 ✓   ✓   ✓   ✓
server05  192.168.0.115                 ✓   ✓   ✓   ✓

Note: 1. ZKFC must run on the same node as a NameNode; otherwise active-NameNode election cannot work.

      2. DataNode and NodeManager must run on the same node, which is what "moving computation to the data" means in practice.

      3. Create a hadoop user and group; all of the software above is started as the hadoop user (a minimal sketch follows below).
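A minimal sketch of creating that user and group on each of the five machines (run as root; the exact options may vary slightly by distribution):

[root@server01 ~]# groupadd hadoop
[root@server01 ~]# useradd -m -g hadoop hadoop
[root@server01 ~]# passwd hadoop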

Passwordless SSH and network configuration

First, disable the firewall on all five machines. This is very important: otherwise, after ZooKeeper is started, zkServer.sh status cannot report the node state because the three ZooKeeper instances cannot communicate with each other, and the second NameNode will also fail to sync data from the first one.

[root@server04 home]# systemctl stop firewalld
[root@server04 home]# systemctl disable firewalld
Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.

Edit /etc/hostname on each machine and set the hostname according to the plan.
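For example, on a systemd-based system this can be done with hostnamectl (a sketch; run the matching command on each of the five machines):

[root@server01 ~]# hostnamectl set-hostname server01
[root@server01 ~]# hostname
server01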

Add name resolution for all five machines to /etc/hosts.

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.0.111 server01
192.168.0.112 server02
192.168.0.113 server03
192.168.0.114 server04
192.168.0.115 server05

Passwordless SSH between nodes

Because server01 and server02 may run scripts that start daemons on the other nodes, both of them need passwordless SSH access to every node.

Taking server01 as an example, run the following commands in order to distribute server01's public key to the other nodes so that it can log in without a password.

One thing to watch out for: the authorized_keys file created by ssh-copy-id has permission 600. If it ends up as 644, passwordless login will not work; SSH is very strict about this file's permissions.

[hadoop@server01 ~]$ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
[hadoop@server01 ~]$ ssh-copy-id -i ~/.ssh/id_rsa.pub  hadoop@server01
[hadoop@server01 ~]$ ssh-copy-id -i ~/.ssh/id_rsa.pub  hadoop@server02
[hadoop@server01 ~]$ ssh-copy-id -i ~/.ssh/id_rsa.pub  hadoop@server03
[hadoop@server01 ~]$ ssh-copy-id -i ~/.ssh/id_rsa.pub  hadoop@server04
[hadoop@server01 ~]$ ssh-copy-id -i ~/.ssh/id_rsa.pub  hadoop@server05
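After distributing the keys, a quick sanity check of both the login and the authorized_keys permission on a target node (a sketch, assuming the defaults above):

[hadoop@server01 ~]$ ssh hadoop@server03 hostname
server03
[hadoop@server01 ~]$ ssh hadoop@server03 stat -c %a .ssh/authorized_keys
600

If the hostname comes back without a password prompt, passwordless login is working.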

Building the ZooKeeper distributed cluster

Downloading ZooKeeper

Download apache-zookeeper-3.5.8-bin.tar.gz from the official site. Note that the package without -bin contains only the source code and cannot start QuorumPeerMain.

Installing ZooKeeper

Extract apache-zookeeper-3.5.8-bin.tar.gz into /usr and change its owner and group to hadoop.

Add the following to the environment variables:

ZOOKEEPER_HOME=/usr/apache-zookeeper-3.5.8-bin
PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin

Copy /usr/apache-zookeeper-3.5.8-bin/conf/zoo_sample.cfg to the same directory and name the copy zoo.cfg; this is the configuration file ZooKeeper reads at startup.

Taking server03 as an example, edit zoo.cfg as follows.

[hadoop@server03 home]$ cat /usr/apache-zookeeper-3.5.8-bin/conf/zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
# the ZooKeeper myid file lives under this directory
dataDir=/opt/zookeeper/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

# the number in server.x must match that node's myid value; the three ZooKeeper nodes follow
# port 2888 is used for communication between cluster nodes
# port 3888 is used for leader election
server.3=server03:2888:3888
server.4=server04:2888:3888
server.5=server05:2888:3888

Create the myid file under /opt/zookeeper/data:

[hadoop@server03 home]$ echo 3 > /opt/zookeeper/data/myid

That completes the ZooKeeper setup. server04 and server05 follow the same steps; only the myid value differs (see the sketch below).
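For reference, the myid value on each node must match its server.x entry in zoo.cfg, so the other two nodes get:

[hadoop@server04 home]$ echo 4 > /opt/zookeeper/data/myid
[hadoop@server05 home]$ echo 5 > /opt/zookeeper/data/myid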

Start ZooKeeper on each of the three machines.

[hadoop@server03 home]$ zkServer.sh start
[hadoop@server03 home]$ jps
3762 Jps
1594 QuorumPeerMain
[hadoop@server03 home]$ zkServer.sh status
/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/apache-zookeeper-3.5.8-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader

server03 has been elected leader.

Building the Hadoop distributed cluster

Hadoop is written in Java, so the JDK has to be installed first. This walkthrough uses jdk-11.0.8; after installing it, add JAVA_HOME to the environment variables.
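As an illustration, assuming the JDK is unpacked to /usr/java/jdk-11.0.8 (the same path referenced later in hadoop-env.sh), the variables can be added to /etc/profile in the same style as the later snippets:

JAVA_HOME=/usr/java/jdk-11.0.8
PATH=$PATH:$JAVA_HOME/bin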

Downloading Hadoop

Download hadoop-3.3.0.tar.gz from the official site, extract it into /usr, and change the owner and group to hadoop.

Configuring Hadoop

The official site has detailed documentation for Apache Hadoop 3.3.0; it is enough to put the single-node settings and the HA settings into the corresponding files, but I will walk through them again here. Before that, edit /usr/hadoop-3.3.0/etc/hadoop/hadoop-env.sh and add export JAVA_HOME=/usr/java/jdk-11.0.8 so that the Hadoop scripts can find the JDK.

Add the following to the environment variables:

HADOOP_HOME=/usr/hadoop-3.3.0
PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

1. /usr/hadoop-3.3.0/etc/hadoop/core-site.xml

<configuration>
    <property>
      <name>fs.defaultFS</name>
      <value>hdfs://mycluster</value>
    </property>
    <property>
      <name>hadoop.tmp.dir</name>
      <value>/opt/hadoop/tmp</value>
    </property>
     <property>
       <name>ha.zookeeper.quorum</name>
       <value>server03:2181,server04:2181,server05:2181</value>
     </property>
</configuration>

2. /usr/hadoop-3.3.0/etc/hadoop/hdfs-site.xml

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
    <property>
      <name>dfs.nameservices</name>
      <value>mycluster</value>
    </property>
    <property>
      <name>dfs.ha.namenodes.mycluster</name>
      <value>nn1,nn2</value>
    </property>
    <property>
      <name>dfs.namenode.rpc-address.mycluster.nn1</name>
      <value>server01:8020</value>
    </property>
    <property>
      <name>dfs.namenode.rpc-address.mycluster.nn2</name>
      <value>server02:8020</value>
    </property>
    <property>
      <name>dfs.namenode.http-address.mycluster.nn1</name>
      <value>server01:9870</value>
    </property>
    <property>
      <name>dfs.namenode.http-address.mycluster.nn2</name>
      <value>server02:9870</value>
    </property>
    <property>
      <name>dfs.namenode.shared.edits.dir</name>
      <value>qjournal://server03:8485;server04:8485;server05:8485/mycluster</value>
    </property>
    <property>
      <name>dfs.client.failover.proxy.provider.mycluster</name>
      <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>
    <property>
      <name>dfs.ha.fencing.methods</name>
      <value>sshfence</value>
    </property>
    <property>
      <name>dfs.ha.fencing.ssh.private-key-files</name>
      <value>/home/hadoop/.ssh/id_rsa</value>
    </property>
    <property>
      <name>dfs.ha.fencing.ssh.connect-timeout</name>
      <value>30000</value>
    </property>
    <property>
      <name>dfs.journalnode.edits.dir</name>
      <value>/opt/hadoop/journaldata</value>
    </property>
    <property>
      <name>dfs.ha.nn.not-become-active-in-safemode</name>
      <value>true</value>
    </property>
     <property>
       <name>dfs.ha.automatic-failover.enabled</name>
       <value>true</value>
     </property>
</configuration>

3. /usr/hadoop-3.3.0/etc/hadoop/mapred-site.xml

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>mapreduce.application.classpath</name>
        <value>$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/mapreduce/lib/*</value>
    </property>
    <property>
        <name>yarn.app.mapreduce.am.env</name>
        <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
    </property>
    <property>
        <name>mapreduce.map.env</name>
        <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
    </property>
    <property>
        <name>mapreduce.reduce.env</name>
        <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
    </property>
</configuration>

4. /usr/hadoop-3.3.0/etc/hadoop/yarn-site.xml

<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.application.classpath</name>
        <!-- set this to the full output of running `hadoop classpath` on server01;
             see the example after this configuration block -->
        <value>output of `hadoop classpath` on server01</value>
    </property>
    <property>
        <name>yarn.nodemanager.env-whitelist</name>
        <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
    </property>
    <property>
      <name>yarn.resourcemanager.ha.enabled</name>
      <value>true</value>
    </property>
    <property>
      <name>yarn.resourcemanager.cluster-id</name>
      <value>cluster1</value>
    </property>
    <property>
      <name>yarn.resourcemanager.ha.rm-ids</name>
      <value>rm1,rm2</value>
    </property>
    <property>
      <name>yarn.resourcemanager.hostname.rm1</name>
      <value>server01</value>
    </property>
    <property>
      <name>yarn.resourcemanager.hostname.rm2</name>
      <value>server02</value>
    </property>
    <property>
      <name>yarn.resourcemanager.webapp.address.rm1</name>
      <value>server01:8088</value>
    </property>
    <property>
      <name>yarn.resourcemanager.webapp.address.rm2</name>
      <value>server02:8088</value>
    </property>
    <property>
      <name>hadoop.zk.address</name>
      <value>server03:2181,server04:2181,server05:2181</value>
    </property>
</configuration>
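The yarn.application.classpath value above is not literal text; it should be the colon-separated classpath that Hadoop itself prints on server01. Roughly (output truncated here; the exact paths depend on the installation):

[hadoop@server01 ~]$ hadoop classpath
/usr/hadoop-3.3.0/etc/hadoop:/usr/hadoop-3.3.0/share/hadoop/common/lib/*:/usr/hadoop-3.3.0/share/hadoop/common/*:...

Paste the full output into the <value> element.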

 

5. /usr/hadoop-3.3.0/etc/hadoop/workers

server03
server04
server05
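Assuming the tarball has already been unpacked to /usr on every node with hadoop as the owner, only the edited configuration files need to be synchronized from server01 to the other machines, for example:

for h in server02 server03 server04 server05; do
    scp /usr/hadoop-3.3.0/etc/hadoop/{core,hdfs,mapred,yarn}-site.xml \
        /usr/hadoop-3.3.0/etc/hadoop/hadoop-env.sh \
        /usr/hadoop-3.3.0/etc/hadoop/workers hadoop@$h:/usr/hadoop-3.3.0/etc/hadoop/
done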

Starting ZooKeeper

On server03, server04, and server05, start ZooKeeper as the hadoop user with zkServer.sh start. Once all three instances are up, run zkServer.sh status on each machine to see whether that node is the leader or a follower.

[hadoop@server03 ~]$ zkServer.sh start
/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/apache-zookeeper-3.5.8-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[hadoop@server03 ~]$ jps
3383 QuorumPeerMain
3422 Jps
[hadoop@server03 ~]$ zkServer.sh status
/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/apache-zookeeper-3.5.8-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower

Starting the JournalNodes

Before starting the JournalNodes, recall this setting we made in hdfs-site.xml:

    <property>
      <name>dfs.journalnode.edits.dir</name>
      <value>/opt/hadoop/journaldata</value>
    </property>

Create this path as root first, then change its owner and group to hadoop:

[root@server04 ~]# cd /opt
[root@server04 opt]# ll
total 0
drwxr-xr-x. 3 hadoop hadoop 18 Aug  8 23:37 zookeeper
[root@server04 opt]# mkdir -p hadoop/journaldata
[root@server04 opt]# chown -R hadoop:hadoop hadoop/
[root@server04 opt]# ll
total 0
drwxr-xr-x. 3 hadoop hadoop 25 Aug 11 22:42 hadoop
drwxr-xr-x. 3 hadoop hadoop 18 Aug  8 23:37 zookeeper

Run hadoop-daemon.sh start journalnode on server03, server04, and server05 to start the JournalNodes.

[hadoop@server03 sbin]$ hadoop-daemon.sh start journalnode
WARNING: Use of this script to start HDFS daemons is deprecated.
WARNING: Attempting to execute replacement "hdfs --daemon start" instead.
[hadoop@server03 sbin]$ jps
3383 QuorumPeerMain
4439 JournalNode
4476 Jps

Formatting HDFS

core-site.xml contains this setting:

    <property>
      <name>hadoop.tmp.dir</name>
      <value>/opt/hadoop/tmp</value>
    </property>

Create this directory as root first, then change its owner and group to hadoop:

[root@server01 ~]# cd /opt
[root@server01 opt]# ll
total 0
[root@server01 opt]# mkdir -p hadoop/tmp
[root@server01 opt]# chown -R hadoop:hadoop hadoop/
[root@server01 opt]# ll
total 0
drwxr-xr-x. 3 hadoop hadoop 17 Aug 11 22:54 hadoop

Run hadoop namenode -format on server01 to format HDFS.

Starting Hadoop

For the very first start, run hadoop-daemon.sh start namenode on server01.

[hadoop@server01 tmp]$ hadoop-daemon.sh start namenode
WARNING: Use of this script to start HDFS daemons is deprecated.
WARNING: Attempting to execute replacement "hdfs --daemon start" instead.
[hadoop@server01 tmp]$ jps
24664 NameNode
24700 Jps

On server02, run hadoop namenode -bootstrapStandby to sync the metadata from server01.

On server01, run hdfs zkfc -formatZK to initialize the ZKFC state in ZooKeeper.
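For reference, the same one-time initialization can be written with the non-deprecated hdfs forms of the commands above (a recap, not an extra step):

[hadoop@server01 ~]$ hdfs namenode -format            # format HDFS (run once)
[hadoop@server01 ~]$ hdfs --daemon start namenode     # start the first NameNode
[hadoop@server02 ~]$ hdfs namenode -bootstrapStandby  # copy the metadata from server01
[hadoop@server01 ~]$ hdfs zkfc -formatZK              # initialize the failover state in ZooKeeper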

Once the steps above are complete, run stop-all.sh on server01 to stop all of the services.

Then run start-all.sh on server01 to start the whole cluster.

[hadoop@server01 hadoop]$ start-all.sh
WARNING: Attempting to start all Apache Hadoop daemons as hadoop in 10 seconds.
WARNING: This is not a recommended production deployment configuration.
WARNING: Use CTRL-C to abort.
Starting namenodes on [server01 server02]
Starting datanodes
Starting journal nodes [server04 server03 server05]
Starting ZK Failover Controllers on NN hosts [server01 server02]
Starting resourcemanagers on [ server01 server02]
Starting nodemanagers
[hadoop@server01 hadoop]$ jps
6323 DFSZKFailoverController
5977 NameNode
6668 ResourceManager
6797 Jps
[hadoop@server02 ~]$ jps
6736 ResourceManager
6600 DFSZKFailoverController
6492 NameNode
7007 Jps
[hadoop@server03 current]$ jps
27705 NodeManager
27978 Jps
5531 QuorumPeerMain
27580 JournalNode
27470 DataNode
[hadoop@server04 tmp]$ jps
5104 QuorumPeerMain
26819 NodeManager
26694 JournalNode
27094 Jps
26584 DataNode
[hadoop@server05 tmp]$ jps
4663 QuorumPeerMain
26394 NodeManager
26268 JournalNode
26684 Jps
26159 DataNode

Seeing these results, I could not help giving myself a thumbs-up; standing up an environment really is not easy for a developer. I hit plenty of pitfalls along the way, tracked down the answers one by one, and finally got it running, which feels great.

Submitting a MapReduce job to the distributed cluster remotely from IDEA

With the environment ready, it is time to put it to work.

Here is the exercise: given temperature readings on various dates, find the temperatures of the two hottest days in each month. Note that a day's temperature may be recorded more than once, but only the highest reading for that day may appear in the output.

Temperature data file:
1949-10-01 34c
1949-10-01 38c
1949-10-02 36c
1950-01-01 32c
1950-10-01 37c
1951-12-01 23c
1950-10-02 41c
1950-10-03 27c
1951-07-01 45c
1951-07-02 46c
1951-07-03 47c
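The client below reads its input from HDFS at /tq/input/data.txt, so the sample file has to be uploaded first (assuming it was saved locally as data.txt on server01):

[hadoop@server01 ~]$ hdfs dfs -mkdir -p /tq/input
[hadoop@server01 ~]$ hdfs dfs -put data.txt /tq/input/data.txt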

Approach: each whole record becomes the key of the map output; the keys are grouped by year and month, sorted by year, month, and temperature, and finally duplicate records for the same day are dropped before writing the result.

HadoopClient.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

public class HadoopClient {
    public static void main(String[] args) throws Exception {

        //1,conf

        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Configuration conf = new Configuration(true);
        conf.set("mapreduce.app-submission.cross-platform", "true");
        conf.set("mapreduce.job.jar","D:\\IDEAProject\\hadoopmr\\target\\hadoopmr-1.0-SNAPSHOT.jar");

        //2,job
        Job job = Job.getInstance(conf);
        job.setJarByClass(HadoopClient.class);
        job.setJobName("FirstMR");

        //3, input and output paths
        Path input = new Path("/tq/input/data.txt");
        FileInputFormat.addInputPath(job, input);

        Path output = new Path(URI.create("/tq/output/"));
        // delete the output directory if it already exists; otherwise the job would fail
        if(output.getFileSystem(conf).exists(output)){
            output.getFileSystem(conf).delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output );

        //4,map
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(MyKey.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setPartitionerClass(MyPartitioner.class);
        job.setSortComparatorClass(MySortComparator.class);


        //5,reduce
        // grouping comparator used in the reduce phase
        job.setGroupingComparatorClass(MyGroupingComparator.class);
        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(1);
        job.setCombinerKeyGroupingComparatorClass(MyGroupingComparator.class);

        //7,submit
        job.waitForCompletion(true);
    }
}

MyKey.java

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class MyKey implements WritableComparable<MyKey> {
    private int year;
    private int month;
    private int day;
    private int temperature;

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public int getDay() {
        return day;
    }

    public void setDay(int day) {
        this.day = day;
    }

    public int getTemperature() {
        return temperature;
    }

    public void setTemperature(int temperature) {
        this.temperature = temperature;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(day);
        out.writeInt(temperature);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.year=in.readInt();
        this.month=in.readInt();
        this.day=in.readInt();
        this.temperature=in.readInt();
    }

    @Override
    public int compareTo(MyKey that) {
        // by convention, compare dates in ascending order
        int c1=Integer.compare(this.getYear(), that.getYear());
        if(c1==0) {
            int c2 = Integer.compare(this.getMonth(), that.getMonth());
            if(c2==0) {
                // once year, month, and day are compared, there is nothing left to do
                return Integer.compare(this.getDay(), that.getDay());
            }
            return c2;
        }
        return c1;
    }
}

MyMapper.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class MyMapper extends Mapper <LongWritable, Text, MyKey, IntWritable>{
    // created once as fields so they are not re-allocated on every map() call
    MyKey mkey = new MyKey();
    IntWritable mval = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {

        try {
            // value:  1949-10-01 34c  >>  MyKey
            String[] strs = StringUtils.split(value.toString(), ' ');

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            Date date = sdf.parse(strs[0]);

            Calendar cal = Calendar.getInstance();
            cal.setTime(date);

            mkey.setYear(cal.get(Calendar.YEAR));
            mkey.setMonth(cal.get(Calendar.MONTH)+1);
            mkey.setDay(cal.get(Calendar.DAY_OF_MONTH));

            int temperature = Integer.parseInt(strs[1].substring(0, strs[1].length()-1));
            mkey.setTemperature(temperature);
            mval.set(temperature);
            context.write(mkey, mval);
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}

MyPartitioner.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends  Partitioner<MyKey, IntWritable>{

    @Override
    public int getPartition(MyKey mykey, IntWritable intWritable, int i) {
        // partition by year so that all records of one year go to the same reducer
        return mykey.getYear() % i;
    }
}

MySortComparator.java

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MySortComparator extends WritableComparator {
    public MySortComparator() {
        super(MyKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MyKey t1 = (MyKey)a;
        MyKey t2 = (MyKey)b;

        int c1=Integer.compare(t1.getYear(), t2.getYear());
        if(c1==0){
            int c2=Integer.compare(t1.getMonth(), t2.getMonth());
            if(c2==0){
                // negate so that temperatures sort from high to low
                return -Integer.compare(t1.getTemperature(), t2.getTemperature());
            }
            return c2;
        }
        return c1;
    }
}

MyGroupingComparator.java

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyGroupingComparator extends WritableComparator {
    public MyGroupingComparator() {
        super(MyKey.class,true);
    }

    // group keys by year and month only; day and temperature are ignored
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MyKey t1 = (MyKey)a;
        MyKey t2 = (MyKey)b;

        int c1=Integer.compare(t1.getYear(), t2.getYear());
        if(c1==0){
            return Integer.compare(t1.getMonth(), t2.getMonth());
        }
        return c1;
    }
}

MyReducer.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class MyReducer extends Reducer<MyKey, IntWritable, Text, IntWritable> {
    Text rkey = new Text();
    IntWritable rval = new IntWritable();

    @Override
    protected void reduce(MyKey key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int flg = 0;
        int day = 0;

        // 1970 01 20  34     34
        // 1970 01 12  28     28
        for (IntWritable v : values) {  // v itself is unused; the fields of key advance together with the iterator
            if (flg == 0) {
                // 1970-01-20:34
                rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay());
                rval.set(key.getTemperature());
                context.write(rkey,rval );

                day = key.getDay();
                flg++;
            }
            // skip further records for the same day; output the next distinct day and stop
            if(flg!=0 && day != key.getDay()){
                rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay());
                rval.set(key.getTemperature());
                context.write(rkey,rval);

                break;
            }
        }
    }
}

Note: copy core-site.xml, hdfs-site.xml, mapred-site.xml, and yarn-site.xml into the src/main/resources directory of the IDEA project, and mark the resources directory as Resources Root.

Add the following to the pom:

<resources>
      <resource>
        <directory>src/main/resources</directory>
        <includes>
          <include>**/*.properties</include>
          <include>**/*.xml</include>
        </includes>
        <filtering>false</filtering>
      </resource>
      <resource>
        <directory>src/main/java</directory>
        <includes>
          <include>**/*.properties</include>
          <include>**/*.xml</include>
        </includes>
        <filtering>false</filtering>
      </resource>
</resources>

With all of the above in place, run the client's main program and open server01:8088 in a browser to check the job under Applications (it can also be queried from the command line, as shown below). Once the job has finished successfully, view the result file on server01 from the command line.
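Listing the finished applications from the command line on server01:

[hadoop@server01 ~]$ yarn application -list -appStates FINISHED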

[hadoop@server01 ~]$ hdfs dfs -cat /tq/output/part-r-00000
1949-10-1    38
1949-10-2    36
1950-1-1     32
1950-10-2    41
1950-10-1    37
1951-7-3     47
1951-7-2     46
1951-12-1    23

The results are correct. Now it is worth going back over the code for each step at your own pace; it becomes familiar with practice.
