/*
 * This file is part of Hadoop-Gpl-Compression.
 *
 * Hadoop-Gpl-Compression is free software: you can redistribute it
 * and/or modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 *
 * Hadoop-Gpl-Compression is distributed in the hope that it will be
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Hadoop-Gpl-Compression.  If not, see
 * <http://www.gnu.org/licenses/>.
 */
package com.hadoop.mapreduce;

import com.hadoop.compression.lzo.LzoIndex;
import com.hadoop.compression.lzo.LzopCodec;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * An {@link InputFormat} for lzop compressed text files. Files are broken into
 * lines. Either line feed or carriage return is used to signal end of line.
 * Keys are the position in the file, and values are the line of text.
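 *
 * <p>Typical job wiring might look like the sketch below; the job name and
 * input path are illustrative placeholders, not part of this class:
 * <pre>{@code
 * Configuration conf = new Configuration();
 * Job job = new Job(conf, "lzo text");
 * job.setInputFormatClass(LzoTextInputFormat.class);
 * FileInputFormat.addInputPath(job, new Path("/data/logs"));
 * }</pre>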
 */
public class LzoTextInputFormat extends FileInputFormat<LongWritable, Text> {

    /** Lzo block index for each input file, populated by {@link #listStatus(JobContext)}. */
    private final Map<Path, LzoIndex> indexes = new HashMap<Path, LzoIndex>();
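
    /**
     * Restricts the input to lzop files (matched by the codec's default file
     * extension) and loads the companion {@link LzoIndex} for each remaining
     * file, so that later calls can align splits to compressed block boundaries.
     */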
    @Override
    protected List<FileStatus> listStatus(JobContext job) throws IOException {
        List<FileStatus> files = super.listStatus(job);

        String fileExtension = new LzopCodec().getDefaultExtension();
        Configuration conf = job.getConfiguration();

        for (Iterator<FileStatus> iterator = files.iterator(); iterator.hasNext();) {
            FileStatus fileStatus = iterator.next();
            Path file = fileStatus.getPath();
            FileSystem fs = file.getFileSystem(conf);

            if (!file.toString().endsWith(fileExtension)) {
                // get rid of non-lzo files
                iterator.remove();
            } else {
                // read the index file
                LzoIndex index = LzoIndex.readIndex(fs, file);
                indexes.put(file, index);
            }
        }

        return files;
    }

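    /**
     * A file can be split only if a non-empty lzop index was loaded for it in
     * {@link #listStatus(JobContext)}; without recorded block offsets the whole
     * file has to be read as a single split.
     */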
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        LzoIndex index = indexes.get(filename);
        if (index == null) {
            // Defensive: no index was loaded for this file, so treat it as
            // unsplittable instead of failing with a NullPointerException.
            return false;
        }
        return !index.isEmpty();
    }

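    /**
     * Computes the default byte-oriented splits and then realigns each split's
     * start and end to the lzo block boundaries recorded in the file's index,
     * so that every split begins at a point where decompression can start.
     */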
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        List<InputSplit> splits = super.getSplits(job);
        Configuration conf = job.getConfiguration();
        // Find the new starts/ends of the file splits so that they align
        // with the lzo block boundaries.

        List<InputSplit> result = new ArrayList<InputSplit>();

        for (InputSplit genericSplit : splits) {
            // load the index
            FileSplit fileSplit = (FileSplit) genericSplit;
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            LzoIndex index = indexes.get(file);
            if (index == null) {
                throw new IOException("Index not found for " + file);
            }

            if (index.isEmpty()) {
                // empty index, keep the split as is
                result.add(fileSplit);
                continue;
            }

            long start = fileSplit.getStart();
            long end = start + fileSplit.getLength();

            long lzoStart = index.alignSliceStartToIndex(start, end);
            long lzoEnd = index.alignSliceEndToIndex(end, fs.getFileStatus(file).getLen());

            // Skip splits whose boundaries could not be aligned; their bytes are
            // covered by a neighbouring split's lzo block.
            if (lzoStart != LzoIndex.NOT_FOUND && lzoEnd != LzoIndex.NOT_FOUND) {
                result.add(new FileSplit(file, lzoStart, lzoEnd - lzoStart, fileSplit.getLocations()));
            }
        }

        return result;
    }

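    /**
     * Creates a {@link LzoLineRecordReader} for the split, which decompresses the
     * data and emits one record per line, keyed by position in the file.
     */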
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext taskAttempt) throws IOException, InterruptedException {

        return new LzoLineRecordReader();
    }
}