Hadoop序列化之MapReduce案例
Hadoop序列化
序列化概述
序列化就是把内存中的对象、转换成字节系列(或者其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
1、JAVA序列化和hadoop序列化
java序列化:java序列化时一个重量级序列化框架,一个对象对序列化后,会附带很多额外的信息(包括校验信息,Header、继承体系),不便于在网络中高效传输。
Hadoop序列化特点
- 紧凑:高效使用存储空间
- 快速:读写数据的额外空间小
- 可拓展:随着通信协议的升级而升级
- 互操作:支持多语言的交互
hadoop的序列化 Writable
1)自定的类需要实现Writable接口
2)提高无参数的构造器(反序列化时通过反射的方式调用无参数构造器来创建对象)
3)重写write方法实现序列化的过程(自定义序列化的内容)
4)重写readFields方法实现反序列化过程
5)如果自定义的类需要作为输出的key或者value来使用的化,一般建议重写ToString方法,因为hadoop会默认调用对象的toString方法进行输出
Haoop序列化实例
在MapReduce之wordcount实例中已经对源码进行分析,这里不对源码继续分析
1、新建一个类进行封装手机号
FlowBean.java
package Wtritable;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/*
* 用于封装一个手机号的 上行流量 下行流量 总流量
* 因为flowBean要作为value在mr过程中传入,且会落盘,因此需要支持序列化和反序列化
*
* */
public class FlowBean implements Writable {
private Long upFlow;// 设置上行流量
private Long downFlow;//设置下行流量
private Long sumFlow;// 设置总流量
public FlowBean(){}
public Long getUpFlow() {
return upFlow;
}
public void setUpFlow(Long upFlow) {
this.upFlow = upFlow;
}
public Long getDownFlow() {
return downFlow;
}
public void setDownFlow(Long downFlow) {
this.downFlow = downFlow;
}
public Long getSumFlow() {
return sumFlow;
}
public void setSumFlow(Long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow(){
this.sumFlow=this.getUpFlow()+this.getDownFlow();
}
// 序列化方法
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
// 反序列化方法
/* 注意:反序列融化读取数据孙粗需要与反序列化写入数据的顺序一致
* */
public void readFields(DataInput dataInput) throws IOException {
this.upFlow= dataInput.readLong();
this.downFlow=dataInput.readLong();
this.sumFlow=dataInput.readLong();
}
@Override
public String toString() {
return this.upFlow+"\t"+this.downFlow+'\t'+this.sumFlow;
}
}
2、重写Mapper类
FlowMapper.java
package Wtritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable,Text,Text,FlowBean> {
// 定义写出的key
private Text outk=new Text();
// 定义写出的value
private FlowBean outv=new FlowBean();
@Override
protected void map(LongWritable key, Text vale,Context context) throws IOException, InterruptedException {
//1、将读取的一行数据转回String
// 数据格式 12345567788 234 342324 200
String line=vale.toString();
//2、切割数据
String[] flowMsg=line.split("\t");
//3、封装数据
outk.set(flowMsg[1]);
// 4、封装value
outv.setUpFlow(Long.parseLong(flowMsg[flowMsg.length-3]));
outv.setDownFlow(Long.parseLong(flowMsg[flowMsg.length-2]));
outv.setSumFlow();
//5、写出
context.write(outk,outv);
}
}
3、重写Reudcer类
FlowReudecr.java
package Wtritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<Text,FlowBean, Text,FlowBean> {
//写出的value
FlowBean outv=new FlowBean();
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
long totalUpFlow=0;
long totalDownFlow=0;
long totalSumFlow=0;
//1、迭代values,将相同的key和values汇总到一起
for(FlowBean bean:values){
totalUpFlow+=bean.getUpFlow();
totalDownFlow+=bean.getDownFlow();
totalSumFlow+=bean.getSumFlow();
}
//2、封装value
outv.setUpFlow(totalUpFlow);
outv.setDownFlow(totalDownFlow);
outv.setSumFlow();
//3、写出
context.write(key,outv);
}
}
4、执行代码
FlowDriver.java内容
package Wtritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//1、创建配置环境
Configuration conf=new Configuration();
// 2、创建Job环境
Job job=Job.getInstance(conf);
//3、关联驱动类
job.setJarByClass(FlowDriver.class);
//4、关联Mapper和Reducer类
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
//5、设置map输出的k和v类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 6、设置最终输出的k和v的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//7、设置输入和输出路径
FileInputFormat.setInputPaths(job,new Path("D:\\踩着上帝的小丑\\大数据测试数据\\a"));
FileOutputFormat.setOutputPath(job,new Path("D:\\踩着上帝的小丑\\大数据测试数据\\b")); //自己的路径
// 8、提交数据
job.waitForCompletion(true);
}
}