001package com.hadoop.compression.lzo;
002
003import com.hadoop.mapreduce.LzoIndexOutputFormat;
004import com.hadoop.mapreduce.LzoSplitInputFormat;
005import java.io.IOException;
006import java.util.ArrayList;
007import java.util.Arrays;
008import java.util.List;
009import org.apache.commons.logging.Log;
010import org.apache.commons.logging.LogFactory;
011import org.apache.hadoop.conf.Configuration;
012import org.apache.hadoop.conf.Configured;
013import org.apache.hadoop.fs.FileStatus;
014import org.apache.hadoop.fs.FileSystem;
015import org.apache.hadoop.fs.Path;
016import org.apache.hadoop.fs.PathFilter;
017import org.apache.hadoop.io.LongWritable;
018import org.apache.hadoop.mapreduce.Job;
019import org.apache.hadoop.mapreduce.Mapper;
020import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
021import org.apache.hadoop.util.Tool;
022import org.apache.hadoop.util.ToolRunner;
023
024public class DistributedLzoIndexer extends Configured implements Tool {
025
026    private static final Log LOG = LogFactory.getLog(DistributedLzoIndexer.class);
027    private final String LZO_EXTENSION = new LzopCodec().getDefaultExtension();
028
029    private final PathFilter nonTemporaryFilter = new PathFilter() {
030        @Override
031        public boolean accept(Path path) {
032            return !path.toString().endsWith("/_temporary");
033        }
034    };
035
036    private void walkPath(Path path, PathFilter pathFilter, List<Path> accumulator) {
037        try {
038            FileSystem fs = path.getFileSystem(getConf());
039            FileStatus fileStatus = fs.getFileStatus(path);
040
041            if (fileStatus.isDir()) {
042                FileStatus[] children = fs.listStatus(path, pathFilter);
043                for (FileStatus childStatus : children) {
044                    walkPath(childStatus.getPath(), pathFilter, accumulator);
045                }
046            } else if (path.toString().endsWith(LZO_EXTENSION)) {
047                Path lzoIndexPath = path.suffix(LzoIndex.LZO_INDEX_SUFFIX);
048                if (fs.exists(lzoIndexPath)) {
049          // If the index exists and is of nonzero size, we're already done.
050                    // We re-index a file with a zero-length index, because every file has at least one block.
051                    if (fs.getFileStatus(lzoIndexPath).getLen() > 0) {
052                        LOG.info("[SKIP] LZO index file already exists for " + path);
053                        return;
054                    } else {
055                        LOG.info("Adding LZO file " + path + " to indexing list (index file exists but is zero length)");
056                        accumulator.add(path);
057                    }
058                } else {
059                    // If no index exists, we need to index the file.
060                    LOG.info("Adding LZO file " + path + " to indexing list (no index currently exists)");
061                    accumulator.add(path);
062                }
063            }
064        } catch (IOException ioe) {
065            LOG.warn("Error walking path: " + path, ioe);
066        }
067    }
068
069    @Override
070    public int run(String[] args) throws Exception {
071        if (args.length == 0 || (args.length == 1 && "--help".equals(args[0]))) {
072            printUsage();
073            ToolRunner.printGenericCommandUsage(System.err);
074            return -1;
075        }
076
077        List<Path> inputPaths = new ArrayList<Path>();
078        for (String strPath : args) {
079            walkPath(new Path(strPath), nonTemporaryFilter, inputPaths);
080        }
081
082        if (inputPaths.isEmpty()) {
083            System.err.println("No input paths found - perhaps all "
084                    + ".lzo files have already been indexed.");
085            return 0;
086        }
087
088        Configuration conf = new Configuration();
089        Job job = new Job(conf);
090        job.setJobName("Distributed Lzo Indexer " + Arrays.toString(args));
091
092        job.setOutputKeyClass(Path.class);
093        job.setOutputValueClass(LongWritable.class);
094
095    // The LzoIndexOutputFormat doesn't currently work with speculative execution.
096        // Patches welcome.
097        job.getConfiguration().setBoolean(
098                "mapred.map.tasks.speculative.execution", false);
099
100        job.setJarByClass(DistributedLzoIndexer.class);
101        job.setInputFormatClass(LzoSplitInputFormat.class);
102        job.setOutputFormatClass(LzoIndexOutputFormat.class);
103        job.setNumReduceTasks(0);
104        job.setMapperClass(Mapper.class);
105
106        for (Path p : inputPaths) {
107            FileInputFormat.addInputPath(job, p);
108        }
109
110        return job.waitForCompletion(true) ? 0 : 1;
111    }
112
113    public static void main(String[] args) throws Exception {
114        int exitCode = ToolRunner.run(new DistributedLzoIndexer(), args);
115        System.exit(exitCode);
116    }
117
118    public static void printUsage() {
119        System.err.println("Usage: hadoop jar /path/to/this/jar com.hadoop.compression.lzo.DistributedLzoIndexer <file.lzo | directory> [file2.lzo directory3 ...]");
120    }
121}