001package com.hadoop.mapreduce;
002
003import com.hadoop.compression.lzo.LzoIndex;
004import java.io.IOException;
005import org.apache.commons.logging.Log;
006import org.apache.commons.logging.LogFactory;
007import org.apache.hadoop.fs.FSDataOutputStream;
008import org.apache.hadoop.fs.FileSystem;
009import org.apache.hadoop.fs.Path;
010import org.apache.hadoop.io.LongWritable;
011import org.apache.hadoop.mapreduce.RecordWriter;
012import org.apache.hadoop.mapreduce.TaskAttemptContext;
013
014public class LzoIndexRecordWriter extends RecordWriter<Path, LongWritable> {
015
016    private static final Log LOG = LogFactory.getLog(LzoIndexRecordWriter.class);
017
018    private FSDataOutputStream outputStream;
019    private final TaskAttemptContext context;
020
021    private FileSystem fs;
022    private Path inputPath;
023    private Path tmpIndexPath;
024    private Path realIndexPath;
025
026    public LzoIndexRecordWriter(TaskAttemptContext taskAttemptContext) {
027        context = taskAttemptContext;
028    }
029
030    @Override
031    public void write(Path path, LongWritable offset) throws IOException, InterruptedException {
032        if (outputStream == null) {
033            // Set up the output file on the first record.
034            LOG.info("Setting up output stream to write index file for " + path);
035            outputStream = setupOutputFile(path);
036        }
037        offset.write(outputStream);
038    }
039
040    @Override
041    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
042        if (outputStream != null) {
043            // Close the output stream so that the tmp file is synced, then move it.
044            outputStream.close();
045
046            LOG.info("In close, now renaming " + tmpIndexPath + " to final location " + realIndexPath);
047            // Rename, indexing completed.
048            fs.rename(tmpIndexPath, realIndexPath);
049        }
050    }
051
052    private FSDataOutputStream setupOutputFile(Path path) throws IOException {
053        fs = path.getFileSystem(context.getConfiguration());
054        inputPath = path;
055
056    // For /a/b/c.lzo, tmpIndexPath = /a/b/c.lzo.index.tmp,
057        // and it is moved to realIndexPath = /a/b/c.lzo.index upon completion.
058        tmpIndexPath = path.suffix(LzoIndex.LZO_TMP_INDEX_SUFFIX);
059        realIndexPath = path.suffix(LzoIndex.LZO_INDEX_SUFFIX);
060
061        // Delete the old index files if they exist.
062        fs.delete(tmpIndexPath, false);
063        fs.delete(realIndexPath, false);
064
065        return fs.create(tmpIndexPath, false);
066    }
067}