001/* 002 * This file is part of lzo-java, an implementation of LZO in Java. 003 * https://github.com/shevek/lzo-java 004 * 005 * The Java portion of this library is: 006 * Copyright (C) 2011 Shevek <shevek@anarres.org> 007 * All Rights Reserved. 008 * 009 * This file is based on a file from hadoop-gpl-compression. 010 * 011 * This library is free software; you can redistribute it and/or 012 * modify it under the terms of the GNU General Public License 013 * as published by the Free Software Foundation; either version 014 * 2 of the License, or (at your option) any later version. 015 * 016 * This library is distributed in the hope that it will be useful, 017 * but WITHOUT ANY WARRANTY; without even the implied warranty 018 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 019 * See the GNU General Public License for more details. 020 * 021 * You should have received a copy of the GNU General Public 022 * License along with the LZO library; see the file COPYING. 023 * If not, see <http://www.gnu.org/licenses/> or write to the 024 * Free Software Foundation, Inc., 51 Franklin Street, Fifth 025 * Floor, Boston, MA 02110-1301, USA. 026 */ 027package org.anarres.lzo.hadoop.codec; 028 029import java.io.IOException; 030import org.anarres.lzo.LzoAlgorithm; 031import org.anarres.lzo.LzoConstraint; 032import org.anarres.lzo.LzoLibrary; 033import org.anarres.lzo.LzoTransformer; 034import org.anarres.lzo.SuppressWarnings; 035import org.anarres.lzo.lzo_uintp; 036import org.apache.commons.logging.Log; 037import org.apache.commons.logging.LogFactory; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.io.compress.Compressor; 040 041/** 042 * A {@link Compressor} based on the lzo algorithm. 043 * http://www.oberhumer.com/opensource/lzo/ 044 * 045 */ 046public class LzoCompressor implements Compressor { 047 048 private static final Log LOG = LogFactory.getLog(LzoCompressor.class); 049 050 /** 051 * The compression algorithm for lzo library. 052 */ 053 public static enum CompressionStrategy { 054 055 /** 056 * lzo1 algorithms. 057 */ 058 LZO1(LzoAlgorithm.LZO1), 059 LZO1_99(LzoAlgorithm.LZO1, LzoConstraint.COMPRESSION), 060 /** 061 * lzo1a algorithms. 062 */ 063 LZO1A(LzoAlgorithm.LZO1), 064 LZO1A_99(LzoAlgorithm.LZO1, LzoConstraint.COMPRESSION), 065 /** 066 * lzo1b algorithms. 067 */ 068 LZO1B(LzoAlgorithm.LZO1), 069 LZO1B_BEST_COMPRESSION(LzoAlgorithm.LZO1, LzoConstraint.COMPRESSION), 070 LZO1B_BEST_SPEED(LzoAlgorithm.LZO1, LzoConstraint.SPEED), 071 LZO1B_1(LzoAlgorithm.LZO1B), 072 LZO1B_2(LzoAlgorithm.LZO1B), 073 LZO1B_3(LzoAlgorithm.LZO1B), 074 LZO1B_4(LzoAlgorithm.LZO1B), 075 LZO1B_5(LzoAlgorithm.LZO1B), 076 LZO1B_6(LzoAlgorithm.LZO1B), 077 LZO1B_7(LzoAlgorithm.LZO1B), 078 LZO1B_8(LzoAlgorithm.LZO1B), 079 LZO1B_9(LzoAlgorithm.LZO1B), 080 LZO1B_99(LzoAlgorithm.LZO1B, LzoConstraint.COMPRESSION), 081 LZO1B_999(LzoAlgorithm.LZO1B, LzoConstraint.COMPRESSION), 082 /** 083 * lzo1c algorithms. 084 */ 085 LZO1C(LzoAlgorithm.LZO1C), 086 LZO1C_BEST_COMPRESSION(LzoAlgorithm.LZO1C, LzoConstraint.COMPRESSION), 087 LZO1C_BEST_SPEED(LzoAlgorithm.LZO1C, LzoConstraint.SPEED), 088 LZO1C_1(LzoAlgorithm.LZO1C), 089 LZO1C_2(LzoAlgorithm.LZO1C), 090 LZO1C_3(LzoAlgorithm.LZO1C), 091 LZO1C_4(LzoAlgorithm.LZO1C), 092 LZO1C_5(LzoAlgorithm.LZO1C), 093 LZO1C_6(LzoAlgorithm.LZO1C), 094 LZO1C_7(LzoAlgorithm.LZO1C), 095 LZO1C_8(LzoAlgorithm.LZO1C), 096 LZO1C_9(LzoAlgorithm.LZO1C), 097 LZO1C_99(LzoAlgorithm.LZO1C, LzoConstraint.COMPRESSION), 098 LZO1C_999(LzoAlgorithm.LZO1C, LzoConstraint.COMPRESSION), 099 /** 100 * lzo1f algorithms. 101 */ 102 LZO1F_1(LzoAlgorithm.LZO1F), 103 LZO1F_999(LzoAlgorithm.LZO1F, LzoConstraint.COMPRESSION), 104 /** 105 * lzo1x algorithms. 106 */ 107 LZO1X_1(LzoAlgorithm.LZO1X), 108 LZO1X_11(LzoAlgorithm.LZO1X, LzoConstraint.MEMORY), 109 LZO1X_12(LzoAlgorithm.LZO1X), 110 LZO1X_15(LzoAlgorithm.LZO1X), 111 LZO1X_999(LzoAlgorithm.LZO1X, LzoConstraint.COMPRESSION), 112 /** 113 * lzo1y algorithms. 114 */ 115 LZO1Y_1(LzoAlgorithm.LZO1Y), 116 LZO1Y_999(LzoAlgorithm.LZO1Y, LzoConstraint.COMPRESSION), 117 /** 118 * lzo1z algorithms. 119 */ 120 LZO1Z_999(LzoAlgorithm.LZO1Z, LzoConstraint.COMPRESSION), 121 /** 122 * lzo2a algorithms. 123 */ 124 LZO2A_999(LzoAlgorithm.LZO2A, LzoConstraint.COMPRESSION); 125 private final LzoAlgorithm algorithm; 126 private final LzoConstraint constraint; 127 128 private CompressionStrategy(LzoAlgorithm algorithm, LzoConstraint constraint) { 129 this.algorithm = algorithm; 130 this.constraint = constraint; 131 } 132 133 private CompressionStrategy(LzoAlgorithm algorithm) { 134 this(algorithm, null); 135 } 136 137 public org.anarres.lzo.LzoCompressor newCompressor() { 138 return LzoLibrary.getInstance().newCompressor(algorithm, constraint); 139 } 140 }; // CompressionStrategy 141 private final org.anarres.lzo.LzoCompressor compressor; // The lzo compression algorithm. 142 private final byte[] inputBuffer; 143 private int inputBufferLen; 144 private byte[] inputHoldoverBuffer; 145 private int inputHoldoverBufferPos; 146 private int inputHoldoverBufferLen; 147 private final byte[] outputBuffer; 148 private int outputBufferPos; 149 private final lzo_uintp outputBufferLen = new lzo_uintp(); // Also, end, since we base outputBuffer at 0. 150 private int inputByteCount; 151 private int outputByteCount; 152 private boolean finished; // Interaction with a brain-damaged contract from BlockCompressorStream. 153 154 /** 155 * Creates a new compressor using the specified {@link CompressionStrategy}. 156 * 157 * @param strategy lzo compression algorithm to use 158 * @param outputBufferSize size of the output buffer to be used. 159 */ 160 public LzoCompressor(CompressionStrategy strategy, int outputBufferSize) { 161 this.compressor = strategy.newCompressor(); 162 this.inputBuffer = new byte[outputBufferSize]; 163 this.outputBuffer = new byte[outputBufferSize + (outputBufferSize >> 3) + 256]; 164 reset(); 165 } 166 167 /** 168 * Creates a new compressor with the default lzo1x_1 compression. 169 */ 170 public LzoCompressor() { 171 this(CompressionStrategy.LZO1X_1, 64 * 1024); 172 } 173 174 private void logState(String when) { 175 LOG.info("\n"); 176 LOG.info(when + " Input buffer length=" + inputBufferLen + "/" + inputBuffer.length); 177 if (inputHoldoverBuffer == null) { 178 LOG.info(when + " Input holdover = null"); 179 } else { 180 LOG.info(when + " Input holdover pos=" + inputHoldoverBufferPos + "; length=" + inputHoldoverBufferLen); 181 } 182 LOG.info(when + " Output buffer pos=" + outputBufferPos + "; length=" + outputBufferLen + "/" + outputBuffer.length); 183 // LOG.info(when + " Read=" + inputByteCount + "; Written=" + outputByteCount + "; Finished = " + finished); 184 testInvariants(); 185 } 186 187 private boolean testInvariants() { 188 if (inputHoldoverBuffer != null) { 189 if (inputBufferLen != 0 && inputBufferLen != inputBuffer.length) 190 throw new IllegalStateException("Funny input buffer length " + inputBufferLen + " with array size " + inputBuffer.length + " and holdover."); 191 if (inputHoldoverBufferPos < 0) 192 throw new IllegalStateException("Using holdover buffer, but invalid holdover position " + inputHoldoverBufferPos); 193 if (inputHoldoverBufferLen < 0) 194 throw new IllegalStateException("Using holdover buffer, but invalid holdover length " + inputHoldoverBufferLen); 195 } else { 196 if (inputHoldoverBufferPos != -1) 197 throw new IllegalStateException("No holdover buffer, but valid holdover position " + inputHoldoverBufferPos); 198 if (inputHoldoverBufferLen != -1) 199 throw new IllegalStateException("No holdover buffer, but valid holdover length " + inputHoldoverBufferLen); 200 } 201 202 if (outputBufferLen.value < 0) 203 throw new IllegalStateException("Output buffer overrun pos=" + outputBufferPos + "; len=" + outputBufferLen); 204 205 return true; 206 } 207 208 /** 209 * {@inheritDoc} 210 * 211 * WARNING: This method retains a pointer to the user's buffer. 212 */ 213 @Override 214 @SuppressWarnings("EI_EXPOSE_REP2") 215 public void setInput(byte[] b, int off, int len) { 216 // logState("Before setInput"); 217 if (b == null) 218 throw new NullPointerException(); 219 if (off < 0 || len < 0 || off > b.length - len) 220 throw new ArrayIndexOutOfBoundsException("Illegal range in buffer: Buffer length=" + b.length + ", offset=" + off + ", length=" + len); 221 if (inputHoldoverBuffer != null) 222 throw new IllegalStateException("Cannot accept input while holdover is present."); 223 224 inputHoldoverBuffer = b; 225 inputHoldoverBufferPos = off; 226 inputHoldoverBufferLen = len; 227 compact(); 228 229 inputByteCount += len; // Unfortunately, we have to do this here. This is so, so, so wrong. 230 231 // logState("After setInput"); 232 } 233 234 @Override 235 public void setDictionary(byte[] b, int off, int len) { 236 // nop 237 } 238 239 /** {@inheritDoc} */ 240 @Override 241 public boolean needsInput() { 242 compact(); 243 if (inputHoldoverBuffer != null) 244 return false; 245 return inputBufferLen < inputBuffer.length; 246 } 247 248 @Override 249 public void finish() { 250 finished = true; 251 } 252 253 @Override 254 public boolean finished() { 255 assert testInvariants(); 256 return finished && outputBufferLen.value == 0 && inputBufferLen == 0 && inputHoldoverBuffer == null; 257 } 258 259 private void compact() { 260 if (inputHoldoverBuffer == null) { 261 assert testInvariants(); 262 return; 263 } 264 265 int remaining = inputBuffer.length - inputBufferLen; 266 if (inputHoldoverBufferLen <= remaining) { 267 // We can put the entire holdover into the input buffer. 268 System.arraycopy(inputHoldoverBuffer, inputHoldoverBufferPos, inputBuffer, inputBufferLen, inputHoldoverBufferLen); 269 inputBufferLen += inputHoldoverBufferLen; 270 inputHoldoverBuffer = null; 271 inputHoldoverBufferPos = -1; 272 inputHoldoverBufferLen = -1; 273 } else if (inputBufferLen == 0) { 274 // We have no input, and will run zero-copy from the holdover buffer. 275 } else { 276 // We need to complete the input buffer block using holdover. 277 System.arraycopy(inputHoldoverBuffer, inputHoldoverBufferPos, inputBuffer, inputBufferLen, remaining); 278 inputBufferLen += remaining; 279 inputHoldoverBufferPos += remaining; 280 inputHoldoverBufferLen -= remaining; 281 } 282 assert testInvariants(); 283 } 284 285 @Override 286 public int compress(byte[] b, int off, int len) throws IOException { 287 // logState("Before compress"); 288 if (b == null) 289 throw new NullPointerException(); 290 if (off < 0 || len < 0 || off > b.length - len) 291 throw new ArrayIndexOutOfBoundsException("Illegal range in buffer: Buffer length=" + b.length + ", offset=" + off + ", length=" + len); 292 293 if (outputBufferLen.value == 0) { 294 byte[] compressBuffer; 295 int compressBufferPos; 296 int compressBufferLen; 297 298 // Do compression. 299 if (inputBufferLen > 0) { 300 compressBuffer = inputBuffer; 301 compressBufferPos = 0; 302 compressBufferLen = inputBufferLen; 303 inputBufferLen = 0; 304 } else if (inputHoldoverBuffer != null) { 305 compressBuffer = inputHoldoverBuffer; 306 compressBufferPos = inputHoldoverBufferPos; 307 // If this is ever less than inputBuffer.length, then we should have copied it into the input buffer. 308 compressBufferLen = Math.min(inputBuffer.length, inputHoldoverBufferLen); 309 assert compressBufferLen == inputBuffer.length : "Compressing less than one block of holdover."; 310 inputHoldoverBufferPos += compressBufferLen; 311 inputHoldoverBufferLen -= compressBufferLen; 312 } else { 313 throw new IllegalStateException("compress() called with no input."); 314 } 315 compact(); 316 317 // A sane implementation would do this here, but Hadoop breaks if we do. 318 // inputByteCount += compressBufferLen; 319 outputBufferPos = 0; 320 outputBufferLen.value = outputBuffer.length; 321 try { 322 int code = compressor.compress(compressBuffer, compressBufferPos, compressBufferLen, outputBuffer, outputBufferPos, outputBufferLen); 323 if (code != LzoTransformer.LZO_E_OK) { 324 logState("LZO error: " + code); 325 // FileUtils.writeByteArrayToFile(new File("bytes.out"), Arrays.copyOfRange(compressBuffer, compressBufferPos, compressBufferPos + compressBufferLen)); 326 throw new IllegalArgumentException(compressor.toErrorString(code)); 327 } 328 } catch (IndexOutOfBoundsException e) { 329 logState("IndexOutOfBoundsException: " + e); 330 // FileUtils.writeByteArrayToFile(new File("bytes.out"), Arrays.copyOfRange(compressBuffer, compressBufferPos, compressBufferPos + compressBufferLen)); 331 throw new IOException(e); 332 } 333 // LOG.info(compressBufferLen + "(" + Integer.toHexString(compressBufferLen) + ") -> " + outputBufferLen + "(" + Integer.toHexString(outputBufferLen.value) + ")"); 334 } 335 336 len = Math.min(len, outputBufferLen.value); 337 System.arraycopy(outputBuffer, outputBufferPos, b, off, len); 338 outputBufferPos += len; 339 outputBufferLen.value -= len; 340 341 outputByteCount += len; 342 343 // logState("After compress; len=" + len); 344 return len; 345 } 346 347 // This method is called from the constructor, and must not be overridden. 348 private void _reset() { 349 inputByteCount = 0; 350 outputByteCount = 0; 351 inputBufferLen = 0; 352 inputHoldoverBuffer = null; 353 inputHoldoverBufferPos = -1; 354 inputHoldoverBufferLen = -1; 355 outputBufferPos = 0; 356 outputBufferLen.value = 0; 357 finished = false; 358 } 359 360 @Override 361 public void reset() { 362 _reset(); 363 } 364 365 @Override 366 public synchronized void reinit(Configuration conf) { 367 _reset(); 368 } 369 370 /** 371 * Return number of bytes given to this compressor since last reset. 372 */ 373 @Override 374 public synchronized long getBytesRead() { 375 return inputByteCount; 376 } 377 378 /** 379 * Return number of bytes consumed by callers of compress since last reset. 380 */ 381 @Override 382 public long getBytesWritten() { 383 return outputByteCount; 384 } 385 386 /** 387 * Noop. 388 */ 389 @Override 390 public void end() { 391 // nop 392 } 393}