大数据技术之_05_Hadoop学习_04_MapReduce_Hadoop企业优化(重中之重)+HDFS小文件优化方法+MapReduce扩展案例+倒排索引案例(多job串联)+TopN案例+找博客共同粉丝案例+常见错误及解决方案
第6章 Hadoop企业优化(重中之重)6.1 MapReduce 跑的慢的原因6.2 MapReduce优化方法6.2.1 数据输入6.2.2 Map阶段6.2.3 Reduce阶段6.2.4 I/O传输6.2.5 数据倾斜问题6.2.6 常用的调优参数6.3 HDFS小文件优化方法6.3.1 HDFS小文件弊端6.3.2 HDFS小文件解决方案第7章 MapReduce扩展案例7.1 倒排索引案例(多job串联)7.2 TopN案例7.3 找博客共同粉丝案例第8章 常见错误及解决方案
第6章 Hadoop企业优化(重中之重)
6.1 MapReduce 跑的慢的原因
6.2 MapReduce优化方法
MapReduce优化方法主要从六个方面考虑:数据输入、Map阶段、Reduce阶段、IO传输、数据倾斜问题和常用的调优参数。
6.2.1 数据输入
6.2.2 Map阶段
6.2.3 Reduce阶段
1、
2、
6.2.4 I/O传输
6.2.5 数据倾斜问题
1、
2、
6.2.6 常用的调优参数
1、资源相关参数
(1)以下参数是在用户自己的MR应用程序中配置就可以生效(mapred-default.xml)
(2)应该在YARN启动之前就配置在服务器的配置文件中才能生效(yarn-default.xml)
(3)Shuffle性能优化的关键参数,应在YARN启动之前就配置好(mapred-default.xml)
2、容错相关参数(MapReduce性能优化)
6.3 HDFS小文件优化方法
6.3.1 HDFS小文件弊端
HDFS上每个文件都要在NameNode上建立一个索引
,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用NameNode的内存空间,另一方面就是索引文件过大使得索引速度变慢
。
6.3.2 HDFS小文件解决方案
小文件的优化无非以下几种方式:
(1)在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS。
(2)在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并。
(3)在MapReduce处理时,可采用CombineTextInputFormat提高效率。
1、
2、
第7章 MapReduce扩展案例
7.1 倒排索引案例(多job串联)
1、需求
有大量的文本(文档、网页),需要建立搜索索引,如下图所示。
(1)数据输入
(2)期望输出数据
atguigu c.txt-->2 b.txt-->2 a.txt-->3
pingping c.txt-->1 b.txt-->3 a.txt-->1
ss c.txt-->1 b.txt-->1 a.txt-->2
2、需求分析
3、第一次处理
(1)第一次处理,编写OneIndexMapper类
package com.atguigu.mr.index;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class OneIndexMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
String name;
Text k = new Text();
IntWritable v = new IntWritable();
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
// 获取文件名称
FileSplit split = (FileSplit) context.getInputSplit();
name = split.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// atguigu pingping
// 1、获取一行数据
String line = value.toString();
// 2、切割
String[] fields = line.split(" ");
for (String word : fields) {
// 3、拼接
k.set(word + "---" + name); // atguigu---a.txt
v.set(1);
// 4、写出
context.write(k, v); // <atguigu---a.txt,1>
}
}
}
(2)第一次处理,编写OneIndexReducer类
package com.atguigu.mr.index;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class OneIndexReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
// 1、累加求和
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
v.set(sum);
// 2、写出
context.write(key, v);
}
}
(3)第一次处理,编写OneIndexDriver类
package com.atguigu.mr.index;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class OneIndexDriver {
public static void main(String[] args) throws Exception {
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[] { "d:/temp/atguigu/0529/input/inputoneindex", "d:/temp/atguigu/0529/output17" };
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(OneIndexDriver.class);
job.setMapperClass(OneIndexMapper.class);
job.setReducerClass(OneIndexReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
(4)查看第一次输出结果
atguigu---a.txt 3
atguigu---b.txt 2
atguigu---c.txt 2
pingping---a.txt 1
pingping---b.txt 3
pingping---c.txt 1
ss---a.txt 2
ss---b.txt 1
ss---c.txt 1
4、第二次处理
(1)第二次处理,编写TwoIndexMapper类
package com.atguigu.mr.index;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class TwoIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
Text k = new Text();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 输入为:
// atguigu--a.txt 3
// atguigu--b.txt 2
// atguigu--c.txt 2
// 输出为:(atguigu,a.txt 3)atguigu c.txt-->2 b.txt-->2 a.txt-->3
// 1、获取一行数据
String line = value.toString();
// 2、用“--”切割
String[] fields = line.split("--"); // 结果为:(atguigu,a.txt 3)
// 3、封装数据
k.set(fields[0]);
v.set(fields[1]);
// 4、写出
context.write(k, v);
}
}
(2)第二次处理,编写TwoIndexReducer类
package com.atguigu.mr.index;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class TwoIndexReducer extends Reducer<Text, Text, Text, Text> {
Text v = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// 输入为:(atguigu,a.txt 3)(atguigu,b.txt 2)(atguigu,c.txt 2)
// 输出为:atguigu c.txt-->2 b.txt-->2 a.txt-->3
StringBuffer sb = new StringBuffer();
// 拼接
for (Text value : values) {
sb.append(value.toString().replace("\t", "-->") + "\t");
}
// 封装
v.set(sb.toString());
// 写出
context.write(key, v);
}
}
(3)第二次处理,编写TwoIndexDriver类
package com.atguigu.mr.index;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TwoIndexDriver {
public static void main(String[] args) throws Exception {
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[] { "d:/temp/atguigu/0529/input/inputtowindex", "d:/temp/atguigu/0529/output18" };
Configuration config = new Configuration();
Job job = Job.getInstance(config);
job.setJarByClass(TwoIndexDriver.class);
job.setMapperClass(TwoIndexMapper.class);
job.setReducerClass(TwoIndexReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
(4)第二次查看最终结果
atguigu c.txt-->2 b.txt-->2 a.txt-->3
pingping c.txt-->1 b.txt-->3 a.txt-->1
ss c.txt-->1 b.txt-->1 a.txt-->2
7.2 TopN案例
1、需求
对需求2.3输出结果进行加工,输出流量使用量在前10的用户信息。
(1)输入数据 (2)输出数据
2、需求分析
同上图。
3、实现代码
(1)编写FlowBean类
package com.atguigu.mr.topn;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow; // 上行流量
private long downFlow; // 下行流量
private long sumFlow; // 总流量
public FlowBean() {
super();
}
public FlowBean(long upFlow, long downFlow) {
super();
this.upFlow = upFlow;
this.downFlow = downFlow;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
public void set(long downFlow2, long upFlow2) {
downFlow = downFlow2;
upFlow = upFlow2;
sumFlow = downFlow2 + upFlow2;
}
@Override
public int compareTo(FlowBean bean) {
int result;
// 按照总流量大小,倒序排列
if (this.sumFlow > bean.getSumFlow()) {
result = -1;
} else if (this.sumFlow < bean.getSumFlow()) {
result = 1;
} else {
result = 0;
}
return result;
}
}
(2)编写TopNMapper类
package com.atguigu.mr.topn;
import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class TopNMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
// 定义一个TreeMap作为存储数据的容器(天然按key排序,降序)
private TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();
private FlowBean kBean;
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
kBean = new FlowBean();
Text v = new Text();
// 13470253144 180 180 360
// 1、获取一行
String line = value.toString();
// 2、切割
String[] fields = line.split("\t");
// 3、封装数据
String phoneNum = fields[0];
long upFlow = Long.parseLong(fields[1]);
long downFlow = Long.parseLong(fields[2]);
long sumFlow = Long.parseLong(fields[3]);
kBean.setUpFlow(upFlow);
kBean.setDownFlow(downFlow);
kBean.setSumFlow(sumFlow);
v.set(phoneNum);
// 4、向TreeMap中添加数据
flowMap.put(kBean, v);
// 5、限制TreeMap的数据量,超过10条就删除掉流量最小的一条数据
if (flowMap.size() > 10) {
// flowMap.remove(flowMap.firstKey()); // 升序删除第一个
flowMap.remove(flowMap.lastKey()); // 降序删除最后一个
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
// 6、遍历TreeMap集合,输出数据
Iterator<FlowBean> bean = flowMap.keySet().iterator();
while (bean.hasNext()) {
FlowBean k = bean.next();
context.write(k, flowMap.get(k));
}
}
}
(3)编写TopNReducer类
package com.atguigu.mr.topn;
import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class TopNReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
// 定义一个TreeMap作为存储数据的容器(天然按key排序)
TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text value : values) {
FlowBean bean = new FlowBean();
bean.set(key.getDownFlow(), key.getUpFlow());
// 1、向treeMap集合中添加数据
flowMap.put(bean, new Text(value));
// 2、限制TreeMap数据量,超过10条就删除掉流量最小的一条数据
if (flowMap.size() > 10) {
// flowMap.remove(flowMap.firstKey()); // 升序删除第一个
flowMap.remove(flowMap.lastKey()); // 降序删除最后一个
}
}
}
@Override
protected void cleanup(Reducer<FlowBean, Text, Text, FlowBean>.Context context)
throws IOException, InterruptedException {
// 3、遍历集合,输出数据
Iterator<FlowBean> bean = flowMap.keySet().iterator();
while (bean.hasNext()) {
FlowBean v = bean.next();
context.write(new Text(flowMap.get(v)), v);
}
}
}
(4)编写TopNDriver类
package com.atguigu.mr.topn;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TopNDriver {
public static void main(String[] args) throws Exception {
args = new String[] { "d:/temp/atguigu/0529/input/inputtopn", "d:/temp/atguigu/0529/output20" };
// 1、获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 6、指定本程序的jar包所在的本地路径
job.setJarByClass(TopNDriver.class);
// 2、指定本业务job要使用的mapper/reducer业务类
job.setMapperClass(TopNMapper.class);
job.setReducerClass(TopNReducer.class);
// 3、指定mapper输出数据的kv类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
// 4、指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 5、指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7、将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
7.3 找博客共同粉丝案例
1、需求
以下是博客的粉丝列表数据,冒号前是一个用户,冒号后是该用户的所有粉丝(数据中的粉丝关系是单向
的)
求出哪些人两两之间有共同粉丝,及他俩的共同粉丝都有谁?
(1)数据输入
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
2、需求分析
先求出A、B、C、……等是谁的粉丝
第一次输出结果
A I,K,C,B,G,F,H,O,D,
B A,F,J,E,
C A,E,B,H,F,G,K,
D G,C,K,A,L,F,E,H,
E G,M,L,H,A,F,B,D,
F L,M,D,C,G,A,
G M,
H O,
I O,C,
J O,
K B,
L D,E,
M E,F,
O A,H,I,J,F,
第二次输出结果
A-B E C
A-C D F
A-D E F
A-E D B C
A-F O B C D E
A-G F E C D
A-H E C D O
A-I O
A-J O B
A-K D C
A-L F E D
A-M E F
B-C A
B-D A E
B-E C
B-F E A C
B-G C E A
B-H A E C
B-I A
B-K C A
B-L E
B-M E
B-O A
C-D A F
C-E D
C-F D A
C-G D F A
C-H D A
C-I A
C-K A D
C-L D F
C-M F
C-O I A
D-E L
D-F A E
D-G E A F
D-H A E
D-I A
D-K A
D-L E F
D-M F E
D-O A
E-F D M C B
E-G C D
E-H C D
E-J B
E-K C D
E-L D
F-G D C A E
F-H A D O E C
F-I O A
F-J B O
F-K D C A
F-L E D
F-M E
F-O A
G-H D C E A
G-I A
G-K D A C
G-L D F E
G-M E F
G-O A
H-I O A
H-J O
H-K A C D
H-L D E
H-M E
H-O A
I-J O
I-K A
I-O A
K-L D
K-O A
L-M E F
3、代码实现
(1)第一次Mapper类
package com.atguigu.mr.friends;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class OneShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{
Text k = new Text();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// A:B,C,D,F,E,O
// 1、获取一行
String line = value.toString();
// 2、切割
String[] fields = line.split(":");
// 3、获取用户和用户的粉丝
String user = fields[0]; // person = A
String[] friends = fields[1].split(","); // firends = [B, C, D, F, E, O]
// 封装
v.set(user);
// 4、写出去
for (String friend : friends) {
k.set(friend);
context.write(k, v); // <粉丝,用户> <B,A><C,A><D,A>
}
}
}
(2)第一次Reducer类
package com.atguigu.mr.friends;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class OneShareFriendsReducer extends Reducer<Text, Text, Text, Text> {
Text v = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
// <B,A><C,A><D,A>
// 1、拼接
for (Text user : values) {
sb.append(user).append(","); //
}
v.set(sb.toString());
// 2、写出
context.write(key, v); // A I,K,C,B,G,F,H,O,D,
}
}
(3)第一次Driver类
package com.atguigu.mr.friends;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class OneShareFriendsDriver {
public static void main(String[] args) throws Exception {
// 0、根据自己电脑路径重新配置
args = new String[] { "d:/temp/atguigu/0529/input/inputfriend", "d:/temp/atguigu/0529/output21" };
// 1、获取job对象
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2、指定jar包运行的路径
job.setJarByClass(OneShareFriendsDriver.class);
// 3、指定map/reduce使用的类
job.setMapperClass(OneShareFriendsMapper.class);
job.setReducerClass(OneShareFriendsReducer.class);
// 4、指定map输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 5、指定最终输出的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 6、指定job的输入原始所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7、提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
(4)第二次Mapper类
package com.atguigu.mr.friends;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class TwoShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// A I,K,C,B,G,F,H,O,D,
// 粉丝 用户,用户,用户
// 1、获取一行
String line = value.toString();
// 2、切割
String[] friend_users = line.split("\t");
// A
String friend = friend_users[0];
// I,K,C,B,G,F,H,O,D,
String[] users = friend_users[1].split(",");
Arrays.sort(users); // B,C,D,F,G,H,I,K,O
for (int i = 0; i < users.length - 1; i++) {
for (int j = i + 1; j < users.length; j++) {
context.write(new Text(users[i] + "-" + users[j]), new Text(friend));
}
}
}
}
(5)第二次Reducer类
package com.atguigu.mr.friends;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class TwoShareFriendsReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
for (Text friend : values) {
sb.append(friend).append(" ");
}
context.write(key, new Text(sb.toString()));
}
}
(6)第二次Driver类
package com.atguigu.mr.friends;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TwoShareFriendsDriver {
public static void main(String[] args) throws Exception {
// 0、根据自己电脑路径重新配置
args = new String[] { "d:/temp/atguigu/0529/input/inputfriends", "d:/temp/atguigu/0529/output22" };
// 1、获取job对象
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2、指定jar包运行的路径
job.setJarByClass(TwoShareFriendsDriver.class);
// 3、指定map/reduce使用的类
job.setMapperClass(TwoShareFriendsMapper.class);
job.setReducerClass(TwoShareFriendsReducer.class);
// 4、指定map输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//
// 5、指定最终输出的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 6、指定job的输入原始所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7、提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
第8章 常见错误及解决方案
1)导包容易出错。尤其Text和CombineTextInputFormat。
2)Mapper中第一个输入的参数必须是LongWritable或者NullWritable,不可以是IntWritable,报的错误是类型转换异常。
3)java.lang.Exception: java.io.IOException: Illegal partition for 13926435656(4),说明Partition和ReduceTask个数没对上,调整ReduceTask个数。
4)如果分区数不是1,但是reducetask为1,是否执行分区过程。
答案是:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1。不大于1肯定不执行。
5)在Windows环境编译的jar包导入到Linux环境中运行:
hadoop jar wc.jar com.atguigu.mapreduce.wordcount.WordCountDriver /user/atguigu/ /user/atguigu/output
报如下错误:
Exception in thread "main" java.lang.UnsupportedClassVersionError: com/atguigu/mapreduce/wordcount/WordCountDriver : Unsupported major.minor version 52.0
原因是Windows环境用的jdk1.7,Linux环境用的jdk1.8。
解决方案:统一jdk版本。
6)缓存pd.txt小文件案例中,报找不到pd.txt文件
原因:大部分为路径书写错误。还有就是要检查pd.txt.txt的问题。还有个别电脑写相对路径找不到pd.txt,可以修改为绝对路径。
7)报类型转换异常。
通常都是在驱动函数中设置Map输出和最终输出时编写错误。
Map输出的key如果没有排序,也会报类型转换异常。
8)集群中运行wc.jar时出现了无法获得输入文件。
原因:WordCount案例的输入文件不能放在 HDFS 集群的根目录。
9)出现了如下相关异常
Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:609)
at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977)
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:356)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:371)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:364)
解决方案一:拷贝hadoop.dll文件(文件位置:D:\work\Hadoop\hadoop-2.7.2\bin)到Windows目录C:\Windows\System32。个别同学电脑还需要修改Hadoop源码。
解决方案二:创建如下包名,并将NativeIO.java拷贝到该包名下
10)自定义Outputformat时,注意在RecordWirter中的close()方法必须关闭流资源。否则输出的文件内容中数据为空。
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
if (atguigufos != null) {
atguigufos.close();
}
if (otherfos != null) {
otherfos.close();
}
}