Overriding FileInputFormat in Hadoop to Process Integers Stored in Binary Format
I recently started working with MapReduce and noticed that most examples online process text data, where the default TextInputFormat is enough to read the input. That class covers many text-processing scenarios, but when the input consists of structured records stored in binary form, it no longer fits.
This article takes a simple application as the example: counting the frequency of integers stored in binary format. Other applications, such as sorting, can be built on the same foundation. The main difficulty is handling the input data. According to Hadoop: The Definitive Guide (3rd edition), we need to extend FileInputFormat and implement the following three methods:
class MyInputFormat extends FileInputFormat<Type1, Type2> {

    /*
     * Decide whether the current file can be split: "true" means it may be
     * split into blocks, "false" means it is processed as a single split.
     */
    protected boolean isSplitable(JobContext context, Path path) { }

    /*
     * The MapReduce client calls this method to obtain all the splits, which are
     * then shipped to the MapReduce server side. Note that a split contains no
     * actual data, only metadata about it: the path of the file the split belongs
     * to, the offset of the split within that file, the length of the split, and
     * the list of hosts holding the underlying data. Implementing this method is
     * mostly a matter of filling in those fields.
     */
    public List<InputSplit> getSplits(JobContext context) throws IOException { }

    /*
     * The RecordReader produces the key-value pairs handed to the map function.
     * It receives two arguments: a split and the task context. The Mapper.run
     * method shows how the framework drives it:
     *
     *   public void run(Context context) throws IOException, InterruptedException {
     *       setup(context);
     *       // nextKeyValue() produces the next pair; once the current split is
     *       // exhausted it returns false and run() leaves the loop.
     *       while (context.nextKeyValue()) {
     *           map(context.getCurrentKey(), context.getCurrentValue(), context);
     *       }
     *       cleanup(context);
     *   }
     */
    public RecordReader<LongWritable, IntWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException { }
}
The RecordReader subclass must implement the following methods:
public class BinRecordReader extends RecordReader<LongWritable, IntWritable> {

    /* Close the input stream. */
    public void close() {}

    /* Report how much of the split has been processed. */
    public float getProgress() {}

    /* Return the current key. */
    public LongWritable getCurrentKey() throws IOException, InterruptedException {}

    /* Return the current value. */
    public IntWritable getCurrentValue() throws IOException, InterruptedException {}

    /* Initialization: open the input stream and set the start position and
     * length according to the split information. */
    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {}

    /* Produce the next key-value pair. */
    public boolean nextKeyValue() throws IOException, InterruptedException {}
}
Below is the code for the three files, starting with BinInputFormat.java:
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class BinInputFormat extends FileInputFormat<LongWritable, IntWritable> {

    private static final double SPLIT_SLOP = 1.1;

    /*
     * Decide whether the current file can be split: "true" means it may be
     * split, "false" means it is processed as a single split. The parameter is
     * JobContext rather than Configuration so that this actually overrides
     * FileInputFormat.isSplitable.
     */
    @Override
    protected boolean isSplitable(JobContext context, Path path) {
        return true;
    }

    /*
     * The MapReduce client calls this method to obtain all the splits, which are
     * then shipped to the server side. A split carries no actual data, only
     * metadata: the path of the file it belongs to, the offset of the split in
     * that file, its length, and the hosts storing the underlying blocks.
     */
    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException {
        Configuration conf = context.getConfiguration();
        List<InputSplit> splits = new ArrayList<InputSplit>();
        long minSplitSize = conf.getLong("mapred.min.split.size", 1);
        long maxSplitSize = conf.getLong("mapred.max.split.size", Long.MAX_VALUE);
        long blockSize = conf.getLong("dfs.block.size", 64 * 1024 * 1024);
        // The split size should stay a multiple of 4 bytes so that no integer
        // record straddles a split boundary.
        long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));

        FileSystem fs = FileSystem.get(conf);
        String path = conf.get(INPUT_DIR);
        FileStatus[] files = fs.listStatus(new Path(path));

        for (int fileIndex = 0; fileIndex < files.length; fileIndex++) {
            FileStatus file = files[fileIndex];
            System.out.println("input file: " + file.getPath().toString());
            long length = file.getLen();
            FileSystem fsin = file.getPath().getFileSystem(conf);
            BlockLocation[] blkLocations = fsin.getFileBlockLocations(file, 0, length);
            if ((length != 0) && isSplitable(context, file.getPath())) {
                long bytesRemaining = length;
                while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                    splits.add(new FileSplit(file.getPath(), length - bytesRemaining,
                            splitSize, blkLocations[blkIndex].getHosts()));
                    bytesRemaining -= splitSize;
                }
                if (bytesRemaining != 0) {
                    splits.add(new FileSplit(file.getPath(), length - bytesRemaining,
                            bytesRemaining, blkLocations[blkLocations.length - 1].getHosts()));
                }
            } else if (length != 0) {
                splits.add(new FileSplit(file.getPath(), 0, length, blkLocations[0].getHosts()));
            } else {
                // Create an empty hosts array for zero-length files.
                splits.add(new FileSplit(file.getPath(), 0, length, new String[0]));
            }
        }
        return splits;
    }

    /*
     * The RecordReader produces the key-value pairs handed to the map function.
     * It receives a split and the task context; the Mapper.run method shows how
     * the framework drives it:
     *
     *   public void run(Context context) throws IOException, InterruptedException {
     *       setup(context);
     *       // nextKeyValue() produces the next pair; once the current split is
     *       // exhausted it returns false and the loop ends.
     *       while (context.nextKeyValue()) {
     *           map(context.getCurrentKey(), context.getCurrentValue(), context);
     *       }
     *       cleanup(context);
     *   }
     */
    @Override
    public RecordReader<LongWritable, IntWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        BinRecordReader reader = new BinRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}
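One caveat about getSplits: every record in this application is a fixed-width 4-byte integer, so the computed split size should be a multiple of 4; otherwise an integer could straddle a split boundary and be read incorrectly. The block and split sizes above are normally multiples of 4 anyway, but a small helper along these lines (a hypothetical sketch, not part of the original code) would make the invariant explicit:

// Hypothetical helper: round a proposed split size down to a multiple of the
// record width (4 bytes here) so that no integer straddles a split boundary.
private static long alignToRecord(long proposedSplitSize, int recordBytes) {
    long aligned = (proposedSplitSize / recordBytes) * recordBytes;
    return Math.max(aligned, recordBytes);
}

It could be applied right after splitSize is computed, e.g. splitSize = alignToRecord(splitSize, 4);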
Next is BinRecordReader.java:
package org.apache.hadoop.examples;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Reads 4-byte integers from a split. The key is the byte offset of the record
 * within the file and the value is the integer itself.
 */
public class BinRecordReader extends RecordReader<LongWritable, IntWritable> {

    private FSDataInputStream inputStream = null;
    private long start, end, pos;
    private Configuration conf = null;
    private FileSplit fileSplit = null;
    private LongWritable key = new LongWritable();
    private IntWritable value = new IntWritable();
    private boolean processed = false;

    public BinRecordReader() throws IOException {
    }

    /* Close the input stream. */
    public void close() {
        try {
            if (inputStream != null)
                inputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /* Report how much of the split has been processed. */
    public float getProgress() {
        if (processed || end == start)
            return 1.0f;
        return Math.min(1.0f, (pos - start) / (float) (end - start));
    }

    /* Return the current key. */
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    /* Return the current value. */
    public IntWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /* Initialization: open the file, seek to the start of the split and record
     * where the split ends. */
    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        fileSplit = (FileSplit) inputSplit;
        conf = context.getConfiguration();
        this.start = fileSplit.getStart();
        this.end = this.start + fileSplit.getLength();
        try {
            Path path = fileSplit.getPath();
            FileSystem fs = path.getFileSystem(conf);
            this.inputStream = fs.open(path);
            inputStream.seek(this.start);
            this.pos = this.start;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /* Produce the next key-value pair. */
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (this.pos < this.end) {
            key.set(this.pos);
            // readInt() is big-endian; the file was written by a C program on a
            // little-endian machine, so the bytes are reversed here.
            value.set(Integer.reverseBytes(inputStream.readInt()));
            this.pos = inputStream.getPos();
            return true;
        } else {
            processed = true;
            return false;
        }
    }
}
And finally the driver, IntCount.java:
package org.apache.hadoop.examples;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class IntCount {

    public static class TokenizerMapper
            extends Mapper<LongWritable, IntWritable, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text intNum = new Text();

        public void map(LongWritable key, IntWritable value, Context context)
                throws IOException, InterruptedException {
            // The key (byte offset) is ignored; the integer value becomes the output key.
            intNum.set(Integer.toString(value.get()));
            context.write(intNum, one);
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Input and output paths are hard-coded here instead of being taken from args.
        String[] newArgs = new String[] {
                "hdfs://localhost:9000/read",
                "hdfs://localhost:9000/data/wc_output21" };
        String[] otherArgs = new GenericOptionsParser(conf, newArgs).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: IntCount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "IntCount");
        job.setJarByClass(IntCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        // Use the custom input format defined above.
        job.setInputFormatClass(BinInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
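To watch the custom getSplits produce more than one split on a tiny test file, the sizes it reads can be set on the Configuration in main, right after conf is created. This is only an assumed tuning step for experimentation; the property names are the same ones BinInputFormat reads:

// Hypothetical tuning for a small test file (not in the original driver).
conf.setLong("mapred.min.split.size", 4);   // at least one 4-byte record per split
conf.setLong("mapred.max.split.size", 40);  // cap each split at ten records to force several splits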
Next, we use a short C program to generate a file of integers stored in binary format:
#include <stdio.h>

int main() {
    FILE *fp = fopen("tmpfile", "wb");
    int i, j;
    /* Write the integers 0..9 ten times each, as raw 4-byte values. */
    for (i = 0; i < 10; i++) {
        for (j = 0; j < 10; j++)
            fwrite(&j, sizeof(int), 1, fp);
    }
    fclose(fp);
    return 0;
}
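On a typical x86 machine this C program writes each int in little-endian byte order, while Java's DataInput.readInt() reads big-endian; that is exactly why BinRecordReader calls Integer.reverseBytes(). As a sanity check, the file can be read back locally with a small Java program (a sketch assuming the file is named tmpfile and was written on a little-endian machine):

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

// Hypothetical local check, not part of the MapReduce job: read tmpfile the same
// way BinRecordReader does and print the recovered integers.
public class ReadTmpfile {
    public static void main(String[] args) throws IOException {
        DataInputStream in = new DataInputStream(new FileInputStream("tmpfile"));
        try {
            while (in.available() >= 4) {
                // readInt() is big-endian; reverse the bytes of the little-endian data.
                System.out.println(Integer.reverseBytes(in.readInt()));
            }
        } finally {
            in.close();
        }
    }
}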
Copy the generated file into /read/ on HDFS, then launch the IntCount MapReduce job and check its output.
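Since the C program writes each of the integers 0 through 9 exactly ten times, the output should contain one line per integer with a count of 10, from "0 10" through "9 10", with the key and the count separated by a tab (the default TextOutputFormat layout).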