Understanding the FileInputFormat class by subclassing it
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.security.TokenCache;

import com.google.common.base.Charsets;

public class MyFileinput extends FileInputFormat<LongWritable, Text> {

    // Skip files whose names start with "_" or "." (the filtering is customizable)
    private static final PathFilter hiddenFileFilter = new PathFilter() {
        public boolean accept(Path p) {
            String name = p.getName();
            return !name.startsWith("_") && !name.startsWith(".");
        }
    };

    // Enumerate the input files, applying the hidden-file filter plus any job-level filter
    @Override
    protected List<FileStatus> listStatus(JobContext job) throws IOException {
        System.out.println("*********************");
        List<FileStatus> result = new ArrayList<FileStatus>();
        Path[] dirs = getInputPaths(job);
        System.out.println("dirs = " + Arrays.toString(dirs));
        System.out.println("dirs length = " + dirs.length);
        for (Path p : dirs) {
            System.out.println("Path loop " + p);
        }
        if (dirs.length == 0) {
            throw new IOException("No input paths specified in job");
        }

        TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                job.getConfiguration());

        List<IOException> errors = new ArrayList<IOException>();
        List<PathFilter> filters = new ArrayList<PathFilter>();
        filters.add(hiddenFileFilter);
        PathFilter jobFilter = getInputPathFilter(job);
        if (jobFilter != null) {
            filters.add(jobFilter);
        }
        // Combined path filter; more filters can be added here
        PathFilter inputFilter = new MultiPathFilter(filters);

        for (int i = 0; i < dirs.length; ++i) {
            Path p = dirs[i];
            FileSystem fs = p.getFileSystem(job.getConfiguration());
            FileStatus[] matches = fs.globStatus(p, inputFilter);
            System.out.println("matches = " + Arrays.toString(matches));
            if (matches == null) {
                errors.add(new IOException("Input path does not exist: " + p));
            } else if (matches.length == 0) {
                errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
            } else {
                for (FileStatus globStat : matches) {
                    System.out.println("globStat " + globStat);
                    if (globStat.isDirectory()) {
                        // One level of directory expansion, same as the stock FileInputFormat
                        for (FileStatus stat : fs.listStatus(globStat.getPath(), inputFilter)) {
                            result.add(stat);
                        }
                    } else {
                        result.add(globStat);
                    }
                }
            }
        }

        if (!errors.isEmpty()) {
            throw new InvalidInputException(errors);
        }
        // LOG.info("Total input paths to process : " + result.size());
        return result;
    }

    // Compute the split sizes and return the list of splits
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);
        System.out.println("minSize " + minSize);
        System.out.println("maxSize " + maxSize);

        List<InputSplit> splits = new ArrayList<InputSplit>();
        // Get the (already filtered) list of files under the input directories
        List<FileStatus> files = listStatus(job);
        for (FileStatus file : files) {
            Path path = file.getPath();
            long length = file.getLen();
            System.out.println("path: " + path + " file len = " + length);
            if (length != 0L) {
                // Use the path to look up the file's block locations
                FileSystem fs = path.getFileSystem(job.getConfiguration());
                BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0L, length);
                if (isSplitable(job, path)) {
                    long blockSize = file.getBlockSize();
                    System.out.println("blockSize:" + blockSize);
                    long splitSize = computeSplitSize(blockSize, minSize, maxSize);
                    System.out.println("splitSize :" + splitSize);

                    long bytesRemaining = length;
                    System.out.println("bytesRemaining :" + bytesRemaining);
                    System.out.println((double) bytesRemaining / splitSize);
                    // The slop factor is 1.1: a trailing chunk of up to 1.1 split sizes is
                    // kept in the last split, so no map task is started for a tiny remainder
                    while ((double) bytesRemaining / splitSize > 1.1D) {
                        int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                        System.out.println("blkIndex :" + blkIndex);
                        // Append one split to the list
                        splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                                blkLocations[blkIndex].getHosts()));
                        bytesRemaining -= splitSize;
                    }
                    // Tail of the file
                    if (bytesRemaining != 0L) {
                        System.out.println("tail size " + bytesRemaining);
                        int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                        splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                                blkLocations[blkIndex].getHosts()));
                    }
                } else {
                    // Not splittable: the whole file becomes a single split
                    splits.add(makeSplit(path, 0L, length, blkLocations[0].getHosts()));
                }
            } else {
                // A zero-length file still yields one (empty) split, so a map task runs for it
                splits.add(makeSplit(path, 0L, length, new String[0]));
            }
        }
        job.getConfiguration().setLong(
                "mapreduce.input.fileinputformat.numinputfiles", files.size());
        // LOG.debug("Total # of splits: " + splits.size());
        return splits;
    }

    // Accepts a path only if every filter in the list accepts it
    private static class MultiPathFilter implements PathFilter {
        private final List<PathFilter> filters;

        public MultiPathFilter(List<PathFilter> filters) {
            this.filters = filters;
        }

        public boolean accept(Path path) {
            for (PathFilter filter : this.filters) {
                if (!filter.accept(path)) {
                    return false;
                }
            }
            return true;
        }
    }

    // How the split content is read; by default one record per line
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) {
        String delimiter = context.getConfiguration().get(
                "textinputformat.record.delimiter");
        System.out.println("delimiter == " + delimiter);
        // null by default, in which case LineRecordReader falls back to CR/LF
        byte[] recordDelimiterBytes = null;
        if (null != delimiter) {
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        }
        return new LineRecordReader(recordDelimiterBytes);
    }
}
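To actually see the System.out.println() traces, the format has to be plugged into a job. Below is a minimal driver sketch; the class name MyFileinputDriver, the args[0]/args[1] paths, and the 64 MB max split size are placeholders of my own, chosen only so that computeSplitSize() returns something other than the block size.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyFileinputDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "MyFileinput demo");
        job.setJarByClass(MyFileinputDriver.class);

        // Use the custom InputFormat instead of the default TextInputFormat
        job.setInputFormatClass(MyFileinput.class);

        // Optional: influence computeSplitSize(blockSize, minSize, maxSize)
        FileInputFormat.setMinInputSplitSize(job, 1L);
        FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);

        // The identity Mapper is enough to watch the split/record flow
        job.setMapperClass(Mapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

setMinInputSplitSize()/setMaxInputSplitSize() simply write the mapreduce.input.fileinputformat.split.minsize and split.maxsize keys that getMinSplitSize()/getMaxSplitSize() read back inside getSplits(). Note that the getSplits() output appears on the submitting client, while the createRecordReader() output lands in the map task logs (unless the job runs in local mode).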
Its main responsibilities are computing the input splits and, according to those splits, reading the content that is fed to the map tasks.
public abstract class InputFormat<K, V> {
    public abstract List<InputSplit> getSplits(JobContext paramJobContext)
            throws IOException, InterruptedException;

    public abstract RecordReader<K, V> createRecordReader(InputSplit paramInputSplit,
            TaskAttemptContext paramTaskAttemptContext)
            throws IOException, InterruptedException;
}
This is also more or less apparent from the interface that the top-level abstract class requires its subclasses to provide.
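Concretely, the framework drives every InputFormat through exactly these two methods: getSplits() is called once on the client while the job is being submitted, and createRecordReader() is called inside each map task for that task's assigned split. The sketch below only approximates that lifecycle for illustration; it is not code you would normally write yourself, and the class and method names are mine.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class InputFormatLifecycleSketch {

    // Roughly what the framework does with an InputFormat; illustrative only
    static void drive(InputFormat<LongWritable, Text> format,
                      JobContext jobContext,
                      TaskAttemptContext taskContext)
            throws IOException, InterruptedException {
        // 1. Client side, at job submission: one split per future map task
        List<InputSplit> splits = format.getSplits(jobContext);

        // 2. In reality every split goes to its own map task; here we just loop
        for (InputSplit split : splits) {
            RecordReader<LongWritable, Text> reader =
                    format.createRecordReader(split, taskContext);
            reader.initialize(split, taskContext);
            while (reader.nextKeyValue()) {
                LongWritable key = reader.getCurrentKey();
                Text value = reader.getCurrentValue();
                // map(key, value, context) would be called here
            }
            reader.close();
        }
    }
}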
Below is the simplest possible InputFormat implementation; beyond that, all we need to supply is a RecordReader.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

import com.google.common.base.Charsets;

public class MySimpleInformat<V> extends FileInputFormat<LongWritable, V> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // Do not split the input: each file becomes exactly one split
        return false;
    }

    @SuppressWarnings("unchecked")
    @Override
    public RecordReader<LongWritable, V> createRecordReader(InputSplit split,
            TaskAttemptContext context) {
        String delimiter = context.getConfiguration().get(
                "textinputformat.record.delimiter");
        System.out.println("delimiter == " + delimiter);
        // null by default, in which case LineRecordReader falls back to CR/LF
        byte[] recordDelimiterBytes = null;
        if (null != delimiter) {
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        }
        return (RecordReader<LongWritable, V>) new LineRecordReader(recordDelimiterBytes);
    }
}
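Because isSplitable() returns false here, each file is guaranteed to arrive in one piece, which makes it easy to replace the LineRecordReader with a reader that emits the whole file as a single record. The following is only a sketch of that idea under my own assumptions: the class name WholeFileRecordReader and the BytesWritable value type are not part of the original code.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<LongWritable, BytesWritable> {
    private FileSplit split;
    private Configuration conf;
    private final BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) {
        this.split = (FileSplit) split;   // FileInputFormat hands out FileSplits
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        if (processed) {
            return false;                 // only one record per file
        }
        byte[] contents = new byte[(int) split.getLength()];
        Path file = split.getPath();
        FileSystem fs = file.getFileSystem(conf);
        FSDataInputStream in = null;
        try {
            in = fs.open(file);
            IOUtils.readFully(in, contents, 0, contents.length);
            value.set(contents, 0, contents.length);
        } finally {
            IOUtils.closeStream(in);
        }
        processed = true;
        return true;
    }

    @Override
    public LongWritable getCurrentKey() {
        return new LongWritable(0);       // offset of the single record
    }

    @Override
    public BytesWritable getCurrentValue() {
        return value;
    }

    @Override
    public float getProgress() {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() {
        // nothing to release; the stream is closed inside nextKeyValue()
    }
}

To use it, createRecordReader() in MySimpleInformat would return new WholeFileRecordReader() and the format would be declared as FileInputFormat<LongWritable, BytesWritable>.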