001/* 002 * This file is part of Hadoop-Gpl-Compression. 003 * 004 * Hadoop-Gpl-Compression is free software: you can redistribute it 005 * and/or modify it under the terms of the GNU General Public License 006 * as published by the Free Software Foundation, either version 3 of 007 * the License, or (at your option) any later version. 008 * 009 * Hadoop-Gpl-Compression is distributed in the hope that it will be 010 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty 011 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 012 * GNU General Public License for more details. 013 * 014 * You should have received a copy of the GNU General Public License 015 * along with Hadoop-Gpl-Compression. If not, see 016 * <http://www.gnu.org/licenses/>. 017 */ 018package com.hadoop.compression.lzo; 019 020import java.io.IOException; 021import java.net.URI; 022import java.text.DecimalFormat; 023import org.apache.commons.logging.Log; 024import org.apache.commons.logging.LogFactory; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileStatus; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029 030public class LzoIndexer { 031 032 private static final Log LOG = LogFactory.getLog(LzoIndexer.class); 033 034 private final Configuration conf_; 035 private final String LZO_EXTENSION = new LzopCodec().getDefaultExtension(); 036 private final String INDENT_STRING = " "; 037 private final DecimalFormat df_; 038 039 public LzoIndexer(Configuration conf) { 040 conf_ = conf; 041 df_ = new DecimalFormat("#0.00"); 042 } 043 044 /** 045 * Index the file given by lzoUri in its default filesystem. 046 * 047 * @param lzoPath The file to index. 048 * @throws IOException 049 */ 050 public void index(Path lzoPath) throws IOException { 051 indexInternal(lzoPath, 0); 052 } 053 054 /** 055 * Return indented space for pretty printing. 056 * 057 * @param nestingLevel The indentation level. 058 * @return Indented space for the given indentation level. 059 */ 060 private String getNesting(int nestingLevel) { 061 StringBuilder sb = new StringBuilder(); 062 for (int i = 0; i < nestingLevel; i++) { 063 sb.append(INDENT_STRING); 064 } 065 return sb.toString(); 066 } 067 068 /** 069 * Lzo index a given path, calling recursively to index directories when encountered. 070 * Files are only indexed if they end in .lzo and have no existing .lzo.index file. 071 * 072 * @param lzoPath The base path to index. 073 * @param nestingLevel For pretty printing, the nesting level. 074 * @throws IOException 075 */ 076 private void indexInternal(Path lzoPath, int nestingLevel) throws IOException { 077 FileSystem fs = FileSystem.get(URI.create(lzoPath.toString()), conf_); 078 FileStatus fileStatus = fs.getFileStatus(lzoPath); 079 080 // Recursively walk 081 if (fileStatus.isDir()) { 082 LOG.info(getNesting(nestingLevel) + "LZO Indexing directory " + lzoPath + "..."); 083 FileStatus[] statuses = fs.listStatus(lzoPath); 084 for (FileStatus childStatus : statuses) { 085 indexInternal(childStatus.getPath(), nestingLevel + 1); 086 } 087 } else if (lzoPath.toString().endsWith(LZO_EXTENSION)) { 088 Path lzoIndexPath = new Path(lzoPath.toString() + LzoIndex.LZO_INDEX_SUFFIX); 089 if (fs.exists(lzoIndexPath)) { 090 LOG.info(getNesting(nestingLevel) + "[SKIP] LZO index file already exists for " + lzoPath + "\n"); 091 } else { 092 long startTime = System.currentTimeMillis(); 093 long fileSize = fileStatus.getLen(); 094 095 LOG.info(getNesting(nestingLevel) + "[INDEX] LZO Indexing file " + lzoPath + ", size " 096 + df_.format(fileSize / (1024.0 * 1024.0 * 1024.0)) + " GB..."); 097 if (indexSingleFile(fs, lzoPath)) { 098 long indexSize = fs.getFileStatus(lzoIndexPath).getLen(); 099 double elapsed = (System.currentTimeMillis() - startTime) / 1000.0; 100 LOG.info(getNesting(nestingLevel) + "Completed LZO Indexing in " + df_.format(elapsed) + " seconds (" 101 + df_.format(fileSize / (1024.0 * 1024.0 * elapsed)) + " MB/s). Index size is " 102 + df_.format(indexSize / 1024.0) + " KB.\n"); 103 } 104 } 105 } 106 } 107 108 /** 109 * Create an lzo index for a single file in HDFS. 110 * @param fs The filesystem object. 111 * @param lzoPath The path to index (must be a file, not a directory). 112 * @return true if indexing succeeded. 113 */ 114 private boolean indexSingleFile(FileSystem fs, Path lzoPath) { 115 try { 116 LzoIndex.createIndex(fs, lzoPath); 117 return true; 118 } catch (IOException e) { 119 LOG.error("Error indexing " + lzoPath, e); 120 return false; 121 } 122 } 123 124 /** 125 * Run the LzoIndexer on each argument passed via stdin. The files should be HDFS locations. 126 */ 127 public static void main(String[] args) { 128 if (args.length == 0) { 129 printUsage(); 130 System.exit(1); 131 } 132 133 LzoIndexer lzoIndexer = new LzoIndexer(new Configuration()); 134 for (String arg : args) { 135 try { 136 lzoIndexer.index(new Path(arg)); 137 } catch (IOException e) { 138 LOG.error("Error indexing " + arg, e); 139 } 140 } 141 } 142 143 public static void printUsage() { 144 System.out.println("Usage: hadoop jar /path/to/this/jar com.hadoop.compression.lzo.LzoIndexer <file.lzo | directory> [file2.lzo directory3 ...]"); 145 } 146}