001/* 002 * This file is part of Hadoop-Gpl-Compression. 003 * 004 * Hadoop-Gpl-Compression is free software: you can redistribute it 005 * and/or modify it under the terms of the GNU General Public License 006 * as published by the Free Software Foundation, either version 3 of 007 * the License, or (at your option) any later version. 008 * 009 * Hadoop-Gpl-Compression is distributed in the hope that it will be 010 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty 011 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 012 * GNU General Public License for more details. 013 * 014 * You should have received a copy of the GNU General Public License 015 * along with Hadoop-Gpl-Compression. If not, see 016 * <http://www.gnu.org/licenses/>. 017 */ 018package com.hadoop.compression.lzo; 019 020import java.io.EOFException; 021import java.io.IOException; 022import java.util.Arrays; 023import org.anarres.lzo.LzopInputStream; 024import org.apache.hadoop.conf.Configurable; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FSDataInputStream; 027import org.apache.hadoop.fs.FSDataOutputStream; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.io.compress.CompressionCodec; 031import org.apache.hadoop.io.compress.CompressionCodecFactory; 032 033/** 034 * Represents the lzo index. 035 */ 036public class LzoIndex { 037 038 public static final String LZO_INDEX_SUFFIX = ".index"; 039 public static final String LZO_TMP_INDEX_SUFFIX = ".index.tmp"; 040 public static final long NOT_FOUND = -1; 041 042 private long[] blockPositions_; 043 044 /** 045 * Create an empty index, typically indicating no index file exists. 046 */ 047 public LzoIndex() { 048 } 049 050 /** 051 * Create an index specifying the number of LZO blocks in the file. 052 * @param blocks The number of blocks in the LZO file the index is representing. 053 */ 054 public LzoIndex(int blocks) { 055 blockPositions_ = new long[blocks]; 056 } 057 058 /** 059 * Set the position for the block. 060 * 061 * @param blockNumber Block to set pos for. 062 * @param pos Position. 063 */ 064 public void set(int blockNumber, long pos) { 065 blockPositions_[blockNumber] = pos; 066 } 067 068 /** 069 * Get the total number of blocks in the index file. 070 */ 071 public int getNumberOfBlocks() { 072 return blockPositions_.length; 073 } 074 075 /** 076 * Get the block offset for a given block. 077 * @param block 078 * @return the byte offset into the file where this block starts. It is the developer's 079 * responsibility to call getNumberOfBlocks() to know appropriate bounds on the parameter. 080 * The argument block should satisfy 0 <= block < getNumberOfBlocks(). 081 */ 082 public long getPosition(int block) { 083 return blockPositions_[block]; 084 } 085 086 /** 087 * Find the next lzo block start from the given position. 088 * 089 * @param pos The position to start looking from. 090 * @return Either the start position of the block or -1 if it couldn't be found. 091 */ 092 public long findNextPosition(long pos) { 093 int block = Arrays.binarySearch(blockPositions_, pos); 094 095 if (block >= 0) { 096 // direct hit on a block start position 097 return blockPositions_[block]; 098 } else { 099 block = Math.abs(block) - 1; 100 if (block > blockPositions_.length - 1) { 101 return NOT_FOUND; 102 } 103 return blockPositions_[block]; 104 } 105 } 106 107 /** 108 * Return true if the index has no blocks set. 109 * 110 * @return true if the index has no blocks set. 111 */ 112 public boolean isEmpty() { 113 return blockPositions_ == null || blockPositions_.length == 0; 114 } 115 116 /** 117 * Nudge a given file slice start to the nearest LZO block start no earlier than 118 * the current slice start. 119 * 120 * @param start The current slice start 121 * @param end The current slice end 122 * @return The smallest block offset in the index between [start, end), or 123 * NOT_FOUND if there is none such. 124 */ 125 public long alignSliceStartToIndex(long start, long end) { 126 if (start != 0) { 127 // find the next block position from 128 // the start of the split 129 long newStart = findNextPosition(start); 130 if (newStart == NOT_FOUND || newStart >= end) { 131 return NOT_FOUND; 132 } 133 start = newStart; 134 } 135 return start; 136 } 137 138 /** 139 * Nudge a given file slice end to the nearest LZO block end no earlier than 140 * the current slice end. 141 * 142 * @param end The current slice end 143 * @param fileSize The size of the file, i.e. the max end position. 144 * @return The smallest block offset in the index between [end, fileSize]. 145 */ 146 public long alignSliceEndToIndex(long end, long fileSize) { 147 long newEnd = findNextPosition(end); 148 if (newEnd != NOT_FOUND) { 149 end = newEnd; 150 } else { 151 // didn't find the next position 152 // we have hit the end of the file 153 end = fileSize; 154 } 155 return end; 156 } 157 158 /** 159 * Read the index of the lzo file. 160 161 * @param fs The index file is on this file system. 162 * @param lzoFile the file whose index we are reading -- NOT the index file itself. That is, 163 * pass in filename.lzo, not filename.lzo.index, for this parameter. 164 * @throws IOException 165 */ 166 public static LzoIndex readIndex(FileSystem fs, Path lzoFile) throws IOException { 167 FSDataInputStream indexIn = null; 168 try { 169 Path indexFile = lzoFile.suffix(LZO_INDEX_SUFFIX); 170 if (!fs.exists(indexFile)) { 171 // return empty index, fall back to the unsplittable mode 172 return new LzoIndex(); 173 } 174 175 long indexLen = fs.getFileStatus(indexFile).getLen(); 176 int blocks = (int) (indexLen / 8); 177 LzoIndex index = new LzoIndex(blocks); 178 indexIn = fs.open(indexFile); 179 for (int i = 0; i < blocks; i++) { 180 index.set(i, indexIn.readLong()); 181 } 182 return index; 183 } finally { 184 if (indexIn != null) { 185 indexIn.close(); 186 } 187 } 188 } 189 190 /** 191 * Index an lzo file to allow the input format to split them into separate map 192 * jobs. 193 * 194 * @param fs File system that contains the file. 195 * @param lzoFile the lzo file to index. For filename.lzo, the created index file will be 196 * filename.lzo.index. 197 * @throws IOException 198 */ 199 public static void createIndex(FileSystem fs, Path lzoFile) 200 throws IOException { 201 202 Configuration conf = fs.getConf(); 203 CompressionCodecFactory factory = new CompressionCodecFactory(conf); 204 CompressionCodec codec = factory.getCodec(lzoFile); 205 if (null == codec) { 206 throw new IOException("Could not find codec for file " + lzoFile 207 + " - you may need to add the LZO codec to your io.compression.codecs " 208 + "configuration in core-site.xml"); 209 } 210 ((Configurable) codec).setConf(conf); 211 212 FSDataInputStream is = null; 213 FSDataOutputStream os = null; 214 Path outputFile = lzoFile.suffix(LZO_INDEX_SUFFIX); 215 Path tmpOutputFile = lzoFile.suffix(LZO_TMP_INDEX_SUFFIX); 216 217 // Track whether an exception was thrown or not, so we know to either 218 // delete the tmp index file on failure, or rename it to the new index file on success. 219 boolean indexingSucceeded = false; 220 try { 221 is = fs.open(lzoFile); 222 os = fs.create(tmpOutputFile); 223 // Solely for reading the header 224 LzopInputStream lzis = new LzopInputStream(is); 225 int numCompressedChecksums = lzis.getCompressedChecksumCount(); 226 int numDecompressedChecksums = lzis.getUncompressedChecksumCount(); 227 228 while (true) { 229 // read and ignore, we just want to get to the next int 230 int uncompressedBlockSize = is.readInt(); 231 if (uncompressedBlockSize == 0) { 232 break; 233 } else if (uncompressedBlockSize < 0) { 234 throw new EOFException(); 235 } 236 237 int compressedBlockSize = is.readInt(); 238 if (compressedBlockSize <= 0) { 239 throw new IOException("Could not read compressed block size"); 240 } 241 242 // See LzopInputStream.getCompressedData 243 boolean isUncompressedBlock = (uncompressedBlockSize == compressedBlockSize); 244 int numChecksumsToSkip = isUncompressedBlock 245 ? numDecompressedChecksums : numDecompressedChecksums + numCompressedChecksums; 246 long pos = is.getPos(); 247 // write the pos of the block start 248 os.writeLong(pos - 8); 249 // seek to the start of the next block, skip any checksums 250 is.seek(pos + compressedBlockSize + (4 * numChecksumsToSkip)); 251 } 252 // If we're here, indexing was successful. 253 indexingSucceeded = true; 254 } finally { 255 // Close any open streams. 256 if (is != null) { 257 is.close(); 258 } 259 260 if (os != null) { 261 os.close(); 262 } 263 264 if (!indexingSucceeded) { 265 // If indexing didn't succeed (i.e. an exception was thrown), clean up after ourselves. 266 fs.delete(tmpOutputFile, false); 267 } else { 268 // Otherwise, rename filename.lzo.index.tmp to filename.lzo.index. 269 fs.rename(tmpOutputFile, outputFile); 270 } 271 } 272 } 273}