Hadoop
Hadoop四高
- 高可靠:Mapreduce负责维护多个数据的副本,Hadoop计算数据出现故障,也不会造成数据丢失
- 高扩展性:可以扩展数以千计的节点
- 高效性:Hadoop是并行操作的
- 高容错性:自动将失败的任务自动分配
HDFS三部分
id1(HDFS架构)
HDFS–>NameNode;
HDFS–>DataNode;
HDFS–>SecdonaryNode
- NameNode(NN):存储文件的元数据(文件名、文件目录结构、文件属性),每个文件的快列表和块所在的DataNode
- DataNode(DN):在本地文件系统中块数据,以及块的数据和校验和
默认的块大小为:128M,例如一个200M的文件,存储在HDFS中后将会是 128M + 72M大小的文件
- SecdonaryNameNode(2NN):Secondary NameNode所做的是在文件系统这设置一个Checkpoint来帮助NameNode更好的工作;它不是取代NameNode,也不是NameNode的备份。 Secondary NameNode的检查点进程启动,是由两个配置参数控制的: fs.checkpoint.period,指定连续两次检查点的最大时间间隔, 默认值是1小时。对元数据进行配分
YARN架构
id1(YARN架构)
Client,Client2–>ResourceManager;
ResourceManager–>NodeManager1;
NodeManager1–>ApplicationMaster1;
ApplicationMaster1–>Container1;
ResourceManager–>NodeManager2;
NodeManager2–>ApplicationMaster2;
ApplicationMaster2–>Container2;
ResourceManager–>NodeManager3;
NodeManager3–>ApplicationMaster3;
ApplicationMaster3–>Container3;
ResourceManager(RM)主要作用
- 处理客户端请求
- 监控NodeManager
- 启动或者监控ApplicationMaster
- 资源的分配和调度
NodeManager(NM)主要作用
- 管理单个节点的资源
- 处理来自ResourceManager的命令
- 处理来自ApplicationMaster的命令
ApplicationMaster(AM)主要作用
- 负责数据的切分
- 为应用程序申请资源并分配给内部的任务
- 任务的监控和容错
Container:YARN中的资源抽象,封装了某个节点的多维度的资源,如内存,CPU,磁盘网络等
MapReduce架构概述
- Map:阶段并行处理输入数据
- Reduce:阶段对Map的结果进行汇总
map–>数据1;
map–>数据2;
map–>数据3;
map–>数据n;
数据1–>reduce;
数据2–>reduce;
数据3–>reduce;
数据n–>reduce;
Hadoop–最小安装
yum install -y epel-release
yum install -y psmisc nc net-tools rsync vim lrzsz ntp libzstd openssl-static tree iotop
systemctl disable firewalld
systemctl is-enabled firewalld
useradd atguigu
passwd atguigu
vim /etc/sudoers NOPASSWD:ALL
chwon atguigu:atguigu module/ software/
卸载所有rpm包(查询到的)
rpm -qa | grep -i java | xargs n1 rpm -e
远程同步工具rsync
主要用于备份和镜像,速度快
rsync -av 和scp一样
# !/bin/bash
if [ $# -lt 1 ]
then
echo NOT ENOUGH ARGUMENT!
exit;
fi
for host in hadoop200 hadoop201 hadoop202
do
echo ==============$host===============
for file in $@
do
if [ -e $file ]
then
positionDir=$(cd -P $(dirname $file); pwd);
fname=$(basename $file)
ssh $host "mkdir -p $positionDir"
rsync -av $positionDir/$fname $host:$positionDir
else
echo $file does not exists!
fi
done
done
要获取的默认文件 | 文件存放在Hadoop的jar包中的位置 |
---|---|
[core-default.xml] |
hadoop-common-2.7.2.jar/ core-default.xml |
[hdfs-default.xml] |
hadoop-hdfs-2.7.2.jar/ hdfs-default.xml |
[yarn-default.xml] |
hadoop-yarn-common-2.7.2.jar/ yarn-default.xml |
[mapred-default.xml] |
hadoop-mapreduce-client-core-2.7.2.jar/ mapred-default.xml |
Hadoop执行步骤
Hadoop200–>NameNode
Hadoop200–>DataNode200
Hadoop201–>ResourceManager
Hadoop201–>DataNode201
Hadoop202–>SecdonaryNameNode
Hadoop202–>DataNode202
ResourceManager–>NodeManager
NodeManager–>DataNode200_
NodeManager–>DataNode201_
NodeManager–>DataNode202_
core 配置
- 入口地址
- 数据地址,可以自己创建可以用 haddop的dfs.tmp.dir
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop200:9820</value>
<!--value>file:///</value-->
</property>
<property>
<name>hadoop.data.dir</name>
<value>/opt/module/hadoop-3.1.3/data</value>
</property>
</configuration>
hdfs 配置
- namenode的存储地址
- datanode的存储地址
- namenode和secondary的访问地址
<configuration>
<property>
<name>dfs.namenode.name.dir</name>
<value>file://${hadoop.data.dir}/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file://${hadoop.data.dir}/data</value>
</property>
<property>
<name>dfs.namenode.http-address</name>
<value>hadoop200:9870</value>
</property>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>hadoop202:9868</value>
</property>
</configuration>
yarn 配置
- nodemanager的辅助服务
- resourceManager的地址
- 继承环境变量 env-whitelist
<configuration>
<!-- Site specific YARN configuration properties -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- resourceManager的主机地址 -->
<property>
<description>The hostname of the RM.</description>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop201</value>
</property>
<!-- 继承环境变量 -->
<property>
<description>Environment variables that containers may override rather than use NodeManager's default.</description>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_HOME,PATH,LANG,TZ</value>
</property>
<!-- 虚拟内存的限制 -->
<!--
<property>
<description>Whether virtual memory limits will be enforced for
containers.</description>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>true</value>
</property>
-->
</configuration>
mapred 配置
- mapreduce的服务框架名为yarn
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
如果resourceManager不启动会怎么样?
问题:重复格式化namenode
格式化之后会生成一个新的namenode,但是需要注意的是新的namenode和之前的namenode不一致
导致重复格式化之后生成新的namenode的集群ID和原来datanode所绑定的集群ID不一致,导致新启动的namenode,无法重新启动datanode
解决方案是:删除原有的data数据
SSH加密算法
Machine1 — 1. publicKey —> Machine2;
Machine1 — 2. data —> Machine2;
id1((Machine2))– 3. publicKey + Calculate_data —> Password;
Machine2–>id1
Password–>Machine2;
Machine1 –> id2
Machine2 <– 4. Password— Machine1;
id2((Machine1))–5. privateKey + Calculate_password –> data
Machine1 — 6. MD5 data+sessionID —>Machine2;
Machine2 — 7.MD5 data+sessionID—> id3[7. data+sessionID=6.]
免密登录步骤
ssh-keygen -t rsa[4次回车]
ssh-copy-id 主机名
总结
集群单点启动
hdfs --daemon start namenode
hdfs --daemon start datanode
yarn --daemon start nodemanager
yarn --daemon start resourcemanager
集群停止
hdfs --daemon stop namenode
hdfs --daemon stop datanode
yarn --daemon stop nodemanager
yarn --daemon stop resourcemanager
集群群起
3.x 需要在works指定datanode 和 nodemanager 的机器
2.x slaves
Hadoop存储文件的位置在/data/data/current/BP-961404918-192.168.0.200-1659065001458/current/finalized/subdir0/subdir0
在hdfs中的命令
hdfs fs -mkdir
创建目录hdfs fs -put
上传文件hdfs fs -rm -r
删除文件或者目录
YARN中的历史服务器
启动yarn的历史服务器
mapred --daemon start historyserver
需要在mapred-site.xml中添加
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop201:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop201:19888</value>
</property>
</configuration>
yarn的日志聚集功能的开启
<!-- yarn的日志服务器 -->
<!-- yarn的日志聚集功能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- 日志服务器地址 -->
<property>
<name>yarn.log-sever.url</name>
<value>http://hadoop201:19888/jobhistory/logs</value>
</property>
<!-- 日志存储时间 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604880</value>
</property>
集群时间同步
时间同步的方式:
- 找一台机器,作为时间服务器
- 所有机器和这台机器时间进行定时同步,比如每隔一分钟同步一次时间
① 关闭所有的机器的ntp服务
systemctl stop ntpd
systemctl disable ntpd
systemctl is-enable ntpd
② 将Host on.. 下的内容取消注释,并注释server 0….的内容,加入以下内容
# Hosts on local network are less restricted.
restrict 192.168.1.0 mask 255.255.255.0 nomodify notrap
# Use public servers from the pool.ntp.org project.
# Please consider joining the pool (http://www.pool.ntp.org/join.html).
#server 0.centos.pool.ntp.org iburst
#server 1.centos.pool.ntp.org iburst
#server 2.centos.pool.ntp.org iburst
#server 3.centos.pool.ntp.org iburst
server 127.127.1.0
fudge 127.127.1.0 stratum 10
③ 在/etc/sysconfig/ntpd
文件中添加如下(硬件时间和系统时间同步)
SYNC_HWCLOCK=yes
④ 重启ntp服务
systemctl start ntpd
systemctl enable ntpd
systemctl is-enable ntpd
⑤ 使用 crontab 命令重新 对其他两台机器实施时间同步
crontab -e
*/1 * * * * /usr/sbin/ntpdate hadoop200
Hadoop常用端口号
- NameNode内部通信端口:8020
- NameNode Web端端口:9870
- SecondaryNameNode Web端口:9868
- (Yarn)ResourceManager Web端口:8088
- 历史服务器Web端端口:19888
Hadoop3.X和2.X主要区别
- 从JDK7升级到了JDK8
- 增加了纠删码
- 支持超过了两个NameNode
- 默认端口的改变
HDFS 分布式文件系统
HDFS(Hadoop Distributed File System),适合一次写入多次读出的场景,且不支持文件的修改。但可以在文件后追加
优点
- 数据自动保存多个副本、它通过增加副本的方式,提高容错性。
- 如果一个副本丢失以后,可以通过其他副本自动恢复。
- 适合处理大数据,数据规模可以处理数据规模达到GB、TB、PB;文件规模可以处理百万规模以上的文件数量。
- 可以构建在廉价的机器上,通过多个副本机制,提高可靠性。
缺点
- 不适合低延时的数据,比如毫秒级的数据
- 无法高效的对大量小文件进行存储。
- 存储小文件会占用NameNode大量的内存来存储目录和块信息。
- 小文件存储的寻址时间会超过读取时间,违法了HDFS设计的初衷。
- 不支持并发写入、文件的随机修改
- 一个文件只能一个写,不支持多线程写。
- 仅支持数据追加(Append),不支持文件的随机修改。
HDFS的组成架构
- NameNode:Master;管理HDFS的名称空间;配置副本策略;管理快的映射信息
- DataNode:Slave;NameNode下达命令,DataNode实际操作
- SecondaryNameNode:并非NameNode的热备份;当NameNode挂掉的时候,它并不能马上替换NaemNode并提供服务;辅助NameNode,分担其工作量,比如定期合并Fsimage和Edits,并推送给NameNode;只有在紧急情况下,可以辅助恢复NameNode
- Client客户端:文件的切分;与NameNode交互,获取文件的位置信息;与DataNode交互;提供命令来管理HDF
HDFS的操作
将本地的文件剪切到HDFS文件系统(本地文件消息)
hadoop fs -moveFromLocal TestData /
将本地的文件内容追加到HDFS文件系统中的文件中
hadoop fs -appendToFile TempFileOfData /TestData
将本地的文件拷贝到HDFS文件系统中
hadoop fs -copyToLocal /TestData
将HFDS的文件移动到HDFS另一个文件
hadoop fs -mv /sxxx /xxx
将HFDS的文件复制到HDFS另一个文件
hadoop fs -cp /sxxx /xxx
合并下载多个文件
hadoop fs -getmerge /xml ./all.xml
Haddoop环境准备Windows
配置Maven
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
</dependencies>
Java连接HADOOP
通过FileSystem.get方法连接最后关闭资源
Configuration configuration = new Configuration();
URI uri = URI.create("hdfs://192.168.0.200:9820");
String user = "root";
FileSystem fs = FileSystem.get(uri, configuration, user);
fs.mkdirs(new Path("/javaPath"));
fs.close();
使用copyFromLocalFile方法上传文件
- delSrc是否删除原有的src文件
- overwrite是否重写当前文件
- 本地路径
- Hadoop文件系统路径
Path pathHadoop = new Path("/");
Path pathLocal = new Path("C:\\Users\\rabbi\\Downloads\\canvas.png");
try {
fs.copyFromLocalFile(
false, true,
pathLocal, pathHadoop
);
} catch (IOException e) {
e.printStackTrace();
}
可以在idea中编写一个hdfs-site更新replication的数量,也可以更新configuration中的配置对象
configuration.set("dfs.replication", "1");
迭代文件内容
FileStatus[] fileStatuses = fs.listStatus(new Path("/")); // 拥有是否使文件的属性
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(new Path("/"), true);
- true 深度遍历
- false 不遍历
文件下载
fs.copyToLocalFile(
false, // 是否删除元文件
new Path("/canvas.png"), //HDFS文件
new Path("./"), // 本地存储路径
true); // 元数据
文件上传
fs.copyFromLocalFile(
false, true,
pathLocal, pathHadoop);
文件移动
fs.rename(
new Path("/canvas.png"),
new Path("/BeautifulGirl.png"));
通过流去下载文件
String input = "/BeautifulGirl.png";
String output = "./Be.png";
FSDataInputStream fis = fs.open(new Path(input));
FileOutputStream fos = new FileOutputStream(new File(output));
IOUtils.copyBytes(fis, fos, configuration);
HadoopInput– 输入 —>LocalOutput
通过流去上传文件
String localPath = "./canvas.png";
String hadoopPath = "/canvas.png";
FSDataOutputStream fsDataOutputStream = fs.create(new Path(hadoopPath));
FileInputStream fileInputStream = new FileInputStream(new File(localPath));
OUtils.copyBytes(fileInputStream, fsDataOutputStream, configuration);
IOUtils.closeStream(fileInputStream);
IOUtils.closeStream(fsDataOutputStream);
LocalInput– 输入 —> HadoopOutput
HADOOP 查看fsimage和edits的日志内容
使用hdfs的oiv,oev命令
hdfs oiv -p 文件类型 -i 镜像文件 -o 转换后的文件输出路径
export PYSPARK_DRIVER_PYTHON="jupyter" export PYSPARK_DRIVER_PYTHON_OPTS="notebook --allow-root" pyspark --master spark://localhost:7077
MapReduce
MapReduce分为Map和Reduce,Map阶段只是负责数据的处理,并不负责计算,计算交给Reduce来做。
WordCount程序
Java类型 | Hadoop类型 |
---|---|
Boolean | BooleanWriteable |
Byte | ByteWriteable |
Int | IntWriteable |
Long | LongWriteable |
…. | |
String | Text |
编程规范、需要写Mapper、Reduce和Driver
Mapper文件
package com.wordCount;
import jdk.incubator.vector.VectorOperators;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
// 编写一个MR程序,通常需要分三步
// 1. 编写Mapper
// 2. 编写Reducer
// 3. 编写Driver
// 一共有四个泛型
// KEYIN LongWriteable 表示偏移量,从文件的那个位置读取数据
// KEYOUT Text 从文件中读取到的数据
// VALUEIN Text
// VALUEOUT IntWriteable
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
// 插件类型开发流程
// 1. 继承或者实现类接口
// 2. 实现或者重写方法
// 提交执行
private Text outKey = new Text();
private IntWritable outValue = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
//分割当前数据
String[] words = line.split(" ");
for(String word: words){
outKey.set(word);
context.write(outKey, outValue);
}
}
}
Reducer.java
package com.wordCount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outKey = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
outKey.set(sum);
context.write(key, outKey);
}
}
Mapper类解析
setup()
在MapTask开始前调用一次。map()
每次执行前都需要执行一次map方法cleanup
在MapTask结束前调用一次run()
方法
public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
this.setup(context);
try {
while(context.nextKeyValue()) {
this.map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
this.cleanup(context);
}
}