package com.hadoop.mapreduce;

import java.io.EOFException;
import java.io.IOException;

import org.anarres.lzo.LzopInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Walks the block structure of an .lzo file and emits one record per block:
 * the key is the file path and the value is the byte offset at which the
 * block starts. These offsets are what an LZO split index is built from.
 */
public class LzoSplitRecordReader extends RecordReader<Path, LongWritable> {

  private static final Log LOG = LogFactory.getLog(LzoSplitRecordReader.class);

  private final int LOG_EVERY_N_BLOCKS = 1000;

  private final LongWritable curValue = new LongWritable(-1);
  private FSDataInputStream rawInputStream;
  private TaskAttemptContext context;

  private int numBlocksRead = 0;
  private int numDecompressedChecksums = -1;
  private int numCompressedChecksums = -1;
  private long totalFileSize = 0;
  private Path lzoFile;

  @Override
  public void initialize(InputSplit genericSplit, TaskAttemptContext taskAttemptContext) throws IOException {
    context = taskAttemptContext;
    FileSplit fileSplit = (FileSplit) genericSplit;
    lzoFile = fileSplit.getPath();
    // The LzoSplitInputFormat is not splittable, so the split length is the whole file.
    totalFileSize = fileSplit.getLength();

    // Jump through some hoops to create the lzo codec.
    Configuration conf = context.getConfiguration();
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    CompressionCodec codec = factory.getCodec(lzoFile);
    if (codec == null) {
      throw new IOException("No compression codec found for " + lzoFile
          + "; is the LZO codec registered in io.compression.codecs?");
    }
    ((Configurable) codec).setConf(conf);

    FileSystem fs = lzoFile.getFileSystem(conf);
    rawInputStream = fs.open(lzoFile);

    // Creating the LzopInputStream here just reads the lzo header for us, nothing more.
    // We do the rest of our input off of the raw stream itself.
    LzopInputStream lzis = new LzopInputStream(rawInputStream);

    // These must be read AFTER the LzopInputStream is constructed, because the constructor
    // is what reads the header, which carries the checksum information. Otherwise the
    // checksum counts erroneously come back as zero, and all block offsets will be wrong.
    numCompressedChecksums = lzis.getCompressedChecksumCount();
    numDecompressedChecksums = lzis.getUncompressedChecksumCount();
  }

  @Override
  public boolean nextKeyValue() throws IOException {
    int uncompressedBlockSize = rawInputStream.readInt();
    if (uncompressedBlockSize == 0) {
      // An uncompressed block size of zero means end of file.
      return false;
    } else if (uncompressedBlockSize < 0) {
      throw new EOFException("Could not read uncompressed block size at position "
          + rawInputStream.getPos() + " in file " + lzoFile);
    }

    int compressedBlockSize = rawInputStream.readInt();
    if (compressedBlockSize <= 0) {
      throw new EOFException("Could not read compressed block size at position "
          + rawInputStream.getPos() + " in file " + lzoFile);
    }

    // See LzopInputStream.getCompressedData: a block is stored uncompressed when compression
    // would not have shrunk it, in which case only the decompressed checksums are written.
    boolean isUncompressedBlock = (uncompressedBlockSize == compressedBlockSize);
    int numChecksumsToSkip = isUncompressedBlock
        ? numDecompressedChecksums : numDecompressedChecksums + numCompressedChecksums;

    // Get the current position. Since we've read two ints, the current block started 8 bytes ago.
    long pos = rawInputStream.getPos();
    curValue.set(pos - 8);
    // Seek beyond the checksums (4 bytes each) and beyond the block data to the beginning of the next block.
    rawInputStream.seek(pos + compressedBlockSize + (4 * numChecksumsToSkip));
    ++numBlocksRead;

    // Log some progress every so often.
    if (numBlocksRead % LOG_EVERY_N_BLOCKS == 0) {
      LOG.info("Reading block " + numBlocksRead + " at pos " + pos + " of " + totalFileSize + ". Read is "
          + (100.0 * getProgress()) + "% done.");
    }

    return true;
  }

  @Override
  public Path getCurrentKey() {
    return lzoFile;
  }

  @Override
  public LongWritable getCurrentValue() {
    return curValue;
  }

  @Override
  public float getProgress() throws IOException {
    if (totalFileSize == 0) {
      return 0.0f;
    } else {
      return (float) rawInputStream.getPos() / totalFileSize;
    }
  }

  @Override
  public void close() throws IOException {
    LOG.info("Closing input stream after reading " + numBlocksRead + " blocks from " + lzoFile);
    rawInputStream.close();
  }
}
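
// A minimal usage sketch, kept in a comment so this source file remains a single
// compilable class. It assumes the companion classes this reader is normally wired
// to in an indexing job: LzoSplitInputFormat (referenced in the comment in
// initialize(), it hands each whole .lzo file to this reader) and an index output
// format such as LzoIndexOutputFormat that writes the emitted offsets out as the
// index. Those class names and the input path below are assumptions; substitute
// whatever your build provides.
//
//   Configuration conf = new Configuration();
//   Job job = new Job(conf, "lzo-index-builder");
//   job.setJarByClass(LzoSplitRecordReader.class);
//
//   // Map-only job: each (Path, LongWritable) record this reader produces is one
//   // block offset, so the identity mapper and zero reducers are sufficient.
//   job.setNumReduceTasks(0);
//   job.setOutputKeyClass(Path.class);
//   job.setOutputValueClass(LongWritable.class);
//
//   job.setInputFormatClass(LzoSplitInputFormat.class);        // assumed companion class
//   job.setOutputFormatClass(LzoIndexOutputFormat.class);      // assumed companion class
//
//   org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(
//       job, new Path("/data/logs/part-00000.lzo"));           // hypothetical input file
//   System.exit(job.waitForCompletion(true) ? 0 : 1);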