一、MR reduce端Join分析:
### --- MR reduce端Join分析:

~~~     [Reduce端join_实现分析]
~~~     [Reduce端join_代码实现]
~~~     [Reduce端join_程序验证]
~~~     # 缺点:
~~~     这种方式中,join的操作是在reduce阶段完成,reduce端的处理压力太大,
~~~     map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜
~~~     # 解决方案: 
~~~     map端join实现方式
二、MR map端join
### --- 需求分析

~~~     适用于关联表中有小表的情形;
~~~     可以将小表分发到所有的map节点,这样,
~~~     map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,
~~~     可以大大提高join操作的并发度,加快处理速度
三、编程实现
### --- 代码实现
~~~     在Mapper的setup阶段,将文件读取到缓存集合中
~~~     在驱动函数中加载缓存。
~~~     缓存普通文件到Task运行节点。

job.addCacheFile(new URI("file:///e:/cache/position.txt"));
### --- 创建程序:map_join
~~~     Driver

package com.yanqi.mr.map_join;

import com.yanqi.mr.reduce_join.ReduceJoinReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * Driver for the map-side join job.
 *
 * <p>Configures a map-only job (zero reduce tasks) that runs {@link MapJoinMapper}
 * over the input path and registers a cache file that is shipped to every task.
 *
 * <p>Usage: {@code MapJoinDriver <input path> <output path> [cache file URI]}
 */
public class MapJoinDriver {

    /** Cache file used when no third argument is given (the location that was originally hard-coded). */
    private static final String DEFAULT_CACHE_FILE_URI = "file:///E:/map_join/cache/position.txt";

    /**
     * Builds and submits the job, then exits the JVM with the job status.
     *
     * @param args {@code args[0]} input path, {@code args[1]} output path,
     *             optional {@code args[2]} URI of the file to cache
     *             (defaults to {@link #DEFAULT_CACHE_FILE_URI})
     * @throws IOException            if the job cannot be configured or submitted
     * @throws ClassNotFoundException propagated from {@code Job.waitForCompletion}
     * @throws InterruptedException   propagated from {@code Job.waitForCompletion}
     * @throws URISyntaxException     if the cache file URI is not a valid URI
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: MapJoinDriver <input path> <output path> [cache file URI]");
            System.exit(2);
            return;
        }
        final String cacheFileUri = args.length > 2 ? args[2] : DEFAULT_CACHE_FILE_URI;

        // 1. Create the configuration and the job instance
        final Configuration conf = new Configuration();
        final Job job = Job.getInstance(conf, "MapJoinDriver");
        // 2. Locate the job jar via this class
        job.setJarByClass(MapJoinDriver.class);
        // 3. Set the Mapper class (this job has no Reducer)
        job.setMapperClass(MapJoinMapper.class);
        // 4. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 5. Set the input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 6. Set the output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7. Register the cache file. NOTE(review): the mapper opens it by the local
        //    name "position.txt", so the cached file presumably has to be exposed under
        //    that name (its own file name, or a "#position.txt" URI fragment) - confirm.
        job.addCacheFile(new URI(cacheFileUri));
        // The join is completed entirely on the map side, so no reduce tasks are needed
        job.setNumReduceTasks(0);
        // 8. Submit the job and wait for it to finish
        final boolean flag = job.waitForCompletion(true);
        // JVM exit status: 0 on success, non-zero on failure
        System.exit(flag ? 0 : 1);
    }
}
### --- Mapper

package com.yanqi.mr.map_join;

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;

/*
使用map端join完成投递行为与职位数据的关联
map端缓存所有的职位数据
map方法读取的文件数据是投递行为数据
基于投递行为数据的positionid去缓存中查询出positionname,输出即可。
这个job中无需reducetask,setnumreducetask为0
 */
/**
 * Map-side join of delivery (job application) records with position data.
 *
 * <p>{@link #setup} loads the cached position file into an in-memory table;
 * {@link #map} then appends the position name looked up by the record's
 * position id (second tab-separated field) to every delivery record.
 */
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    // Lookup table: position id -> position name, filled in setup()
    HashMap<String, String> map = new HashMap<>();
    // Output key object, reused across map() calls
    Text k = new Text();

