001/* 002 * This file is part of lzo-java, an implementation of LZO in Java. 003 * https://github.com/shevek/lzo-java 004 * 005 * The Java portion of this library is: 006 * Copyright (C) 2011 Shevek <shevek@anarres.org> 007 * All Rights Reserved. 008 * 009 * The preprocessed C portion of this library is: 010 * Copyright (C) 2006-2011 Markus Franz Xaver Johannes Oberhumer 011 * All Rights Reserved. 012 * 013 * This library is free software; you can redistribute it and/or 014 * modify it under the terms of the GNU General Public License 015 * as published by the Free Software Foundation; either version 016 * 2 of the License, or (at your option) any later version. 017 * 018 * This library is distributed in the hope that it will be useful, 019 * but WITHOUT ANY WARRANTY; without even the implied warranty 020 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 021 * See the GNU General Public License for more details. 022 * 023 * You should have received a copy of the GNU General Public 024 * License along with the LZO library; see the file COPYING. 025 * If not, see <http://www.gnu.org/licenses/> or write to the 026 * Free Software Foundation, Inc., 51 Franklin Street, Fifth 027 * Floor, Boston, MA 02110-1301, USA. 028 029 * As a special exception, the copyright holders of this file 030 * give you permission to link this file with independent 031 * modules to produce an executable, regardless of the license 032 * terms of these independent modules, and to copy and distribute 033 * the resulting executable under terms of your choice, provided 034 * that you also meet, for each linked independent module, 035 * the terms and conditions of the license of that module. An 036 * independent module is a module which is not derived from or 037 * based on this library or file. If you modify this file, you may 038 * extend this exception to your version of the file, but 039 * you are not obligated to do so. If you do not wish to do so, 040 * delete this exception statement from your version. 041 */ 042package org.anarres.lzo; 043 044import java.io.IOException; 045import java.io.OutputStream; 046import java.util.Arrays; 047import javax.annotation.CheckForSigned; 048import javax.annotation.Nonnegative; 049import javax.annotation.Nonnull; 050import org.apache.commons.logging.Log; 051import org.apache.commons.logging.LogFactory; 052 053/** 054 * 055 * @author shevek 056 */ 057public class LzoOutputStream extends OutputStream { 058 059 private static final Log LOG = LogFactory.getLog(LzoOutputStream.class.getName()); 060 protected final OutputStream out; 061 private final LzoCompressor compressor; // Replace with BlockCompressor. 062 private final byte[] inputBuffer; 063 private int inputBufferLen; 064 private byte[] inputHoldoverBuffer; 065 private int inputHoldoverBufferPos; 066 private int inputHoldoverBufferLen; 067 private final byte[] outputBuffer; 068 private final lzo_uintp outputBufferLen = new lzo_uintp(); // Also, end, since we base outputBuffer at 0. 069 070 /** 071 * Creates a new compressor using the specified {@link LzoCompressor}. 072 * 073 * @param compressor lzo compression algorithm to use 074 * @param inputBufferSize size of the buffers to be used. If <0, uses the default 64K. 075 */ 076 public LzoOutputStream(@Nonnull OutputStream out, @Nonnull LzoCompressor compressor, @CheckForSigned int inputBufferSize) { 077 if (inputBufferSize <= 0) 078 inputBufferSize = 64 * 1024; 079 this.out = out; 080 this.compressor = compressor; 081 this.inputBuffer = new byte[inputBufferSize]; 082 this.outputBuffer = new byte[inputBufferSize + compressor.getCompressionOverhead(inputBufferSize)]; 083 reset(); 084 } 085 086 /** 087 * Creates a new compressor with the specified compression. 088 */ 089 public LzoOutputStream(@Nonnull OutputStream out, @Nonnull LzoCompressor compressor) { 090 this(out, compressor, 0); 091 } 092 093 /** 094 * Creates a new compressor with the default lzo1x_1 compression. 095 */ 096 public LzoOutputStream(@Nonnull OutputStream out) { 097 this(out, LzoLibrary.getInstance().newCompressor(null, null)); 098 } 099 100 @Nonnull 101 public LzoCompressor getCompressor() { 102 return compressor; 103 } 104 105 @Nonnull 106 public LzoAlgorithm getAlgorithm() { 107 return getCompressor().getAlgorithm(); 108 } 109 110 @Nonnull 111 public LzoConstraint[] getConstraints() { 112 return getCompressor().getConstraints(); 113 } 114 115 private void reset() { 116 inputBufferLen = 0; 117 inputHoldoverBuffer = null; 118 inputHoldoverBufferPos = -1; 119 inputHoldoverBufferLen = -1; 120 outputBufferLen.value = 0; 121 } 122 123 private void logState(@Nonnull String when) { 124 LOG.info("\n"); 125 LOG.info(when + " Input buffer length=" + inputBufferLen + "/" + inputBuffer.length); 126 if (inputHoldoverBuffer == null) { 127 LOG.info(when + " Input holdover = null"); 128 } else { 129 LOG.info(when + " Input holdover pos=" + inputHoldoverBufferPos + "; length=" + inputHoldoverBufferLen); 130 } 131 LOG.info(when + " Output buffer length=" + outputBufferLen + "/" + outputBuffer.length); 132 // LOG.info(when + " Read=" + inputByteCount + "; Written=" + outputByteCount + "; Finished = " + finished); 133 testInvariants(); 134 } 135 136 private boolean testInvariants() { 137 if (inputHoldoverBuffer != null) { 138 if (inputBufferLen != 0 && inputBufferLen != inputBuffer.length) 139 throw new IllegalStateException("Funny input buffer length " + inputBufferLen + " with array size " + inputBuffer.length + " and holdover."); 140 if (inputHoldoverBufferPos < 0) 141 throw new IllegalStateException("Using holdover buffer, but invalid holdover position " + inputHoldoverBufferPos); 142 if (inputHoldoverBufferLen < 0) 143 throw new IllegalStateException("Using holdover buffer, but invalid holdover length " + inputHoldoverBufferLen); 144 } else { 145 if (inputHoldoverBufferPos != -1) 146 throw new IllegalStateException("No holdover buffer, but valid holdover position " + inputHoldoverBufferPos); 147 if (inputHoldoverBufferLen != -1) 148 throw new IllegalStateException("No holdover buffer, but valid holdover length " + inputHoldoverBufferLen); 149 } 150 151 if (outputBufferLen.value < 0) 152 throw new IllegalStateException("Output buffer overrun length=" + outputBufferLen); 153 154 return true; 155 } 156 157 @Override 158 public void write(int b) throws IOException { 159 write(new byte[]{(byte) b}); 160 } 161 162 @Override 163 public void write(byte[] b) throws IOException { 164 write(b, 0, b.length); 165 } 166 167 @Override 168 public void write(byte[] b, int off, int len) throws IOException { 169 // logState("Before setInput"); 170 if (b == null) 171 throw new NullPointerException(); 172 if (off < 0 || len < 0 || off > b.length - len) 173 throw new ArrayIndexOutOfBoundsException("Illegal range in buffer: Buffer length=" + b.length + ", offset=" + off + ", length=" + len); 174 if (inputHoldoverBuffer != null) 175 throw new IllegalStateException("Cannot accept input while holdover is present."); 176 177 inputHoldoverBuffer = Arrays.copyOfRange(b, off, off + len); 178 inputHoldoverBufferPos = off; 179 inputHoldoverBufferLen = len; 180 compact(); 181 182 while (inputHoldoverBuffer != null || inputBufferLen == inputBuffer.length) 183 compress(); 184 185 // logState("After setInput"); 186 } 187 188 @Override 189 public void flush() throws IOException { 190 while (inputHoldoverBuffer != null || inputBufferLen > 0) 191 compress(); 192 } 193 194 @Override 195 public void close() throws IOException { 196 flush(); 197 out.close(); 198 } 199 200 private void compact() { 201 if (inputHoldoverBuffer == null) { 202 assert testInvariants(); 203 return; 204 } 205 206 int remaining = inputBuffer.length - inputBufferLen; 207 if (inputHoldoverBufferLen <= remaining) { // Possibly even 0. 208 // We can put the entire holdover into the input buffer. 209 System.arraycopy(inputHoldoverBuffer, inputHoldoverBufferPos, inputBuffer, inputBufferLen, inputHoldoverBufferLen); 210 inputBufferLen += inputHoldoverBufferLen; 211 inputHoldoverBuffer = null; 212 inputHoldoverBufferPos = -1; 213 inputHoldoverBufferLen = -1; 214 } else if (inputBufferLen == 0) { 215 // We have no input, and will run zero-copy from the holdover buffer. 216 } else { 217 // We need to complete the input buffer block using holdover. 218 System.arraycopy(inputHoldoverBuffer, inputHoldoverBufferPos, inputBuffer, inputBufferLen, remaining); 219 inputBufferLen += remaining; 220 inputHoldoverBufferPos += remaining; 221 inputHoldoverBufferLen -= remaining; 222 } 223 assert testInvariants(); 224 } 225 226 private void compress() throws IOException { 227 // logState("Before compress"); 228 229 byte[] compressBuffer; 230 int compressBufferPos; 231 int compressBufferLen; 232 233 // Do compression. 234 if (inputBufferLen > 0) { 235 compressBuffer = inputBuffer.clone(); 236 compressBufferPos = 0; 237 compressBufferLen = inputBufferLen; 238 inputBufferLen = 0; 239 } else if (inputHoldoverBuffer != null) { 240 compressBuffer = inputHoldoverBuffer.clone(); 241 compressBufferPos = inputHoldoverBufferPos; 242 // If this is ever less than inputBuffer.length, then we should have copied it into the input buffer. 243 compressBufferLen = Math.min(inputBuffer.length, inputHoldoverBufferLen); 244 assert compressBufferLen == inputBuffer.length : "Compressing less than one block of holdover."; 245 inputHoldoverBufferPos += compressBufferLen; 246 inputHoldoverBufferLen -= compressBufferLen; 247 } else { 248 throw new IllegalStateException("compress() called with no input."); 249 } 250 compact(); 251 252 // A sane implementation would do this here, but Hadoop breaks if we do. 253 // inputByteCount += compressBufferLen; 254 outputBufferLen.value = outputBuffer.length; 255 try { 256 int code = compressor.compress(compressBuffer, compressBufferPos, compressBufferLen, outputBuffer, 0, outputBufferLen); 257 if (code != LzoTransformer.LZO_E_OK) { 258 logState("LZO error: " + code); 259 // FileUtils.writeByteArrayToFile(new File("bytes.out"), Arrays.copyOfRange(compressBuffer, compressBufferPos, compressBufferPos + compressBufferLen)); 260 throw new IllegalArgumentException(compressor.toErrorString(code)); 261 } 262 } catch (IndexOutOfBoundsException e) { 263 logState("IndexOutOfBoundsException: " + e); 264 // FileUtils.writeByteArrayToFile(new File("bytes.out"), Arrays.copyOfRange(compressBuffer, compressBufferPos, compressBufferPos + compressBufferLen)); 265 throw new IOException(e); 266 } 267 // LOG.info(compressBufferLen + "(" + Integer.toHexString(compressBufferLen) + ") -> " + outputBufferLen + "(" + Integer.toHexString(outputBufferLen.value) + ")"); 268 269 writeBlock(compressBuffer, compressBufferPos, compressBufferLen, outputBuffer, 0, outputBufferLen.value); 270 } 271 272 protected void writeBlock(@Nonnull byte[] inputData, @Nonnegative int inputPos, @Nonnegative int inputLen, @Nonnull byte[] outputData, @Nonnegative int outputPos, @Nonnegative int outputLen) 273 throws IOException { 274 writeInt(inputLen); 275 writeInt(outputLen); 276 out.write(outputData, outputPos, outputLen); 277 } 278 279 protected void writeInt(int v) throws IOException { 280 out.write((v >>> 24) & 0xFF); 281 out.write((v >>> 16) & 0xFF); 282 out.write((v >>> 8) & 0xFF); 283 out.write(v & 0xFF); 284 } 285}