FileInputFormat - tneduts
MapReduce框架要处理数据的文件类型 FileInputFormat这个类决定。
TextInputFormat是框架默认的文件类型,可以处理Text文件类型,如果你要处理的文件类型不是Text,
譬如说是Xml或DB,你就需要自己实现或用库中已有的类型。
FileInputFormat的主要方法之一getSplits完成的功能是获取job要处理的路径文件所在的block信息。
数据结构:FileInputSplit 存储了文件的位置信息,如Host,所属文件信息,开始offset,还有长度信息。
- public class FileSplit extends InputSplit implements Writable {
- private Path file;
- private long start;
- private long length;
- private String[] hosts;
- private SplitLocationInfo[] hostInfos;
- …
- }
方法介绍:
blockSize:块大小
minSize:最小分片大小,由参数mapred.min.split.size设置,默认为1
maxSize:最大分片大小,由参数mapred.max.split.size设置,默认Long.MAX-VALUE
计算splitsize的方法:Math.max(minSize,Math.min(maxSize,blockSize)
FileInputFormat的另一个重要方法是CreateRecordReader.在这个方法里面会用到前面方法所获取到的InpustSplit.这个RecordReader会用来去读取数据,传递给maptask去执行处理。
当InputSplit尺寸大于block并且其对应的所有block(包含副本)不在同一个节点上时,Map Task不可能完全实现数据的本地化,
也就是说,总有一部分数据需要从远程节点上读取,因此得出,当使用基于FileInputFormat实现InputFormat时,为了提高数据本地性,应该尽量使InputSplit大小与block大小一致。
因为不同的文件,在上传的时候可以具体指定blocksize,若不指定则使用系统默认的blocksize,所以在代码中它使用的是file.getblocksize().
若文件的blocksize是32M,我们的文件是70M,而且文件是可以切分的,则系统是如何分片的呢?(根据源代码进行分析)
如果我们的minsize=1,maxsize=128,则计算得到的splitsize=32M,每一个block一个inputsplit.
如果我们的minsize=64,maxsize=128,则计算得到的splitsize=64M, 但因为不满足70/64>1.1的情况,所以还是只会分成一个fileinputsplit,这一个inputsplit包含了两个block的信息。
试想一下,如果还拆分成两个inputsplit让两个map task去做,第二个maptask只获取一点点的数据,利用率不高。
若我们的文件是xml文件类型,不管我们的文件是多大,都只能分给一个InputSplit去处理,因为它的isSplitable=false,xml不能切开处理,那样数据就会乱掉。
- /**
- * Generate the list of files and make them into FileSplits.
- * @param job the job context
- * @throws IOException
- */
- public List<InputSplit> getSplits(JobContext job) throws IOException {
- Stopwatch sw = new Stopwatch().start();
- long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
- long maxSize = getMaxSplitSize(job);
- // generate splits
- List<InputSplit> splits = new ArrayList<InputSplit>();
- List<FileStatus> files = listStatus(job);
- for (FileStatus file: files) {
- Path path = file.getPath();
- long length = file.getLen();
- if (length != 0) {
- BlockLocation[] blkLocations;
- if (file instanceof LocatedFileStatus) {
- blkLocations = ((LocatedFileStatus) file).getBlockLocations();
- } else {
- FileSystem fs = path.getFileSystem(job.getConfiguration());
- blkLocations = fs.getFileBlockLocations(file, 0, length);
- }
- if (isSplitable(job, path)) {
- long blockSize = file.getBlockSize();
- long splitSize = computeSplitSize(blockSize, minSize, maxSize);
- long bytesRemaining = length;
- while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
- int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
- splits.add(makeSplit(path, length-bytesRemaining, splitSize,
- blkLocations[blkIndex].getHosts(),
- blkLocations[blkIndex].getCachedHosts()));
- bytesRemaining -= splitSize;
- }
- if (bytesRemaining != 0) {
- int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
- splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
- blkLocations[blkIndex].getHosts(),
- blkLocations[blkIndex].getCachedHosts()));
- }
- } else { // not splitable
- splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
- blkLocations[0].getCachedHosts()));
- }
- } else {
- //Create empty hosts array for zero length files
- splits.add(makeSplit(path, 0, length, new String[0]));
- }
- }
- // Save the number of input files for metrics/loadgen
- job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
- sw.stop();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Total # of splits generated by getSplits: " + splits.size()
- + ", TimeTaken: " + sw.elapsedMillis());
- }
- return splits;
- }