    /**
     * Loads the position data from the local file {@code position.txt}.
     *
     * <p>Each line is expected to be {@code positionId \t positionName}. Blank lines and
     * lines with fewer than two fields are skipped rather than ending the load early
     * or failing the task.
     *
     * @throws IOException if the file cannot be opened or read
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // try-with-resources guarantees the file handle is released even if reading fails
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream("position.txt"), "UTF-8"))) {
            String line;
            // Read to end of file; an empty line in the middle must not truncate the table
            while ((line = reader.readLine()) != null) {
                if (StringUtils.isBlank(line)) {
                    continue;
                }
                String[] fields = line.split("\t");
                if (fields.length < 2) {
                    // No position name on this line - nothing to join against
                    continue;
                }
                map.put(fields[0], fields[1]);
            }
        }
    }

    /**
     * Emits the input line followed by a tab and the matching position name.
     *
     * <p>Records with fewer than two tab-separated fields are counted under the
     * {@code MapJoin/MalformedRecords} counter and skipped. When the position id has no
     * entry in the table, the literal text {@code null} is appended (unchanged from the
     * original behaviour).
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] arr = line.split("\t");
        if (arr.length < 2) {
            // Blank or short record: there is no position id to look up
            context.getCounter("MapJoin", "MalformedRecords").increment(1);
            return;
        }
        // arr[1] is the position id of the delivery record
        String positionName = map.get(arr[1]);
        k.set(line + "\t" + positionName);
        context.write(k, NullWritable.get());
    }
}
### --- DeliverBean

package com.yanqi.mr.map_join;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Writable bean carrying one joined record: user id, position id, date,
 * position name, and a flag marking whether the record is delivery data or
 * position data.
 *
 * <p>Serialization is null-safe: a field that is {@code null} is written as an
 * empty string, so it reads back as {@code ""} rather than {@code null}.
 */
public class DeliverBean implements Writable {

    private String userId;
    private String positionId;
    private String date;
    private String positionName;
    // Marks whether this record is delivery data or position data
    private String flag;

    /** No-arg constructor; all fields start as {@code null}. */
    public DeliverBean() {
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getPositionId() {
        return positionId;
    }

    public void setPositionId(String positionId) {
        this.positionId = positionId;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public String getPositionName() {
        return positionName;
    }

    public void setPositionName(String positionName) {
        this.positionName = positionName;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    /**
     * Writes the five fields in a fixed order; must mirror {@link #readFields}.
     *
     * @param out sink to serialize into
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        // writeUTF(null) throws NullPointerException, and a bean usually has only
        // some of its fields populated, so unset fields are written as "".
        out.writeUTF(nullToEmpty(userId));
        out.writeUTF(nullToEmpty(positionId));
        out.writeUTF(nullToEmpty(date));
        out.writeUTF(nullToEmpty(positionName));
        out.writeUTF(nullToEmpty(flag));
    }

    /**
     * Reads the five fields in the same order {@link #write} emitted them.
     *
     * @param in source to deserialize from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.userId = in.readUTF();
        this.positionId = in.readUTF();
        this.date = in.readUTF();
        this.positionName = in.readUTF();
        this.flag = in.readUTF();
    }

    @Override
    public String toString() {
        return "DeliverBean{" +
                "userId='" + userId + '\'' +
                ", positionId='" + positionId + '\'' +
                ", date='" + date + '\'' +
                ", positionName='" + positionName + '\'' +
                ", flag='" + flag + '\'' +
                '}';
    }

    /** Returns {@code s}, or the empty string when {@code s} is {@code null}. */
    private static String nullToEmpty(String s) {
        return s == null ? "" : s;
    }
}
四、编译打印
### --- 编译打印

~~~     输入输出参数配置
~~~     编译打印

 
 
 
 
 
 
 
 
 

Walter Savage Landor:strove with none,for none was worth my strife.Nature I loved and, next to Nature, Art:I warm’d both hands before the fire of life.It sinks, and I am ready to depart
                                                                                                                                                   ——W.S.Landor

 

 

版权声明:本文为yanqi_vip原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/yanqivip/p/16112035.html