Hadoop四高

  • 高可靠:Mapreduce负责维护多个数据的副本,Hadoop计算数据出现故障,也不会造成数据丢失
  • 高扩展性:可以扩展数以千计的节点
  • 高效性:Hadoop是并行操作的
  • 高容错性:自动将失败的任务自动分配

HDFS三部分

graph TD;
id1(HDFS架构)
HDFS–>NameNode;
HDFS–>DataNode;
HDFS–>SecdonaryNode
  • NameNode(NN):存储文件的元数据(文件名、文件目录结构、文件属性),每个文件的快列表和块所在的DataNode
  • DataNode(DN):在本地文件系统中块数据,以及块的数据和校验和

​ 默认的块大小为:128M,例如一个200M的文件,存储在HDFS中后将会是 128M + 72M大小的文件

  • SecdonaryNameNode(2NN):Secondary NameNode所做的是在文件系统这设置一个Checkpoint来帮助NameNode更好的工作;它不是取代NameNode,也不是NameNode的备份。 Secondary NameNode的检查点进程启动,是由两个配置参数控制的: fs.checkpoint.period,指定连续两次检查点的最大时间间隔, 默认值是1小时。对元数据进行配分

YARN架构

graph TD;
id1(YARN架构)
Client,Client2–>ResourceManager;

ResourceManager–>NodeManager1;
NodeManager1–>ApplicationMaster1;
ApplicationMaster1–>Container1;

ResourceManager–>NodeManager2;
NodeManager2–>ApplicationMaster2;
ApplicationMaster2–>Container2;

ResourceManager–>NodeManager3;
NodeManager3–>ApplicationMaster3;
ApplicationMaster3–>Container3;

ResourceManager(RM)主要作用

  • 处理客户端请求
  • 监控NodeManager
  • 启动或者监控ApplicationMaster
  • 资源的分配和调度

NodeManager(NM)主要作用

  • 管理单个节点的资源
  • 处理来自ResourceManager的命令
  • 处理来自ApplicationMaster的命令

ApplicationMaster(AM)主要作用

  • 负责数据的切分
  • 为应用程序申请资源并分配给内部的任务
  • 任务的监控和容错

Container:YARN中的资源抽象,封装了某个节点的多维度的资源,如内存,CPU,磁盘网络等

MapReduce架构概述

  • Map:阶段并行处理输入数据
  • Reduce:阶段对Map的结果进行汇总
graph TD;
map–>数据1;
map–>数据2;
map–>数据3;
map–>数据n;
数据1–>reduce;
数据2–>reduce;
数据3–>reduce;
数据n–>reduce;

Hadoop–最小安装

yum install -y epel-release
yum install -y psmisc nc net-tools rsync vim lrzsz ntp libzstd openssl-static tree iotop

systemctl disable firewalld
systemctl is-enabled firewalld

useradd atguigu
passwd atguigu

vim /etc/sudoers    NOPASSWD:ALL
chwon atguigu:atguigu  module/ software/


卸载所有rpm包(查询到的)
rpm -qa | grep -i java | xargs n1 rpm -e

远程同步工具rsync

主要用于备份和镜像,速度快

