import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
//import org.apache.hadoop.mapreduce.lib.input.FileInputFormat.MultiPathFilter;
import org.apache.hadoop.mapreduce.security.TokenCache;

import com.google.common.base.Charsets;

public class MyFileinput extends FileInputFormat<LongWritable, Text> {

	private static final PathFilter hiddenFileFilter = new PathFilter() {
		public boolean accept(Path p) {
			String name = p.getName();
			return ((!(name.startsWith("_"))) && (!(name.startsWith("."))));
		}
	};
	
	// 遍历文件列表, 过滤掉_ .开头的文件(可以自定义过滤)
	protected List<FileStatus> listStatus(JobContext job) throws IOException {
		System.out.println("*********************");
		List result = new ArrayList();
		Path[] dirs = getInputPaths(job);
		System.out.println("dirs" + dirs);
		System.out.println("dirs length = " + dirs.length);
		for(Path p: dirs){
			System.out.println("Path loop " + p);
		}

		if (dirs.length == 0) {
			throw new IOException("No input paths specified in job");
		}

		TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
				job.getConfiguration());

		List errors = new ArrayList();

		List filters = new ArrayList();
		filters.add(hiddenFileFilter);
		PathFilter jobFilter = getInputPathFilter(job);
		if (jobFilter != null) {
			filters.add(jobFilter);
		}

		// 过滤函数,可以拓展
		PathFilter inputFilter = new MultiPathFilter(filters);

		for (int i = 0; i < dirs.length; ++i) {
			Path p = dirs[i];
			FileSystem fs = p.getFileSystem(job.getConfiguration());
			FileStatus[] matches = fs.globStatus(p, inputFilter);
			System.out.println("matches=" + matches);
			for(FileStatus match: matches){
				System.out.println("loop matches" + match.getPath());
			}
			
			if (matches == null)
				errors.add(new IOException("Input path does not exist: " + p));
			else if (matches.length == 0)
				errors.add(new IOException("Input Pattern " + p
						+ " matches 0 files"));
			else {
				for (FileStatus globStat : matches) {
					System.out.println("globStat " + globStat);
					if (globStat.isDirectory())
						for (FileStatus stat : fs.listStatus(
								globStat.getPath(), inputFilter)) {
							result.add(stat);
						}
					else {
						result.add(globStat);
					}
				}
			}
		}

		if (!(errors.isEmpty())) {
			throw new InvalidInputException(errors);
		}
		// LOG.info("Total input paths to process : " + result.size());
		return result;
	}
	
	// 计算分片大小,返回一个分片列表
	public List<InputSplit> getSplits(JobContext job) throws IOException {
		long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
		long maxSize = getMaxSplitSize(job);
		
		System.out.print("minSize " + minSize);
		System.out.print("maxSize " + maxSize);
		
		List splits = new ArrayList();
		// 获取输入目录下的文件列表(过滤文件)
		List<FileStatus> files = listStatus(job);
		for (FileStatus file : files) {
			Path path = file.getPath();
			long length = file.getLen();
			System.out.println("path: " + path+ " file len = " + length);
			if (length != 0L) {
				// 通过路径找到块列表 
				FileSystem fs = path.getFileSystem(job.getConfiguration());
				BlockLocation[] blkLocations = fs.getFileBlockLocations(file,
						0L, length);
				
				if (isSplitable(job, path)) {
					long blockSize = file.getBlockSize();
					System.out.println("blockSize:" + blockSize);
					long splitSize = computeSplitSize(blockSize, minSize,
							maxSize);
					System.out.println("splitSize :" + splitSize);

					long bytesRemaining = length;
					System.out.println("bytesRemaining :" + bytesRemaining);
					
					System.out.println(bytesRemaining / splitSize);
					// 定义为1.1D, 为避免一个分片过小, 也需要启动一个MAP来运行
					// 最后剩余的文件大小只要不超过分片大小的1.1倍都会放入一个分片
					while (bytesRemaining / splitSize > 1.1D) {
						int blkIndex = getBlockIndex(blkLocations, length
								- bytesRemaining);
						System.out.println("blkIndex :" + blkIndex);
						
						// 添加到分片分片列表
						splits.add(makeSplit(path, length - bytesRemaining,
								splitSize, blkLocations[blkIndex].getHosts()));

						bytesRemaining -= splitSize;
					}
					
					// 文件尾 
					if (bytesRemaining != 0L) {
						Long remain = length - bytesRemaining;
						System.out.println("文件尾大小" + bytesRemaining);
						int blkIndex = getBlockIndex(blkLocations, length
								- bytesRemaining);
						splits.add(makeSplit(path, length - bytesRemaining,
								bytesRemaining,
								blkLocations[blkIndex].getHosts()));
					}
				} else {
					splits.add(makeSplit(path, 0L, length,
							blkLocations[0].getHosts()));
				}
			} else {
				// 测试文件大小为0, 也会启动一个map
				splits.add(makeSplit(path, 0L, length, new String[0]));
			}
		}
			
		job.getConfiguration().setLong(
				"mapreduce.input.fileinputformat.numinputfiles", files.size());
		// LOG.debug("Total # of splits: " + splits.size());
		return splits;
	}

	private static class MultiPathFilter implements PathFilter {
		private List<PathFilter> filters;

		public MultiPathFilter(List<PathFilter> filters) {
			this.filters = filters;
		}

		public boolean accept(Path path) {
			for (PathFilter filter : this.filters) {
				if (!(filter.accept(path))) {
					return false;
				}
			}
			return true;
		}
	}
	
	// 文件内容读取, 默认按行读取 
	@Override
	public RecordReader<LongWritable, Text> createRecordReader(
			InputSplit split, TaskAttemptContext context) {
		String delimiter = context.getConfiguration().get(
				"textinputformat.record.delimiter");
		
		System.out.println("delimiter ==" + delimiter);
		// 默认为空
		byte[] recordDelimiterBytes = null;
		if (null != delimiter)
			recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
		
		return new LineRecordReader(recordDelimiterBytes);
	}
}

主要功能是计算分片和按照分片给MAP任务读取内容

public abstract class InputFormat<K, V> {
    public abstract List<InputSplit> getSplits(JobContext paramJobContext)
            throws IOException, InterruptedException;

    public abstract RecordReader<K, V> createRecordReader(
            InputSplit paramInputSplit,
            TaskAttemptContext paramTaskAttemptContext) throws IOException,
            InterruptedException;
}

从顶层的派生类提供的接口差不多也能看出来。

 

最简单的Informat实现, 然后我们只要实现RecordReader就可以了

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

import com.google.common.base.Charsets;

public class MySimpleInformat<V> extends FileInputFormat<LongWritable, V>
{    
	protected boolean isSplitable(JobContext context, Path filename) {
		// 是否需要分片
		return false;
	}    
    
    @Override
    public RecordReader<LongWritable, V> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        String delimiter = context.getConfiguration().get(
                "textinputformat.record.delimiter");
         
        System.out.println("delimiter ==" + delimiter);
        // 默认为空
        byte[] recordDelimiterBytes = null;
        if (null != delimiter)
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
         
        return (RecordReader<LongWritable, V>) new LineRecordReader(recordDelimiterBytes);
    }
}

 

版权声明:本文为chengxin1982原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/chengxin1982/p/3849142.html