|NO.Z.00041|——————————|BigDataEnd|——|Hadoop&MapReduce.V14|——|Hadoop.v14|MR map-side join|
1. Analysis of the MR reduce-side join
### --- Analysis of the MR reduce-side join:
~~~ [Map-side join: implementation analysis]
~~~ [Map-side join: code implementation]
~~~ [Map-side join: program verification]
~~~ # Drawback:
~~~ With a reduce-side join, the join work happens in the reduce phase, so the reducers
~~~ bear most of the processing load while the map nodes stay lightly loaded; resource
~~~ utilization is poor, and the reduce phase is highly prone to data skew.
~~~ # Solution:
~~~ Implement the join on the map side.
2. MR map-side join
### --- Requirement analysis
~~~ This approach suits joins where one of the tables is small.
~~~ The small table can be distributed to every map node; each map node then joins the
~~~ big-table records it reads locally and emits the final result directly.
~~~ This greatly increases the parallelism of the join and speeds up processing.
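For orientation, a minimal sketch of the input layout this example assumes (field order is inferred from the Mapper code below; the actual sample files may differ):
position.txt (small table, shipped via the distributed cache), tab-separated as positionId, positionName:
p001	Java Developer
p002	Data Engineer
deliver data (big table, read by the mappers), tab-separated as userId, positionId, deliverDate:
u0001	p001	2020-01-02
u0002	p002	2020-01-03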
3. Implementation
### --- Code implementation
~~~ In the Mapper's setup phase, read the cached file into an in-memory collection.
~~~ Load the cache file in the driver.
~~~ Distribute the regular file to the nodes where the tasks run:
job.addCacheFile(new URI("file:///e:/cache/position.txt"));
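As a minimal sketch (assuming a single cache file registered as above), the Mapper can also resolve the file through the job's cache-file list instead of hard-coding the symlink name; getCacheFiles() is the standard org.apache.hadoop.mapreduce context API:
// Inside Mapper.setup(): look up the cached file's URI registered by the driver.
URI[] cacheFiles = context.getCacheFiles();
if (cacheFiles != null && cacheFiles.length > 0) {
    // Index 0 assumes position.txt is the only cache file registered
    String fileName = new Path(cacheFiles[0].getPath()).getName(); // "position.txt"
    // open with new FileInputStream(fileName), as in the Mapper below
}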
### --- Create the program: map_join
~~~ Driver
package com.yanqi.mr.map_join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class MapJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 1. Build the configuration object and get a Job instance
        final Configuration conf = new Configuration();
        final Job job = Job.getInstance(conf, "MapJoinDriver");
        // 2. Locate the program jar by the driver class
        job.setJarByClass(MapJoinDriver.class);
        // 3. Set the Mapper class (no Reducer: this is a map-only job)
        job.setMapperClass(MapJoinMapper.class);
        // 4. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // 5. Set the input path (the big table: deliver data)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 6. Set the output path for the result data
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7. Register the small table as a cache file
        job.addCacheFile(new URI("file:///E:/map_join/cache/position.txt"));
        // 8. Set the number of reduce tasks to 0
        job.setNumReduceTasks(0);
        // 9. Submit the job
        final boolean flag = job.waitForCompletion(true);
        // JVM exit code: 0 on success, non-zero on failure
        System.exit(flag ? 0 : 1);
    }
}
### --- Mapper
package com.yanqi.mr.map_join;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;

/*
Use a map-side join to associate deliver (application) records with position data:
- setup() caches all position data in a HashMap;
- map() reads deliver records, looks up positionName by positionId in the cache,
  and writes the joined line directly.
This job needs no ReduceTask, so the driver calls setNumReduceTasks(0).
*/
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    HashMap<String, String> map = new HashMap<>();
    Text k = new Text();

    // Load the position data from the cached file
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // The cache file is symlinked into the task's working directory under its own name
        InputStreamReader inputStreamReader = new InputStreamReader(new FileInputStream("position.txt"), "UTF-8");
        BufferedReader reader = new BufferedReader(inputStreamReader);
        // Parse each position line into the HashMap: key = positionId, value = positionName
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            String[] fields = line.split("\t");
            map.put(fields[0], fields[1]);
        }
        reader.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Every record here is a deliver record; positionId is the second field
        String line = value.toString();
        String[] arr = line.split("\t");
        String positionName = map.get(arr[1]);
        k.set(line + "\t" + positionName);
        context.write(k, NullWritable.get());
    }
}
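With the assumed inputs sketched earlier, each output line is simply the original deliver record with positionName appended, e.g.:
u0001	p001	2020-01-02	Java Developer
u0002	p002	2020-01-03	Data Engineer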
### --- DeliverBean
package com.yanqi.mr.map_join;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class DeliverBean implements Writable {

    private String userId;
    private String positionId;
    private String date;
    private String positionName;
    // Flag distinguishing deliver records from position records
    private String flag;

    public DeliverBean() {
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getPositionId() {
        return positionId;
    }

    public void setPositionId(String positionId) {
        this.positionId = positionId;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public String getPositionName() {
        return positionName;
    }

    public void setPositionName(String positionName) {
        this.positionName = positionName;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(userId);
        out.writeUTF(positionId);
        out.writeUTF(date);
        out.writeUTF(positionName);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.userId = in.readUTF();
        this.positionId = in.readUTF();
        this.date = in.readUTF();
        this.positionName = in.readUTF();
        this.flag = in.readUTF();
    }

    @Override
    public String toString() {
        return "DeliverBean{" +
                "userId='" + userId + '\'' +
                ", positionId='" + positionId + '\'' +
                ", date='" + date + '\'' +
                ", positionName='" + positionName + '\'' +
                ", flag='" + flag + '\'' +
                '}';
    }
}
4. Compile and run
### --- Compile and run
~~~ Configure the input and output path arguments.
~~~ Compile, run, and check the printed output.
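A minimal sketch of a cluster submission (the jar name and HDFS paths below are placeholders; note that the driver above registers position.txt with a local file:/// URI, which suits a local/IDE run, so a cluster run would need the cache URI to point at a location all nodes can reach, such as HDFS):
hadoop jar mr-demo.jar com.yanqi.mr.map_join.MapJoinDriver /input/deliver_data /output/map_join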