001/*
002 * This file is part of Hadoop-Gpl-Compression.
003 *
004 * Hadoop-Gpl-Compression is free software: you can redistribute it
005 * and/or modify it under the terms of the GNU General Public License
006 * as published by the Free Software Foundation, either version 3 of
007 * the License, or (at your option) any later version.
008 *
009 * Hadoop-Gpl-Compression is distributed in the hope that it will be
010 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
011 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
012 * GNU General Public License for more details.
013 *
014 * You should have received a copy of the GNU General Public License
015 * along with Hadoop-Gpl-Compression.  If not, see
016 * <http://www.gnu.org/licenses/>.
017 */
018package com.hadoop.compression.lzo;
019
020import java.io.IOException;
021import java.net.URI;
022import java.text.DecimalFormat;
023import org.apache.commons.logging.Log;
024import org.apache.commons.logging.LogFactory;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileStatus;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029
030public class LzoIndexer {
031
032    private static final Log LOG = LogFactory.getLog(LzoIndexer.class);
033
034    private final Configuration conf_;
035    private final String LZO_EXTENSION = new LzopCodec().getDefaultExtension();
036    private final String INDENT_STRING = "  ";
037    private final DecimalFormat df_;
038
039    public LzoIndexer(Configuration conf) {
040        conf_ = conf;
041        df_ = new DecimalFormat("#0.00");
042    }
043
044    /**
045     * Index the file given by lzoUri in its default filesystem.
046     * 
047     * @param lzoPath The file to index.
048     * @throws IOException
049     */
050    public void index(Path lzoPath) throws IOException {
051        indexInternal(lzoPath, 0);
052    }
053
054    /**
055     * Return indented space for pretty printing.
056     * 
057     * @param nestingLevel The indentation level.
058     * @return Indented space for the given indentation level.
059     */
060    private String getNesting(int nestingLevel) {
061        StringBuilder sb = new StringBuilder();
062        for (int i = 0; i < nestingLevel; i++) {
063            sb.append(INDENT_STRING);
064        }
065        return sb.toString();
066    }
067
068    /**
069     * Lzo index a given path, calling recursively to index directories when encountered.
070     * Files are only indexed if they end in .lzo and have no existing .lzo.index file.
071     * 
072     * @param lzoPath The base path to index.
073     * @param nestingLevel For pretty printing, the nesting level.
074     * @throws IOException
075     */
076    private void indexInternal(Path lzoPath, int nestingLevel) throws IOException {
077        FileSystem fs = FileSystem.get(URI.create(lzoPath.toString()), conf_);
078        FileStatus fileStatus = fs.getFileStatus(lzoPath);
079
080        // Recursively walk
081        if (fileStatus.isDir()) {
082            LOG.info(getNesting(nestingLevel) + "LZO Indexing directory " + lzoPath + "...");
083            FileStatus[] statuses = fs.listStatus(lzoPath);
084            for (FileStatus childStatus : statuses) {
085                indexInternal(childStatus.getPath(), nestingLevel + 1);
086            }
087        } else if (lzoPath.toString().endsWith(LZO_EXTENSION)) {
088            Path lzoIndexPath = new Path(lzoPath.toString() + LzoIndex.LZO_INDEX_SUFFIX);
089            if (fs.exists(lzoIndexPath)) {
090                LOG.info(getNesting(nestingLevel) + "[SKIP] LZO index file already exists for " + lzoPath + "\n");
091            } else {
092                long startTime = System.currentTimeMillis();
093                long fileSize = fileStatus.getLen();
094
095                LOG.info(getNesting(nestingLevel) + "[INDEX] LZO Indexing file " + lzoPath + ", size "
096                        + df_.format(fileSize / (1024.0 * 1024.0 * 1024.0)) + " GB...");
097                if (indexSingleFile(fs, lzoPath)) {
098                    long indexSize = fs.getFileStatus(lzoIndexPath).getLen();
099                    double elapsed = (System.currentTimeMillis() - startTime) / 1000.0;
100                    LOG.info(getNesting(nestingLevel) + "Completed LZO Indexing in " + df_.format(elapsed) + " seconds ("
101                            + df_.format(fileSize / (1024.0 * 1024.0 * elapsed)) + " MB/s).  Index size is "
102                            + df_.format(indexSize / 1024.0) + " KB.\n");
103                }
104            }
105        }
106    }
107
108    /**
109     * Create an lzo index for a single file in HDFS.
110     * @param fs The filesystem object.
111     * @param lzoPath The path to index (must be a file, not a directory).
112     * @return true if indexing succeeded.
113     */
114    private boolean indexSingleFile(FileSystem fs, Path lzoPath) {
115        try {
116            LzoIndex.createIndex(fs, lzoPath);
117            return true;
118        } catch (IOException e) {
119            LOG.error("Error indexing " + lzoPath, e);
120            return false;
121        }
122    }
123
124    /**
125     * Run the LzoIndexer on each argument passed via stdin.  The files should be HDFS locations.
126     */
127    public static void main(String[] args) {
128        if (args.length == 0) {
129            printUsage();
130            System.exit(1);
131        }
132
133        LzoIndexer lzoIndexer = new LzoIndexer(new Configuration());
134        for (String arg : args) {
135            try {
136                lzoIndexer.index(new Path(arg));
137            } catch (IOException e) {
138                LOG.error("Error indexing " + arg, e);
139            }
140        }
141    }
142
143    public static void printUsage() {
144        System.out.println("Usage: hadoop jar /path/to/this/jar com.hadoop.compression.lzo.LzoIndexer <file.lzo | directory> [file2.lzo directory3 ...]");
145    }
146}