1. Data Skew Solutions
### --- What is data skew?
~~~     Data skew means that a large number of identical keys are sent by the partitioner to the same partition, so a single reduce task receives far more records than the others.

### --- Symptoms
~~~     The vast majority of tasks finish very quickly, but a few individual tasks run extremely slowly, or even fail!

### --- General solution:
~~~     Salt each key with a random number, so that a hot key is spread evenly across all partitions; a second job then strips the salt and merges the partial results.
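### --- Worked example: salting and unsalting
~~~     A minimal sketch of the idea in plain Java (no Hadoop, with made-up input data): stage 1 counts the salted keys, stage 2 strips the salt and merges the partial counts. This mirrors what Job2 and Job3 below do.

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class SaltingSketch {
    public static void main(String[] args) {
        // hypothetical input: "hello" is the hot key
        String[] words = {"hello", "hello", "hello", "hello", "world"};
        int numReduceTasks = 3; // matches job.setNumReduceTasks(3) in Job2
        Random random = new Random();

        // stage 1 (Job2): salt each key so "hello" spreads over up to 3 partitions
        Map<String, Integer> salted = new HashMap<>();
        for (String w : words) {
            salted.merge(w + "&" + random.nextInt(numReduceTasks), 1, Integer::sum);
        }
        System.out.println(salted); // e.g. {hello&0=2, hello&1=1, hello&2=1, world&1=1}

        // stage 2 (Job3): strip the salt and merge the partial counts
        Map<String, Integer> merged = new HashMap<>();
        for (Map.Entry<String, Integer> e : salted.entrySet()) {
            merged.merge(e.getKey().split("&")[0], e.getValue(), Integer::sum);
        }
        System.out.println(merged); // {hello=4, world=1}
    }
}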
2. Code: create the project skew; Job1
### --- Write Job1

package com.yanqi.mr.skew;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Job1 {
    public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        //promoted to instance fields to avoid re-creating them on every call to map()
        final Text word = new Text();
        final IntWritable one = new IntWritable(1);
        // LongWritable, Text --> byte offset and one line of text, the input to map(); map() is called once per line
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//        1 get the line of text and convert it to a String
            final String str = value.toString();
//        2 split on spaces
            final String[] words = str.split(" ");
//        3 emit <word, 1>
            //iterate over the words
            for (String s : words) {
                word.set(s);
                context.write(word, one);
            }
        }
    }
    public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
        IntWritable total = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            //2 iterate over the values for this key and accumulate
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // 3 emit the sum for the current key: the total number of times the word appears
            total.set(sum);
            context.write(key, total);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //        1. create the configuration object and get the job instance
        final Configuration conf = new Configuration();
        final Job job = Job.getInstance(conf, "Job1");
//        2. set the jar via the driver class
        job.setJarByClass(Job1.class);
//        3. set the Mapper/Reducer classes
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
//        4. set the Mapper output kv types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
//        5. set the final output kv types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
//        6. set the input path
        FileInputFormat.setInputPaths(job, new Path(args[0])); // where the raw data is read from
//        7. set the job output path
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // where the result data is written
//        8. submit the job
        final boolean flag = job.waitForCompletion(true);
        // JVM exit code: 0 for success, non-zero for failure
        System.exit(flag ? 0 : 1);
    }
}
3. Configure input and output parameters
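~~~     Job1 takes two program arguments: args[0] is the input directory holding the raw text and args[1] is the output directory. The output directory must not already exist, or FileOutputFormat fails the job; the concrete paths depend on your environment.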
4. Output
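~~~     With the default TextOutputFormat, each output line is word<TAB>count. If one word dominates the input, the single reduce task that owns that word runs far longer than the rest; that is the skew the next two jobs remove.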

5. Code: create the project skew; Job2
### --- Write Job2

package com.yanqi.mr.skew;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Random;

public class Job2 {
    public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        //promoted to instance fields to avoid re-creating them on every call to map()
        final Text word = new Text();
        final IntWritable one = new IntWritable(1);
        Random random = new Random();
        // LongWritable, Text --> byte offset and one line of text, the input to map(); map() is called once per line
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//        1 get the line of text and convert it to a String
            final String str = value.toString();
//        2 split on spaces
            final String[] words = str.split(" ");
            //iterate over the words, salting each key with a random reducer id
            for (String s : words) {
                word.set(s + "&" + random.nextInt(context.getNumReduceTasks()));
                context.write(word, one); // <word + salt, 1>
            }
        }
    }
    public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        IntWritable total = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            //2 iterate over the values for this key and accumulate
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // 3 emit the sum for the current key: the partial count for this salted word
            total.set(sum);
            context.write(key, total);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //        1. create the configuration object and get the job instance
        final Configuration conf = new Configuration();
        final Job job = Job.getInstance(conf, "Job2");
//        2. set the jar via the driver class
        job.setJarByClass(Job2.class);
//        3. set the Mapper/Reducer classes
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
//        4. set the Mapper output kv types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
//        5. set the final output kv types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(3); // 3 reducers; the salt range in the mapper follows getNumReduceTasks()
//        6. set the input path
        FileInputFormat.setInputPaths(job, new Path(args[0])); // where the raw data is read from
//        7. set the job output path
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // where the result data is written
//        8. submit the job
        final boolean flag = job.waitForCompletion(true);
        // JVM exit code: 0 for success, non-zero for failure
        System.exit(flag ? 0 : 1);
    }
}
6. Configure input and output parameters
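~~~     Job2 reads the same raw input as Job1 (its mapper splits lines on spaces, not on Job1's output format), so args[0] is again the original input directory; args[1] is a new output directory, which later serves as Job3's input.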
7. Output
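~~~     Because of setNumReduceTasks(3), the result is spread over three part files (part-r-00000 to part-r-00002), and each line holds a salted key and its partial count, e.g. hello&1<TAB>13 (numbers here are illustrative).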

8. Code: create the project skew; Job3
### --- Write Job3

package com.yanqi.mr.skew;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Job3 {
    public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        //promoted to instance fields to avoid re-creating them on every call to map()
        final Text word = new Text();
        final IntWritable num = new IntWritable();

        // LongWritable, Text --> byte offset and one line of text, the input to map(); map() is called once per line
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//        1 get the line of text and convert it to a String
            final String str = value.toString();
//        2 split on the tab between key and count in Job2's output
            final String[] words = str.split("\t");
//        3 strip the "&salt" suffix and emit <word, partial count>
            String[] fields = words[0].split("&");
            word.set(fields[0]);
            num.set(Integer.parseInt(words[1]));
            context.write(word, num);
        }
    }


    public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        IntWritable total = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            //2 iterate over the values for this key and accumulate the partial counts
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // 3 emit the sum for the current key: the word's total count
            total.set(sum);
            context.write(key, total);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //        1. create the configuration object and get the job instance
        final Configuration conf = new Configuration();
        final Job job = Job.getInstance(conf, "Job3");
//        2. set the jar via the driver class
        job.setJarByClass(Job3.class);
//        3. set the Mapper/Reducer classes
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
//        4. set the Mapper output kv types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
//        5. set the final output kv types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
//        6. set the input path
        FileInputFormat.setInputPaths(job, new Path(args[0])); // read Job2's output directory
//        7. set the job output path
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // where the result data is written
//        8. submit the job
        final boolean flag = job.waitForCompletion(true);
        // JVM exit code: 0 for success, non-zero for failure
        System.exit(flag ? 0 : 1);
    }
}
9. Configure input and output parameters
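~~~     args[0] must point at Job2's output directory, since Job3's mapper splits each line on the tab written by Job2 and strips the "&salt" suffix; args[1] is a fresh output directory for the final result.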

10. Output
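~~~     The final output is again word<TAB>count, with the partial counts from Job2 merged back together; the totals match what Job1 would produce, but no single reduce task had to handle all of the hot key by itself.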
