一、The OutputFormat Mechanism: OutputFormat
### --- OutputFormat
~~~     OutputFormat is the base class for MapReduce output:
~~~     every MapReduce data output implements the OutputFormat abstract class.
~~~     Below are a few common OutputFormat subclasses.
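~~~     For orientation, every subclass fills in the same small contract. A simplified excerpt of org.apache.hadoop.mapreduce.OutputFormat (comments added here):

// Simplified view of the OutputFormat contract: subclasses supply a
// RecordWriter for the actual records, validate the output specification,
// and provide an OutputCommitter that handles commit of task output.
public abstract class OutputFormat<K, V> {
    public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException;
    public abstract void checkOutputSpecs(JobContext context)
            throws IOException, InterruptedException;
    public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException;
}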

### --- TextOutputFormat
~~~     The default output format is TextOutputFormat, which writes each record as a line of text.
~~~     Its keys and values may be of any type, because TextOutputFormat calls toString() to convert them to strings.
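~~~     A minimal sketch of selecting it explicitly (TextOutputFormat is already the default; the demo class name is made up, and the separator property is the Hadoop 2.x name):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class TextOutputDemo {
    public static Job newJob() throws Exception {
        Configuration conf = new Configuration();
        // Replace the default tab separator between key and value
        conf.set("mapreduce.output.textoutputformat.separator", ",");
        Job job = Job.getInstance(conf, "text-output-demo");
        job.setOutputFormatClass(TextOutputFormat.class);
        return job;
    }
}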

### --- SequenceFileOutputFormat
~~~     将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,
~~~     这是一种好的输出格式,因为它的格式紧凑,很容易被压缩。
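~~~     A minimal sketch of wiring it up with block compression (the demo class is an assumption; the Hadoop calls are real):

import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SeqOutputDemo {
    public static void configure(Job job) {
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        // BLOCK compression packs many records per compressed block,
        // which is what makes the format compact on disk.
        FileOutputFormat.setCompressOutput(job, true);
        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
    }
}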
二、Custom OutputFormat
### --- Requirement analysis

~~~     A single MapReduce program needs to route its results to two different destinations depending on the data.
~~~     This kind of output requirement can be met with a custom OutputFormat.
三、Implementation Steps
### --- Implementation steps

~~~     Define a custom class that extends FileOutputFormat.
~~~     Provide a custom RecordWriter and override write(), the method that emits the data.
### --- Requirement
~~~     Web request log data

http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.yanqi.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com
~~~     # Expected output
~~~     (the URL itself is the map output key, so the shuffle sorts and de-duplicates the lines)
### --- yanqi.log

http://www.yanqi.com
### --- other.log

http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com
四、Implementation
### --- Create the project: output
~~~     Mapper

package com.yanqi.mr.output;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class OutputMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Emit the whole line (one URL) as the output key; NullWritable carries no payload
        context.write(value, NullWritable.get());
    }
}
### --- Reducer

package com.yanqi.mr.output;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


import java.io.IOException;

public class OutputReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Keys arrive sorted and de-duplicated from the shuffle; write each URL once
        context.write(key, NullWritable.get());
    }
}
### --- OutputFormat

package com.yanqi.mr.output;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class CustomOutputFormat extends FileOutputFormat<Text, NullWritable> {
    // Returns the object that writes the data out
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        // Work out the output paths and hand the resulting streams to the writer
        final Configuration conf = context.getConfiguration();
        final FileSystem fs = FileSystem.get(conf);
        // Define the output paths (hard-coded local paths in this demo)
        final FSDataOutputStream yanqiOut = fs.create(new Path("e:/yanqi.log"));
        final FSDataOutputStream otherOut = fs.create(new Path("e:/other.log"));
        CustomWriter customWriter = new CustomWriter(yanqiOut, otherOut);
        return customWriter;
    }
}
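
~~~     A hedged variant (not part of the original code): deriving the two file paths from the job's configured output directory instead of hard-coding e:/ lets the same class run against HDFS. This method body would replace getRecordWriter() above; the file names are kept from the example.

    // Portable variant of getRecordWriter: the side files live under the
    // job's output directory, on whatever file system that directory uses.
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        final Configuration conf = context.getConfiguration();
        final Path outDir = FileOutputFormat.getOutputPath(context);
        final FileSystem fs = outDir.getFileSystem(conf);
        final FSDataOutputStream yanqiOut = fs.create(new Path(outDir, "yanqi.log"));
        final FSDataOutputStream otherOut = fs.create(new Path(outDir, "other.log"));
        return new CustomWriter(yanqiOut, otherOut);
    }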
### --- RecordWriter

package com.yanqi.mr.output;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;


public class CustomWriter extends RecordWriter<Text, NullWritable> {
    // Member variables: one output stream per destination file
    private FSDataOutputStream yanqiOut;
    private FSDataOutputStream otherOut;

    // Constructor receives the two output streams


    public CustomWriter(FSDataOutputStream yanqiOut, FSDataOutputStream otherOut) {
        this.yanqiOut = yanqiOut;
        this.otherOut = otherOut;
    }

    // Output logic: route each record to the right file
    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        // Writing needs the output streams; note getBytes() uses the platform default charset
        final String line = key.toString();
        if (line.contains("yanqi")) {
            yanqiOut.write(line.getBytes());
            yanqiOut.write("\r\n".getBytes());
        } else {
            otherOut.write(line.getBytes());
            otherOut.write("\r\n".getBytes());
        }
    }

    // Close the streams and release resources
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {

        IOUtils.closeStream(yanqiOut);
        IOUtils.closeStream(otherOut);
    }
}
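
~~~     A quick sanity check outside MapReduce (the test class below is made up; paths are kept from the example): drive CustomWriter directly against the local file system and inspect the two files.

package com.yanqi.mr.output;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class CustomWriterSmokeTest {
    public static void main(String[] args) throws Exception {
        final FileSystem fs = FileSystem.getLocal(new Configuration());
        final FSDataOutputStream yanqiOut = fs.create(new Path("e:/yanqi.log"));
        final FSDataOutputStream otherOut = fs.create(new Path("e:/other.log"));
        final CustomWriter writer = new CustomWriter(yanqiOut, otherOut);
        writer.write(new Text("http://www.yanqi.com"), NullWritable.get());
        writer.write(new Text("http://www.baidu.com"), NullWritable.get());
        writer.close(null); // close() ignores its TaskAttemptContext argument here
    }
}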
### --- Driver: verify that the data ends up in the two separate output files

package com.yanqi.mr.output;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OutputDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        /*
        1. Get the configuration object and a Job instance
        2. Point the job at the local path of this program's jar
        3. Set the Mapper/Reducer classes
        4. Set the Mapper output key/value types
        5. Set the final output key/value types
        6. Set the input path of the raw data
        7. Set the job output path
        8. Submit the job
         */
//        1. Get the configuration object and a Job instance
        final Configuration conf = new Configuration();

        final Job job = Job.getInstance(conf, "OutputDriver");
//        2. Point the job at the local path of this program's jar
        job.setJarByClass(OutputDriver.class);
//        3. Set the Mapper/Reducer classes
        job.setMapperClass(OutputMapper.class);
        job.setReducerClass(OutputReducer.class);
//        4. Set the Mapper output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
//        5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Use the custom OutputFormat instead of the default TextOutputFormat
        job.setOutputFormatClass(CustomOutputFormat.class);
//        6. Set the input path of the raw data
        FileInputFormat.setInputPaths(job, new Path("E:\\data\\input"));
//        7. Set the job output path; MapReduce still writes marker files such as _SUCCESS there
        FileOutputFormat.setOutputPath(job, new Path("E:\\data\\output"));
//        8. Submit the job
        final boolean flag = job.waitForCompletion(true);
        // JVM exit code: 0 for success, non-zero for failure
        System.exit(flag ? 0 : 1);
    }
}
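
~~~     A hedged caveat (not spelled out in the original): getRecordWriter() opens fixed file names, so every reduce task would write to the same two paths. The job behaves because MapReduce defaults to a single reduce task; with more reducers you would need per-task file names. Making the assumption explicit in the driver is cheap:

        // Hypothetical addition to the driver above: pin the job to one
        // reducer, so exactly one CustomWriter opens yanqi.log / other.log.
        job.setNumReduceTasks(1);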
五、Build and Run
### --- Build and run

~~~     Configure the input and output parameters
~~~     Point the run configuration at the input and output file paths
~~~     Build, run, and inspect the output
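
~~~     A hedged variant of the driver (not in the original): read the two paths from program arguments instead of hard-coding them, so the run configuration supplies them.

        // Hypothetical change to OutputDriver.main: take the paths from args
        // so the same jar runs in the IDE and on a cluster.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));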
