package com.hadoop.mapreduce;

import com.hadoop.compression.lzo.LzoIndex;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Record writer that serializes {@link LongWritable} offsets into an index file
 * placed next to the file being indexed.
 *
 * <p>The output file is created lazily on the first call to
 * {@link #write(Path, LongWritable)}, using that record's path to derive the
 * index locations. Offsets are written to a temporary index file, which is
 * renamed to the final index path when the writer is closed.
 */
public class LzoIndexRecordWriter extends RecordWriter<Path, LongWritable> {

  private static final Log LOG = LogFactory.getLog(LzoIndexRecordWriter.class);

  /** Stream to the temporary index file; {@code null} until the first record and after close. */
  private FSDataOutputStream outputStream;
  private final TaskAttemptContext context;

  /** File system resolved from the path of the first record. */
  private FileSystem fs;
  /** Path passed with the first record; the index paths are derived from it. */
  private Path inputPath;
  private Path tmpIndexPath;
  private Path realIndexPath;

  /**
   * Creates a writer bound to the given task attempt.
   *
   * @param taskAttemptContext context whose configuration is used to resolve the file system
   */
  public LzoIndexRecordWriter(TaskAttemptContext taskAttemptContext) {
    context = taskAttemptContext;
  }

  /**
   * Appends one offset to the temporary index file.
   *
   * <p>The output file is set up from {@code path} on the first call only; the
   * path supplied with later records is not consulted.
   *
   * @param path the file the offset belongs to
   * @param offset the offset to serialize
   * @throws IOException if the output file cannot be set up or written
   */
  @Override
  public void write(Path path, LongWritable offset) throws IOException, InterruptedException {
    if (outputStream == null) {
      // Set up the output file on the first record.
      LOG.info("Setting up output stream to write index file for " + path);
      outputStream = setupOutputFile(path);
    }
    offset.write(outputStream);
  }

  /**
   * Closes the temporary index file and renames it to the final index path.
   *
   * <p>Does nothing if no record was ever written or if the writer has already
   * been closed.
   *
   * @param taskAttemptContext unused; the context given to the constructor is used instead
   * @throws IOException if closing the stream fails, or if the rename reports failure
   */
  @Override
  public void close(TaskAttemptContext taskAttemptContext)
      throws IOException, InterruptedException {
    if (outputStream == null) {
      return;
    }

    try {
      // Close the output stream so that the tmp file is synced, then move it.
      outputStream.close();
    } finally {
      // Clear the reference even on failure so a repeated close() is a no-op.
      outputStream = null;
    }

    LOG.info("In close, now renaming " + tmpIndexPath + " to final location " + realIndexPath);
    // Rename, indexing completed. rename() can signal failure by returning false
    // rather than throwing, so surface that instead of silently leaving only the
    // temporary file behind.
    if (!fs.rename(tmpIndexPath, realIndexPath)) {
      throw new IOException("Failed to rename " + tmpIndexPath + " to " + realIndexPath
          + " while finishing index for " + inputPath);
    }
  }

  /**
   * Resolves the file system and index paths for {@code path}, removes any
   * existing index files, and opens the temporary index file for writing.
   *
   * @param path the file to be indexed
   * @return a stream to the newly created temporary index file
   * @throws IOException if the file system cannot be resolved or the file cannot be created
   */
  private FSDataOutputStream setupOutputFile(Path path) throws IOException {
    fs = path.getFileSystem(context.getConfiguration());
    inputPath = path;

    // For /a/b/c.lzo, tmpIndexPath = /a/b/c.lzo.index.tmp,
    // and it is moved to realIndexPath = /a/b/c.lzo.index upon completion.
    tmpIndexPath = path.suffix(LzoIndex.LZO_TMP_INDEX_SUFFIX);
    realIndexPath = path.suffix(LzoIndex.LZO_INDEX_SUFFIX);

    // Delete the old index files if they exist. The results are intentionally
    // ignored: the files may simply not be there.
    fs.delete(tmpIndexPath, false);
    fs.delete(realIndexPath, false);

    // overwrite=false: the temporary file was just deleted, so it must not exist.
    return fs.create(tmpIndexPath, false);
  }
}