1. public class FileSplit extends InputSplit implements Writable {
  2. private Path file;
  3. private long start;
  4. private long length;
  5. private String[] hosts;
  6. private SplitLocationInfo[] hostInfos;
  7. }

当InputSplit尺寸大于block并且其对应的所有block(包含副本)不在同一个节点上时,Map Task不可能完全实现数据的本地化,

也就是说,总有一部分数据需要从远程节点上读取,因此得出,当使用基于FileInputFormat实现InputFormat时,为了提高数据本地性,应该尽量使InputSplit大小与block大小一致。

因为不同的文件,在上传的时候可以具体指定blocksize,若不指定则使用系统默认的blocksize,所以在代码中它使用的是file.getblocksize().

若文件的blocksize是32M,我们的文件是70M,而且文件是可以切分的,则系统是如何分片的呢?(根据源代码进行分析)

如果我们的minsize=1,maxsize=128,则计算得到的splitsize=32M,每一个block一个inputsplit.

如果我们的minsize=64,maxsize=128,则计算得到的splitsize=64M, 但因为不满足70/64>1.1的情况,所以还是只会分成一个fileinputsplit,这一个inputsplit包含了两个block的信息。

试想一下,如果还拆分成两个inputsplit让两个map task去做,第二个maptask只获取一点点的数据,利用率不高。

若我们的文件是xml文件类型,不管我们的文件是多大,都只能分给一个InputSplit去处理,因为它的isSplitable=false,xml不能切开处理,那样数据就会乱掉。

  1. /**
  2. * Generate the list of files and make them into FileSplits.
  3. * @param job the job context
  4. * @throws IOException
  5. */
  6. public List<InputSplit> getSplits(JobContext job) throws IOException {
  7. Stopwatch sw = new Stopwatch().start();
  8. long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  9. long maxSize = getMaxSplitSize(job);
  10. // generate splits
  11. List<InputSplit> splits = new ArrayList<InputSplit>();
  12. List<FileStatus> files = listStatus(job);
  13. for (FileStatus file: files) {
  14. Path path = file.getPath();
  15. long length = file.getLen();
  16. if (length != 0) {
  17. BlockLocation[] blkLocations;
  18. if (file instanceof LocatedFileStatus) {
  19. blkLocations = ((LocatedFileStatus) file).getBlockLocations();
  20. } else {
  21. FileSystem fs = path.getFileSystem(job.getConfiguration());
  22. blkLocations = fs.getFileBlockLocations(file, 0, length);
  23. }
  24. if (isSplitable(job, path)) {
  25. long blockSize = file.getBlockSize();
  26. long splitSize = computeSplitSize(blockSize, minSize, maxSize);
  27. long bytesRemaining = length;
  28. while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  29. int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  30. splits.add(makeSplit(path, length-bytesRemaining, splitSize,
  31. blkLocations[blkIndex].getHosts(),
  32. blkLocations[blkIndex].getCachedHosts()));
  33. bytesRemaining -= splitSize;
  34. }
  35. if (bytesRemaining != 0) {
  36. int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  37. splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
  38. blkLocations[blkIndex].getHosts(),
  39. blkLocations[blkIndex].getCachedHosts()));
  40. }
  41. } else { // not splitable
  42. splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
  43. blkLocations[0].getCachedHosts()));
  44. }
  45. } else {
  46. //Create empty hosts array for zero length files
  47. splits.add(makeSplit(path, 0, length, new String[0]));
  48. }
  49. }
  50. // Save the number of input files for metrics/loadgen
  51. job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  52. sw.stop();
  53. if (LOG.isDebugEnabled()) {
  54. LOG.debug("Total # of splits generated by getSplits: " + splits.size()
  55. + ", TimeTaken: " + sw.elapsedMillis());
  56. }
  57. return splits;
  58. }

版权声明:本文为huaxiaoyao原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/huaxiaoyao/p/4297178.html