001package com.hadoop.mapreduce;
002
003import java.io.EOFException;
004import java.io.IOException;
005import org.anarres.lzo.LzopInputStream;
006import org.apache.commons.logging.Log;
007import org.apache.commons.logging.LogFactory;
008import org.apache.hadoop.conf.Configurable;
009import org.apache.hadoop.conf.Configuration;
010import org.apache.hadoop.fs.FSDataInputStream;
011import org.apache.hadoop.fs.FileSystem;
012import org.apache.hadoop.fs.Path;
013import org.apache.hadoop.io.LongWritable;
014import org.apache.hadoop.io.compress.CompressionCodec;
015import org.apache.hadoop.io.compress.CompressionCodecFactory;
016import org.apache.hadoop.mapreduce.InputSplit;
017import org.apache.hadoop.mapreduce.RecordReader;
018import org.apache.hadoop.mapreduce.TaskAttemptContext;
019import org.apache.hadoop.mapreduce.lib.input.FileSplit;
020
021public class LzoSplitRecordReader extends RecordReader<Path, LongWritable> {
022
023    private static final Log LOG = LogFactory.getLog(LzoSplitRecordReader.class);
024
025    private final int LOG_EVERY_N_BLOCKS = 1000;
026
027    private final LongWritable curValue = new LongWritable(-1);
028    private FSDataInputStream rawInputStream;
029    private TaskAttemptContext context;
030
031    private int numBlocksRead = 0;
032    private int numDecompressedChecksums = -1;
033    private int numCompressedChecksums = -1;
034    private long totalFileSize = 0;
035    private Path lzoFile;
036
037    @Override
038    public void initialize(InputSplit genericSplit, TaskAttemptContext taskAttemptContext) throws IOException {
039        context = taskAttemptContext;
040        FileSplit fileSplit = (FileSplit) genericSplit;
041        lzoFile = fileSplit.getPath();
042        // The LzoSplitInputFormat is not splittable, so the split length is the whole file.
043        totalFileSize = fileSplit.getLength();
044
045        // Jump through some hoops to create the lzo codec.
046        Configuration conf = context.getConfiguration();
047        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
048        CompressionCodec codec = factory.getCodec(lzoFile);
049        ((Configurable) codec).setConf(conf);
050
051        FileSystem fs = lzoFile.getFileSystem(conf);
052        rawInputStream = fs.open(lzoFile);
053
054    // Creating the LzopInputStream here just reads the lzo header for us, nothing more.
055        // We do the rest of our input off of the raw stream is.
056        LzopInputStream lzis = new LzopInputStream(rawInputStream);
057
058    // This must be called AFTER createInputStream is called, because createInputStream
059        // is what reads the header, which has the checksum information.  Otherwise getChecksumsCount
060        // erroneously returns zero, and all block offsets will be wrong.
061        numCompressedChecksums = lzis.getCompressedChecksumCount();
062        numDecompressedChecksums = lzis.getUncompressedChecksumCount();
063    }
064
065    @Override
066    public boolean nextKeyValue() throws IOException {
067        int uncompressedBlockSize = rawInputStream.readInt();
068        if (uncompressedBlockSize == 0) {
069            // An uncompressed block size of zero means end of file.
070            return false;
071        } else if (uncompressedBlockSize < 0) {
072            throw new EOFException("Could not read uncompressed block size at position "
073                    + rawInputStream.getPos() + " in file " + lzoFile);
074        }
075
076        int compressedBlockSize = rawInputStream.readInt();
077        if (compressedBlockSize <= 0) {
078            throw new EOFException("Could not read compressed block size at position "
079                    + rawInputStream.getPos() + " in file " + lzoFile);
080        }
081
082        // See LzopInputStream.getCompressedData
083        boolean isUncompressedBlock = (uncompressedBlockSize == compressedBlockSize);
084        int numChecksumsToSkip = isUncompressedBlock
085                ? numDecompressedChecksums : numDecompressedChecksums + numCompressedChecksums;
086
087        // Get the current position.  Since we've read two ints, the current block started 8 bytes ago.
088        long pos = rawInputStream.getPos();
089        curValue.set(pos - 8);
090        // Seek beyond the checksums and beyond the block data to the beginning of the next block.
091        rawInputStream.seek(pos + compressedBlockSize + (4 * numChecksumsToSkip));
092        ++numBlocksRead;
093
094        // Log some progress every so often.
095        if (numBlocksRead % LOG_EVERY_N_BLOCKS == 0) {
096            LOG.info("Reading block " + numBlocksRead + " at pos " + pos + " of " + totalFileSize + ". Read is "
097                    + (100.0 * getProgress()) + "% done. ");
098        }
099
100        return true;
101    }
102
103    @Override
104    public Path getCurrentKey() {
105        return lzoFile;
106    }
107
108    @Override
109    public LongWritable getCurrentValue() {
110        return curValue;
111    }
112
113    @Override
114    public float getProgress() throws IOException {
115        if (totalFileSize == 0) {
116            return 0.0f;
117        } else {
118            return (float) rawInputStream.getPos() / totalFileSize;
119        }
120    }
121
122    @Override
123    public void close() throws IOException {
124        LOG.info("Closing input stream after reading " + numBlocksRead + " blocks from " + lzoFile);
125        rawInputStream.close();
126    }
127}