package com.hadoop.compression.lzo;

import com.hadoop.mapreduce.LzoIndexOutputFormat;
import com.hadoop.mapreduce.LzoSplitInputFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Command-line tool that indexes LZO files with a map-only MapReduce job.
 *
 * <p>Each argument is a file or directory. Directories are walked recursively, and every file
 * whose name ends with the {@link LzopCodec} default extension and that has no non-empty index
 * file next to it is submitted to the job as an input path.
 */
public class DistributedLzoIndexer extends Configured implements Tool {

  private static final Log LOG = LogFactory.getLog(DistributedLzoIndexer.class);

  /** File-name suffix identifying LZO files, as reported by {@link LzopCodec}. */
  private final String LZO_EXTENSION = new LzopCodec().getDefaultExtension();

  /** Rejects any path whose string form ends with {@code /_temporary}. */
  private final PathFilter nonTemporaryFilter = new PathFilter() {
    @Override
    public boolean accept(Path path) {
      return !path.toString().endsWith("/_temporary");
    }
  };

  /**
   * Recursively collects the LZO files under {@code path} that still need an index.
   *
   * <p>An {@link IOException} raised while inspecting {@code path} is logged as a warning and
   * that path is skipped; it does not abort the walk of sibling paths.
   *
   * @param path file or directory to inspect
   * @param pathFilter filter applied when listing the children of a directory
   * @param accumulator list that receives every LZO file needing (re-)indexing
   */
  private void walkPath(Path path, PathFilter pathFilter, List<Path> accumulator) {
    try {
      FileSystem fs = path.getFileSystem(getConf());
      FileStatus fileStatus = fs.getFileStatus(path);

      if (fileStatus.isDir()) {
        FileStatus[] children = fs.listStatus(path, pathFilter);
        for (FileStatus childStatus : children) {
          walkPath(childStatus.getPath(), pathFilter, accumulator);
        }
      } else if (path.toString().endsWith(LZO_EXTENSION)) {
        Path lzoIndexPath = path.suffix(LzoIndex.LZO_INDEX_SUFFIX);
        if (fs.exists(lzoIndexPath)) {
          // If the index exists and is of nonzero size, we're already done.
          // We re-index a file with a zero-length index, because every file has at least
          // one block.
          if (fs.getFileStatus(lzoIndexPath).getLen() > 0) {
            LOG.info("[SKIP] LZO index file already exists for " + path);
          } else {
            LOG.info("Adding LZO file " + path + " to indexing list (index file exists but is zero length)");
            accumulator.add(path);
          }
        } else {
          // If no index exists, we need to index the file.
          LOG.info("Adding LZO file " + path + " to indexing list (no index currently exists)");
          accumulator.add(path);
        }
      }
    } catch (IOException ioe) {
      LOG.warn("Error walking path: " + path, ioe);
    }
  }

  /**
   * Finds the LZO files that need indexing under the given paths and runs the indexing job.
   *
   * @param args files and/or directories to index, or a single {@code --help}
   * @return {@code -1} when usage was printed, {@code 0} when there was nothing to index or the
   *     job succeeded, {@code 1} when the job failed
   * @throws Exception if the job cannot be configured or submitted
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args.length == 0 || (args.length == 1 && "--help".equals(args[0]))) {
      printUsage();
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    List<Path> inputPaths = new ArrayList<Path>();
    for (String strPath : args) {
      walkPath(new Path(strPath), nonTemporaryFilter, inputPaths);
    }

    if (inputPaths.isEmpty()) {
      System.err.println("No input paths found - perhaps all "
          + ".lzo files have already been indexed.");
      return 0;
    }

    // Build the job from this tool's own configuration rather than a fresh one, so the job
    // sees the same settings walkPath() used to resolve file systems, including anything
    // supplied through the generic command-line options.
    Configuration conf = getConf();
    Job job = new Job(conf);
    job.setJobName("Distributed Lzo Indexer " + Arrays.toString(args));

    job.setOutputKeyClass(Path.class);
    job.setOutputValueClass(LongWritable.class);

    // The LzoIndexOutputFormat doesn't currently work with speculative execution.
    // Patches welcome.
    job.getConfiguration().setBoolean(
        "mapred.map.tasks.speculative.execution", false);

    job.setJarByClass(DistributedLzoIndexer.class);
    job.setInputFormatClass(LzoSplitInputFormat.class);
    job.setOutputFormatClass(LzoIndexOutputFormat.class);
    job.setNumReduceTasks(0);
    job.setMapperClass(Mapper.class);

    for (Path p : inputPaths) {
      FileInputFormat.addInputPath(job, p);
    }

    return job.waitForCompletion(true) ? 0 : 1;
  }

  /**
   * Command-line entry point: runs the indexer through {@link ToolRunner} and exits with the
   * status it returns.
   *
   * @param args command-line arguments, forwarded to {@link ToolRunner#run(Tool, String[])}
   * @throws Exception if the tool fails
   */
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new DistributedLzoIndexer(), args);
    System.exit(exitCode);
  }

  /** Prints a one-line usage summary to standard error. */
  public static void printUsage() {
    System.err.println("Usage: hadoop jar /path/to/this/jar com.hadoop.compression.lzo.DistributedLzoIndexer <file.lzo | directory> [file2.lzo directory3 ...]");
  }
}