/*
 * This file is part of Hadoop-Gpl-Compression.
 *
 * Hadoop-Gpl-Compression is free software: you can redistribute it
 * and/or modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 *
 * Hadoop-Gpl-Compression is distributed in the hope that it will be
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Hadoop-Gpl-Compression.  If not, see
 * <http://www.gnu.org/licenses/>.
 */
package com.hadoop.mapred;

import com.hadoop.compression.lzo.LzoIndex;
import com.hadoop.compression.lzo.LzopCodec;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

/**
 * This class conforms to the old (org.apache.hadoop.mapred.*) Hadoop API,
 * which is deprecated but still required in places.  Streaming, for example,
 * checks that the given input format is a descendant of
 * org.apache.hadoop.mapred.InputFormat, a check that every InputFormat from
 * the new API fails.  For streaming to work, you must therefore use
 * com.hadoop.mapred.DeprecatedLzoTextInputFormat, not
 * com.hadoop.mapreduce.LzoTextInputFormat.  The two classes aim to behave
 * alike in every other respect.
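 *
 * <p>As a sketch (the driver class and input path below are illustrative, not
 * part of this library), a plain MapReduce job would plug the format in
 * through its JobConf:
 *
 * <pre>
 *   JobConf conf = new JobConf(MyJob.class);                     // hypothetical driver class
 *   conf.setInputFormat(DeprecatedLzoTextInputFormat.class);
 *   FileInputFormat.setInputPaths(conf, new Path("/data/logs")); // example path
 * </pre>
 *
 * A streaming job would instead pass the class name on the command line via
 * the -inputformat option.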
 */
@SuppressWarnings("deprecation")
public class DeprecatedLzoTextInputFormat extends FileInputFormat<LongWritable, Text> {

    public static final String LZO_INDEX_SUFFIX = ".index";
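    // Per-file LZO block index, populated in listStatus() and consulted later
    // by isSplitable() and getSplits().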
    private final Map<Path, LzoIndex> indexes = new HashMap<Path, LzoIndex>();

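    /**
     * Filters the job's input down to LZO files (by the codec's default
     * extension) and reads each file's block index.  LzoIndex.readIndex()
     * yields an empty index when no matching .index file exists, so every
     * surviving file ends up with an entry in the indexes map.
     */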
    @Override
    protected FileStatus[] listStatus(JobConf conf) throws IOException {
        List<FileStatus> files = new ArrayList<FileStatus>(Arrays.asList(super.listStatus(conf)));

        String fileExtension = new LzopCodec().getDefaultExtension();

        Iterator<FileStatus> it = files.iterator();
        while (it.hasNext()) {
            FileStatus fileStatus = it.next();
            Path file = fileStatus.getPath();

            if (!file.toString().endsWith(fileExtension)) {
                // Get rid of non-LZO files.
                it.remove();
            } else {
                FileSystem fs = file.getFileSystem(conf);
                LzoIndex index = LzoIndex.readIndex(fs, file);
                indexes.put(file, index);
            }
        }

        return files.toArray(new FileStatus[]{});
    }

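    /**
     * A file is splittable only when a non-empty LZO index was recorded for it
     * in listStatus(); without one there is no known block boundary to split on.
     */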
    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        LzoIndex index = indexes.get(filename);
        // Treat files with no index entry as not splittable rather than risking
        // a NullPointerException.
        return index != null && !index.isEmpty();
    }

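    /**
     * Takes the splits computed by FileInputFormat and realigns each one's
     * start and end to LZO block boundaries using the index gathered in
     * listStatus().  Files with an empty index are not splittable, so their
     * single whole-file split is kept as is.
     */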
    @Override
    public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
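        // The cast below relies on the old-API FileInputFormat.getSplits()
        // returning an array whose runtime type is FileSplit[].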
        FileSplit[] splits = (FileSplit[]) super.getSplits(conf, numSplits);
        // Find new starts/ends of the filesplit that align with the LZO blocks.

        List<FileSplit> result = new ArrayList<FileSplit>();

        for (FileSplit fileSplit : splits) {
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            LzoIndex index = indexes.get(file);
            if (index == null) {
                throw new IOException("Index not found for " + file);
            }
            if (index.isEmpty()) {
                // Empty index, keep it as is.
                result.add(fileSplit);
                continue;
            }

            long start = fileSplit.getStart();
            long end = start + fileSplit.getLength();

            long lzoStart = index.alignSliceStartToIndex(start, end);
            long lzoEnd = index.alignSliceEndToIndex(end, fs.getFileStatus(file).getLen());

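            // If no LZO block starts inside this slice, drop it: its bytes are
            // already covered by a neighboring split whose end was aligned
            // forward to the next block boundary.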
            if (lzoStart != LzoIndex.NOT_FOUND && lzoEnd != LzoIndex.NOT_FOUND) {
                result.add(new FileSplit(file, lzoStart, lzoEnd - lzoStart, fileSplit.getLocations()));
            }
        }

        return result.toArray(new FileSplit[result.size()]);
    }

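    /**
     * Returns a line-oriented record reader that decompresses the LZO data for
     * the given split.
     */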
    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
            JobConf conf, Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        return new DeprecatedLzoLineRecordReader(conf, (FileSplit) split);
    }

}