001/*
002 * This file is part of Hadoop-Gpl-Compression.
003 *
004 * Hadoop-Gpl-Compression is free software: you can redistribute it
005 * and/or modify it under the terms of the GNU General Public License
006 * as published by the Free Software Foundation, either version 3 of
007 * the License, or (at your option) any later version.
008 *
009 * Hadoop-Gpl-Compression is distributed in the hope that it will be
010 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
011 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
012 * GNU General Public License for more details.
013 *
014 * You should have received a copy of the GNU General Public License
015 * along with Hadoop-Gpl-Compression.  If not, see
016 * <http://www.gnu.org/licenses/>.
017 */
018package com.hadoop.mapred;
019
020import java.io.IOException;
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.fs.FSDataInputStream;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.io.LongWritable;
026import org.apache.hadoop.io.Text;
027import org.apache.hadoop.io.compress.CompressionCodec;
028import org.apache.hadoop.io.compress.CompressionCodecFactory;
029import org.apache.hadoop.mapred.FileSplit;
030import org.apache.hadoop.mapred.RecordReader;
031import org.apache.hadoop.util.LineReader;
032
033@SuppressWarnings("deprecation")
034public class DeprecatedLzoLineRecordReader implements RecordReader<LongWritable, Text> {
035
036    private CompressionCodecFactory codecFactory = null;
037    private long start;
038    private long pos;
039    private final long end;
040    private final LineReader in;
041    private final FSDataInputStream fileIn;
042
043    DeprecatedLzoLineRecordReader(Configuration conf, FileSplit split) throws IOException {
044        start = split.getStart();
045        end = start + split.getLength();
046        final Path file = split.getPath();
047
048        FileSystem fs = file.getFileSystem(conf);
049        codecFactory = new CompressionCodecFactory(conf);
050        final CompressionCodec codec = codecFactory.getCodec(file);
051        if (codec == null) {
052            throw new IOException("No LZO codec found, cannot run.");
053        }
054
055        // Open the file and seek to the next split.
056        fileIn = fs.open(file);
057        // Create input stream and read the file header.
058        in = new LineReader(codec.createInputStream(fileIn), conf);
059        if (start != 0) {
060            fileIn.seek(start);
061
062            // Read and ignore the first line.
063            in.readLine(new Text());
064            start = fileIn.getPos();
065        }
066
067        pos = start;
068    }
069
070    public LongWritable createKey() {
071        return new LongWritable();
072    }
073
074    public Text createValue() {
075        return new Text();
076    }
077
078    public boolean next(LongWritable key, Text value) throws IOException {
079    // Since the LZOP codec reads everything in LZO blocks, we can't stop if pos == end.
080        // Instead, wait for the next block to be read in when pos will be > end.
081        while (pos <= end) {
082            key.set(pos);
083
084            int newSize = in.readLine(value);
085            if (newSize == 0) {
086                return false;
087            }
088            pos = fileIn.getPos();
089            return true;
090        }
091        return false;
092    }
093
094    public float getProgress() throws IOException {
095        if (start == end) {
096            return 0.0f;
097        } else {
098            return Math.min(1.0f, (pos - start) / (float) (end - start));
099        }
100    }
101
102    public synchronized long getPos() throws IOException {
103        return pos;
104    }
105
106    public synchronized void close() throws IOException {
107        if (in != null) {
108            in.close();
109        }
110    }
111}