OutputFormat specifies the format (and destination) of a MapReduce (MR) program's final output.

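The default is TextOutputFormat, which writes one record per line and places the results in the specified output directory, in files named part-r-xxxxx (a numeric suffix). By default, each ReduceTask produces one result file.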
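As a rough plain-Java illustration (not Hadoop code), the default naming and line layout look like this: one file per ReduceTask, numbered with five zero-padded digits, and one tab-separated key/value pair per line. In Hadoop the names are produced inside FileOutputFormat, and the tab is the default of the mapreduce.output.textoutputformat.separator setting; the class and helper names below are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

public class DefaultOutputNaming {
    // Illustration only: the names TextOutputFormat's output files get, one per ReduceTask
    static List<String> partFileNames(int numReduceTasks) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            names.add(String.format("part-r-%05d", i)); // five-digit, zero-padded partition number
        }
        return names;
    }

    // Illustration only: TextOutputFormat's default line layout, key TAB value NEWLINE
    static String textLine(Object key, Object value) {
        return key + "\t" + value + "\n";
    }

    public static void main(String[] args) {
        System.out.println(partFileNames(3)); // [part-r-00000, part-r-00001, part-r-00002]
        System.out.print(textLine("hello", 2));
    }
}
```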

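  1. Define MyOutputFormat extending FileOutputFormat<K, V>, where the type parameters are the Reducer's output key and value types.
  2. Override its getRecordWriter() method, which must return a RecordWriter object.

    Through the RecordWriter it returns, this method determines where the final output is written.

  3. Create a class extending RecordWriter<K, V> and override its two methods, write() and close(). write() is called once per output record and defines where each record is written and in what format; close() is called once at the end and should release any streams or connections the writer opened.
  4. In the Driver, call job.setOutputFormatClass() to specify which OutputFormat implementation to use.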
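The call pattern behind these steps can be modeled without Hadoop. The sketch below uses a hypothetical SimpleRecordWriter interface that mirrors RecordWriter's write()/close() pair; it is not Hadoop's API, only a model of how the framework drives a writer: it obtains one writer per task, calls write() once per output record, and calls close() once after the last record.

```java
import java.util.ArrayList;
import java.util.List;

public class RecordWriterLifecycle {
    // Hypothetical stand-in for Hadoop's RecordWriter<K, V>: same two methods, no Hadoop dependency
    interface SimpleRecordWriter<K, V> {
        void write(K key, V value);
        void close();
    }

    // Hypothetical stand-in for getRecordWriter(): here the "destination" is an in-memory list
    static SimpleRecordWriter<String, Long> getRecordWriter(List<String> sink) {
        return new SimpleRecordWriter<String, Long>() {
            @Override
            public void write(String key, Long value) {
                sink.add(key + "\t" + value); // the writer decides format and destination
            }

            @Override
            public void close() {
                sink.add("<closed>"); // a real writer releases streams or connections here
            }
        };
    }

    // Models how the framework drives one task's writer
    static List<String> runTask(String[] keys, long[] values) {
        List<String> sink = new ArrayList<>();
        SimpleRecordWriter<String, Long> writer = getRecordWriter(sink); // once per task
        for (int i = 0; i < keys.length; i++) {
            writer.write(keys[i], values[i]); // once per output record
        }
        writer.close(); // once, after the last record
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(runTask(new String[] {"a", "b"}, new long[] {1L, 2L}));
    }
}
```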

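[Note] If partitioning is configured and the number of ReduceTasks is set, then, as covered earlier, each ReduceTask produces one result file. That is a property of the default TextOutputFormat implementation. With a custom OutputFormat, however, the result data goes only to the locations we specify and nowhere else. Because such a class still extends FileOutputFormat, the Driver must still set an output path with FileOutputFormat.setOutputPath(), otherwise the job fails with an "Output directory not set" error; that directory then holds only job bookkeeping files such as _SUCCESS.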

Requirement: write the phone traffic data, sorted by total flow in ascending order, into a MySQL database.
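The ascending order comes from the shuffle, which sorts map output keys, so FlowBean has to order itself by total flow. FlowBean's source is not shown in this post; the sketch below is a plain-Java stand-in whose fields (phone, upFlow, downFlow, sumFlow) are inferred from the getters the MySQL writer calls, and it assumes total flow is upstream plus downstream. In the Hadoop job the class would implement WritableComparable<FlowBean> and also provide write()/readFields(); only the compareTo() logic is shown here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java stand-in for the post's FlowBean (assumed fields, no Hadoop serialization)
public class FlowBean implements Comparable<FlowBean> {
    private final String phone;
    private final int upFlow;
    private final int downFlow;
    private final int sumFlow;

    public FlowBean(String phone, int upFlow, int downFlow) {
        this.phone = phone;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow; // assumption: total flow = upstream + downstream
    }

    public String getPhone() { return phone; }
    public int getUpFlow() { return upFlow; }
    public int getDownFlow() { return downFlow; }
    public int getSumFlow() { return sumFlow; }

    // Ascending by total flow; equal totals are ordered by phone number so the order is deterministic
    @Override
    public int compareTo(FlowBean other) {
        int bySum = Integer.compare(this.sumFlow, other.sumFlow);
        return bySum != 0 ? bySum : this.phone.compareTo(other.phone);
    }

    public static void main(String[] args) {
        List<FlowBean> beans = new ArrayList<>();
        beans.add(new FlowBean("13800000002", 300, 200));
        beans.add(new FlowBean("13800000001", 100, 50));
        Collections.sort(beans);
        for (FlowBean bean : beans) {
            System.out.println(bean.getPhone() + "\t" + bean.getSumFlow());
        }
    }
}
```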

Code:

  1. FlowOutputFormat.java

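    import java.io.IOException;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class FlowOutputFormat extends FileOutputFormat<FlowBean, NullWritable> {
        @Override
        public RecordWriter<FlowBean, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            return new MyRecordWriter(); // called once per ReduceTask
        }
    }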
  2. MyRecordWriter.java

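    import java.io.IOException;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class MyRecordWriter extends RecordWriter<FlowBean, NullWritable> {
        private final Connection connection;
        private final PreparedStatement preparedStatement;

        // Open the connection once per task instead of once per record
        public MyRecordWriter() throws IOException {
            try {
                connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/sx_bigdata?serverTimezone=UTC", "root", "root");
                preparedStatement = connection.prepareStatement("insert into phone_flow values (?, ?, ?, ?)");
            } catch (SQLException e) {
                throw new IOException(e);
            }
        }

        /**
         * Defines the output format and destination of each record.
         * @param flowBean key output by the Reduce phase
         * @param nullWritable value output by the Reduce phase
         */
        @Override
        public void write(FlowBean flowBean, NullWritable nullWritable) throws IOException {
            try {
                preparedStatement.setString(1, flowBean.getPhone());
                preparedStatement.setInt(2, flowBean.getUpFlow());
                preparedStatement.setInt(3, flowBean.getDownFlow());
                preparedStatement.setInt(4, flowBean.getSumFlow());
                int i = preparedStatement.executeUpdate();
                System.out.println(i > 0 ? "Insert succeeded!" : "Insert failed!");
            } catch (SQLException e) {
                throw new IOException(e);
            }
        }

        // Close the statement before the connection it belongs to
        @Override
        public void close(TaskAttemptContext taskAttemptContext) throws IOException {
            try {
                preparedStatement.close();
                connection.close();
            } catch (SQLException e) {
                throw new IOException(e);
            }
        }
    }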
  3. FlowDriver.java

    job.setOutputFormatClass(FlowOutputFormat.class);
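Because write() runs once per record, issuing one INSERT per call is the simplest approach but also the slowest. A common refinement is to buffer rows and send them in batches, flushing the remainder in close(). With JDBC the real calls would be PreparedStatement.addBatch() and executeBatch(); the plain-Java BatchingWriter below is a hypothetical helper that models only the buffering, with a flushAction callback standing in for executeBatch().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchingWriter<T> {
    private final int batchSize;
    // Hypothetical hook: with JDBC, the rows would be added via addBatch() and sent with executeBatch()
    private final Consumer<List<T>> flushAction;
    private final List<T> buffer = new ArrayList<>();

    public BatchingWriter(int batchSize, Consumer<List<T>> flushAction) {
        this.batchSize = batchSize;
        this.flushAction = flushAction;
    }

    // Called once per record, like RecordWriter.write()
    public void write(T row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Called once at the end, like RecordWriter.close(): sends the last partial batch
    public void close() {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushAction.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchingWriter<String> writer = new BatchingWriter<>(2, batch -> batchSizes.add(batch.size()));
        for (String row : new String[] {"r1", "r2", "r3", "r4", "r5"}) {
            writer.write(row);
        }
        writer.close();
        System.out.println(batchSizes); // [2, 2, 1]
    }
}
```

Without the flush in close(), the last partial batch (here a single row) would never reach the database.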

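Requirement: write the results of the word-count example so that words whose first letter is uppercase are stored in upper.txt and words whose first letter is lowercase are stored in lower.txt (the code below writes both files to HDFS under /test/school/).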
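The routing rule can be checked in isolation. The hypothetical helper below applies the same uppercase check as MyRecordWriter.write(), plus a guard for empty keys (an assumption; calling charAt(0) on an empty string would throw StringIndexOutOfBoundsException). Note that words starting with a digit or any other non-uppercase character also land in lower.txt.

```java
public class FirstLetterRouter {
    // Uppercase first letter -> upper.txt; anything else -> lower.txt
    static String destinationFor(String word) {
        if (word.isEmpty()) {
            return "lower.txt"; // assumption: empty keys follow the "else" branch instead of throwing
        }
        return Character.isUpperCase(word.charAt(0)) ? "upper.txt" : "lower.txt";
    }

    public static void main(String[] args) {
        for (String word : new String[] {"Hadoop", "hadoop", "42"}) {
            System.out.println(word + " -> " + destinationFor(word));
        }
        // Hadoop -> upper.txt
        // hadoop -> lower.txt
        // 42 -> lower.txt
    }
}
```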

Code:

  1. MyOutputFormat.java

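    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MyOutputFormat extends FileOutputFormat<Text, LongWritable> {
        @Override
        public RecordWriter<Text, LongWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            return new MyRecordWriter(taskAttemptContext);
        }
    }
  2. MyRecordWriter.java

    import java.io.IOException;
    import java.net.URI;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class MyRecordWriter extends RecordWriter<Text, LongWritable> {
        FSDataOutputStream fsDataOutputStream1; // upper.txt
        FSDataOutputStream fsDataOutputStream2; // lower.txt

        public MyRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            Configuration configuration = taskAttemptContext.getConfiguration();
            // URI.create avoids the checked URISyntaxException thrown by new URI(...)
            FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.218.55:9000"), configuration, "root");
            Path out1 = new Path("/test/school/upper.txt");
            Path out2 = new Path("/test/school/lower.txt");
            // Remove results left over from a previous run
            if (fs.exists(out1)) {
                fs.delete(out1, true);
            }
            if (fs.exists(out2)) {
                fs.delete(out2, true);
            }
            fsDataOutputStream1 = fs.create(out1);
            fsDataOutputStream2 = fs.create(out2);
        }

        @Override
        public void write(Text text, LongWritable longWritable) throws IOException, InterruptedException {
            String word = text.toString();
            String line = word + "\t" + longWritable.get() + "\n";
            // Uppercase first letter -> upper.txt; everything else (including empty keys) -> lower.txt
            if (!word.isEmpty() && Character.isUpperCase(word.charAt(0))) {
                fsDataOutputStream1.write(line.getBytes(StandardCharsets.UTF_8));
            } else {
                fsDataOutputStream2.write(line.getBytes(StandardCharsets.UTF_8));
            }
        }

        @Override
        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            if (fsDataOutputStream1 != null) {
                fsDataOutputStream1.close();
            }
            if (fsDataOutputStream2 != null) {
                fsDataOutputStream2.close();
            }
        }
    }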
  3. FlowDriver.java

    job.setOutputFormatClass(MyOutputFormat.class);
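One caveat with this example: every ReduceTask creates its own MyRecordWriter, and each one creates the same two HDFS files, so with more than one ReduceTask the tasks clash (one may overwrite another's output, or a create may fail while another task still holds the file open). A possible fix, sketched here as an assumption rather than the original author's approach, is to put the task's partition number into each file name; inside Hadoop that number can be read with taskAttemptContext.getTaskAttemptID().getTaskID().getId(). The naming scheme and the PerTaskPaths class are hypothetical.

```java
public class PerTaskPaths {
    // Hypothetical naming scheme: <dir>/<name>-r-<five-digit partition>.txt
    static String perTaskPath(String dir, String name, int partition) {
        return String.format("%s/%s-r-%05d.txt", dir, name, partition);
    }

    public static void main(String[] args) {
        System.out.println(perTaskPath("/test/school", "upper", 0)); // /test/school/upper-r-00000.txt
        System.out.println(perTaskPath("/test/school", "lower", 1)); // /test/school/lower-r-00001.txt
    }
}
```

Each ReduceTask then owns its own pair of files, mirroring how the default part-r-xxxxx naming keeps tasks apart.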

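Copyright notice: This is an original article by zyd-994264926326, licensed under CC 4.0 BY-SA. If you repost it, please include a link to the original source and this notice.
Original link: https://www.cnblogs.com/zyd-994264926326/p/15136571.html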