rsync -av 和scp一样
# !/bin/bash
if [ $# -lt 1 ]
then
  echo NOT ENOUGH ARGUMENT!
  exit;
fi
for host in hadoop200 hadoop201 hadoop202
do
  echo ==============$host===============
  for file in $@
  do
    if [ -e $file ]
    then
      positionDir=$(cd -P $(dirname $file); pwd);
      fname=$(basename $file)
      ssh $host "mkdir -p $positionDir"
      rsync -av $positionDir/$fname $host:$positionDir
    else
      echo $file does not exists!
    fi
  done
done

要获取的默认文件 文件存放在Hadoop的jar包中的位置
[core-default.xml] hadoop-common-2.7.2.jar/ core-default.xml
[hdfs-default.xml] hadoop-hdfs-2.7.2.jar/ hdfs-default.xml
[yarn-default.xml] hadoop-yarn-common-2.7.2.jar/ yarn-default.xml
[mapred-default.xml] hadoop-mapreduce-client-core-2.7.2.jar/ mapred-default.xml

Hadoop执行步骤

graph LR;
Hadoop200–>NameNode
Hadoop200–>DataNode200

Hadoop201–>ResourceManager
Hadoop201–>DataNode201

Hadoop202–>SecdonaryNameNode
Hadoop202–>DataNode202

ResourceManager–>NodeManager
NodeManager–>DataNode200_
NodeManager–>DataNode201_
NodeManager–>DataNode202_

core 配置

  • 入口地址
  • 数据地址,可以自己创建可以用 haddop的dfs.tmp.dir
<configuration>
	<property>
 		<name>fs.defaultFS</name>
		<value>hdfs://hadoop200:9820</value>
  		<!--value>file:///</value-->
	</property>	

	<property>
 		<name>hadoop.data.dir</name>
		<value>/opt/module/hadoop-3.1.3/data</value>
	</property>	
</configuration>

hdfs 配置

  • namenode的存储地址
  • datanode的存储地址
  • namenode和secondary的访问地址
<configuration>
<property>
  <name>dfs.namenode.name.dir</name>
  <value>file://${hadoop.data.dir}/name</value>
</property>

<property>
  <name>dfs.datanode.data.dir</name>
  <value>file://${hadoop.data.dir}/data</value>
</property>

<property>
  <name>dfs.namenode.http-address</name>
  <value>hadoop200:9870</value>
</property>

<property>
  <name>dfs.namenode.secondary.http-address</name>
  <value>hadoop202:9868</value>
</property>

</configuration>

yarn 配置

  • nodemanager的辅助服务
  • resourceManager的地址
  • 继承环境变量 env-whitelist
<configuration>

<!-- Site specific YARN configuration properties -->
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
<!-- resourceManager的主机地址 -->
  <property>
    <description>The hostname of the RM.</description>
    <name>yarn.resourcemanager.hostname</name>
    <value>hadoop201</value>
  </property>
<!-- 继承环境变量 -->
  <property>
    <description>Environment variables that containers may override rather than use NodeManager's default.</description>
    <name>yarn.nodemanager.env-whitelist</name>
    <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_HOME,PATH,LANG,TZ</value>
  </property>
<!-- 虚拟内存的限制 -->
<!--
  <property>
    <description>Whether virtual memory limits will be enforced for
    containers.</description>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>true</value>
  </property>

-->
</configuration>

mapred 配置

  • mapreduce的服务框架名为yarn
  <configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
  </configuration>

如果resourceManager不启动会怎么样?

问题:重复格式化namenode

格式化之后会生成一个新的namenode,但是需要注意的是新的namenode和之前的namenode不一致
导致重复格式化之后生成新的namenode的集群ID和原来datanode所绑定的集群ID不一致,导致新启动的namenode,无法重新启动datanode

解决方案是:删除原有的data数据

SSH加密算法

graph LR;
Machine1 — 1. publicKey —> Machine2;
Machine1 — 2. data —> Machine2;
id1((Machine2))– 3. publicKey + Calculate_data —> Password;
Machine2–>id1
Password–>Machine2;
Machine1 –> id2
Machine2 <– 4. Password— Machine1;
id2((Machine1))–5. privateKey + Calculate_password –> data

Machine1 — 6. MD5 data+sessionID —>Machine2;
Machine2 — 7.MD5 data+sessionID—> id3[7. data+sessionID=6.]

免密登录步骤

ssh-keygen -t rsa[4次回车]
ssh-copy-id 主机名

总结

集群单点启动

hdfs --daemon start namenode
hdfs --daemon start datanode
yarn --daemon start nodemanager
yarn --daemon start resourcemanager

集群停止

hdfs --daemon stop namenode
hdfs --daemon stop datanode
yarn --daemon stop nodemanager
yarn --daemon stop resourcemanager 

集群群起

3.x 需要在works指定datanode 和 nodemanager 的机器
2.x      slaves

Hadoop存储文件的位置在/data/data/current/BP-961404918-192.168.0.200-1659065001458/current/finalized/subdir0/subdir0

在hdfs中的命令

  • hdfs fs -mkdir 创建目录
  • hdfs fs -put 上传文件
  • hdfs fs -rm -r 删除文件或者目录

YARN中的历史服务器

启动yarn的历史服务器

mapred --daemon start historyserver

需要在mapred-site.xml中添加

<property>
  <name>mapreduce.jobhistory.address</name>
  <value>hadoop201:10020</value>
</property>

<property>
  <name>mapreduce.jobhistory.webapp.address</name>
  <value>hadoop201:19888</value>
</property>
</configuration>

yarn的日志聚集功能的开启

<!-- yarn的日志服务器  -->
<!-- yarn的日志聚集功能  -->
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
<!-- 日志服务器地址  -->
<property>
  <name>yarn.log-sever.url</name>
  <value>http://hadoop201:19888/jobhistory/logs</value>
</property>
<!-- 日志存储时间  -->
<property>
  <name>yarn.log-aggregation.retain-seconds</name>
  <value>604880</value>
</property>

集群时间同步

时间同步的方式:

  • 找一台机器,作为时间服务器
  • 所有机器和这台机器时间进行定时同步,比如每隔一分钟同步一次时间

① 关闭所有的机器的ntp服务

systemctl stop ntpd
systemctl disable ntpd
systemctl is-enable ntpd

② 将Host on.. 下的内容取消注释,并注释server 0….的内容,加入以下内容

# Hosts on local network are less restricted.
restrict 192.168.1.0 mask 255.255.255.0 nomodify notrap

# Use public servers from the pool.ntp.org project.
# Please consider joining the pool (http://www.pool.ntp.org/join.html).
#server 0.centos.pool.ntp.org iburst
#server 1.centos.pool.ntp.org iburst
#server 2.centos.pool.ntp.org iburst
#server 3.centos.pool.ntp.org iburst
server 127.127.1.0
fudge 127.127.1.0 stratum 10

③ 在/etc/sysconfig/ntpd文件中添加如下(硬件时间和系统时间同步)

SYNC_HWCLOCK=yes

④ 重启ntp服务

systemctl start ntpd
systemctl enable ntpd
systemctl is-enable ntpd

⑤ 使用 crontab 命令重新 对其他两台机器实施时间同步

crontab -e
*/1 * * * * /usr/sbin/ntpdate hadoop200

Hadoop常用端口号

  • NameNode内部通信端口:8020
  • NameNode Web端端口:9870
  • SecondaryNameNode Web端口:9868
  • (Yarn)ResourceManager Web端口:8088
  • 历史服务器Web端端口:19888

Hadoop3.X和2.X主要区别

  • 从JDK7升级到了JDK8
  • 增加了纠删码
  • 支持超过了两个NameNode
  • 默认端口的改变

HDFS 分布式文件系统

HDFS(Hadoop Distributed File System),适合一次写入多次读出的场景,且不支持文件的修改。但可以在文件后追加

优点

  • 数据自动保存多个副本、它通过增加副本的方式,提高容错性。
  • 如果一个副本丢失以后,可以通过其他副本自动恢复。
  • 适合处理大数据,数据规模可以处理数据规模达到GB、TB、PB;文件规模可以处理百万规模以上的文件数量。
  • 可以构建在廉价的机器上,通过多个副本机制,提高可靠性。

缺点

  1. 不适合低延时的数据,比如毫秒级的数据
  2. 无法高效的对大量文件进行存储。
    1. 存储小文件会占用NameNode大量的内存来存储目录和块信息。
    2. 小文件存储的寻址时间会超过读取时间,违法了HDFS设计的初衷。
  3. 不支持并发写入、文件的随机修改
    1. 一个文件只能一个写,不支持多线程写。
    2. 仅支持数据追加(Append),不支持文件的随机修改。

HDFS的组成架构

  • NameNode:Master;管理HDFS的名称空间;配置副本策略;管理快的映射信息
  • DataNode:Slave;NameNode下达命令,DataNode实际操作
  • SecondaryNameNode:并非NameNode的热备份;当NameNode挂掉的时候,它并不能马上替换NaemNode并提供服务;辅助NameNode,分担其工作量,比如定期合并Fsimage和Edits,并推送给NameNode;只有在紧急情况下,可以辅助恢复NameNode
  • Client客户端:文件的切分;与NameNode交互,获取文件的位置信息;与DataNode交互;提供命令来管理HDF

HDFS的操作

将本地的文件剪切到HDFS文件系统(本地文件消息)

hadoop fs -moveFromLocal TestData /

将本地的文件内容追加到HDFS文件系统中的文件中

hadoop fs -appendToFile TempFileOfData /TestData

将本地的文件拷贝到HDFS文件系统中

hadoop fs -copyToLocal /TestData

将HFDS的文件移动到HDFS另一个文件

hadoop fs -mv /sxxx /xxx

将HFDS的文件复制到HDFS另一个文件

hadoop fs -cp /sxxx /xxx

合并下载多个文件

 hadoop fs -getmerge /xml ./all.xml

Haddoop环境准备Windows

配置Maven

<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13.2</version>
    <scope>test</scope>
  </dependency>

  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>2.12.0</version>
  </dependency>

  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
  </dependency>
</dependencies>

Java连接HADOOP

通过FileSystem.get方法连接最后关闭资源

Configuration configuration = new Configuration();
URI uri = URI.create("hdfs://192.168.0.200:9820");
String user = "root";
FileSystem fs = FileSystem.get(uri, configuration, user);
	
fs.mkdirs(new Path("/javaPath"));
fs.close();

使用copyFromLocalFile方法上传文件

  • delSrc是否删除原有的src文件
  • overwrite是否重写当前文件
  • 本地路径
  • Hadoop文件系统路径
Path pathHadoop = new Path("/");
Path pathLocal = new Path("C:\\Users\\rabbi\\Downloads\\canvas.png");
try {
  fs.copyFromLocalFile(
    false, 			true,
    pathLocal, 	pathHadoop
  );
} catch (IOException e) {
  e.printStackTrace();
}

可以在idea中编写一个hdfs-site更新replication的数量,也可以更新configuration中的配置对象

configuration.set("dfs.replication", "1");

迭代文件内容

FileStatus[] fileStatuses = fs.listStatus(new Path("/")); // 拥有是否使文件的属性

RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(new Path("/"), true);
- true     深度遍历
- false    不遍历

文件下载

fs.copyToLocalFile(
  false, // 是否删除元文件
  new Path("/canvas.png"),  //HDFS文件
  new Path("./"),   // 本地存储路径
  true);  // 元数据	

文件上传

fs.copyFromLocalFile(
  false, true,
  pathLocal, pathHadoop);

文件移动

fs.rename(
  new Path("/canvas.png"),
  new Path("/BeautifulGirl.png"));

通过流去下载文件

String input = "/BeautifulGirl.png";
String output = "./Be.png";

FSDataInputStream fis = fs.open(new Path(input));
FileOutputStream fos = new FileOutputStream(new File(output));
IOUtils.copyBytes(fis, fos, configuration);
graph LR;
HadoopInput– 输入 —>LocalOutput

通过流去上传文件

String localPath = "./canvas.png";
String hadoopPath = "/canvas.png";

FSDataOutputStream fsDataOutputStream = fs.create(new Path(hadoopPath));
FileInputStream fileInputStream = new FileInputStream(new File(localPath));

OUtils.copyBytes(fileInputStream, fsDataOutputStream, configuration);
IOUtils.closeStream(fileInputStream);
IOUtils.closeStream(fsDataOutputStream);
graph LR;
LocalInput– 输入 —> HadoopOutput

HADOOP 查看fsimage和edits的日志内容

使用hdfs的oiv,oev命令

hdfs oiv -p 文件类型 -i 镜像文件 -o 转换后的文件输出路径
export PYSPARK_DRIVER_PYTHON="jupyter"
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --allow-root"
pyspark --master spark://localhost:7077

MapReduce

MapReduce分为Map和Reduce,Map阶段只是负责数据的处理,并不负责计算,计算交给Reduce来做。

WordCount程序

Java类型 Hadoop类型
Boolean BooleanWriteable
Byte ByteWriteable
Int IntWriteable
Long LongWriteable
….
String Text

编程规范、需要写Mapper、Reduce和Driver

Mapper文件

package com.wordCount;

import jdk.incubator.vector.VectorOperators;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// 编写一个MR程序,通常需要分三步
// 1. 编写Mapper
// 2. 编写Reducer
// 3. 编写Driver

// 一共有四个泛型
// KEYIN    LongWriteable 表示偏移量,从文件的那个位置读取数据
// KEYOUT   Text 从文件中读取到的数据

// VALUEIN  Text
// VALUEOUT IntWriteable
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // 插件类型开发流程
    // 1. 继承或者实现类接口
    // 2. 实现或者重写方法
    // 提交执行
    private Text outKey = new Text();
    private IntWritable outValue = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        //分割当前数据
        String[] words = line.split(" ");

        for(String word: words){
            outKey.set(word);
            context.write(outKey, outValue);
        }

    }
}

Reducer.java

package com.wordCount;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable outKey = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outKey.set(sum);
        context.write(key, outKey);
    }
}

Mapper类解析

  • setup()在MapTask开始前调用一次。
  • map() 每次执行前都需要执行一次map方法
  • cleanup 在MapTask结束前调用一次
  • run()方法
public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    this.setup(context);

    try {
      while(context.nextKeyValue()) {
        this.map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      this.cleanup(context);
    }

  }
版权声明:本文为anas-kai原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/anas-kai/p/17011206.html