001/*
002 * This file is part of lzo-java, an implementation of LZO in Java.
003 * https://github.com/shevek/lzo-java
004 *
005 * The Java portion of this library is:
006 * Copyright (C) 2011 Shevek <shevek@anarres.org>
007 * All Rights Reserved.
008 *
009 * The preprocessed C portion of this library is:
010 * Copyright (C) 2006-2011 Markus Franz Xaver Johannes Oberhumer
011 * All Rights Reserved.
012 *
013 * This library is free software; you can redistribute it and/or
014 * modify it under the terms of the GNU General Public License 
015 * as published by the Free Software Foundation; either version 
016 * 2 of the License, or (at your option) any later version.
017 *
018 * This library is distributed in the hope that it will be useful, 
019 * but WITHOUT ANY WARRANTY; without even the implied warranty
020 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
021 * See the GNU General Public License for more details.
022 *
023 * You should have received a copy of the GNU General Public
024 * License along with the LZO library; see the file COPYING.
025 * If not, see <http://www.gnu.org/licenses/> or write to the
026 * Free Software Foundation, Inc., 51 Franklin Street, Fifth
027 * Floor, Boston, MA 02110-1301, USA.
028
029 * As a special exception, the copyright holders of this file
030 * give you permission to link this file with independent
031 * modules to produce an executable, regardless of the license 
032 * terms of these independent modules, and to copy and distribute
033 * the resulting executable under terms of your choice, provided
034 * that you also meet, for each linked independent module, 
035 * the terms and conditions of the license of that module. An
036 * independent module is a module which is not derived from or 
037 * based on this library or file. If you modify this file, you may 
038 * extend this exception to your version of the file, but
039 * you are not obligated to do so. If you do not wish to do so,
040 * delete this exception statement from your version.
041 */
042package org.anarres.lzo;
043
044import java.io.IOException;
045import java.io.OutputStream;
046import java.util.Arrays;
047import javax.annotation.CheckForSigned;
048import javax.annotation.Nonnegative;
049import javax.annotation.Nonnull;
050import org.apache.commons.logging.Log;
051import org.apache.commons.logging.LogFactory;
052
053/**
054 *
055 * @author shevek
056 */
057public class LzoOutputStream extends OutputStream {
058
059    private static final Log LOG = LogFactory.getLog(LzoOutputStream.class.getName());
060    protected final OutputStream out;
061    private final LzoCompressor compressor; // Replace with BlockCompressor.
062    private final byte[] inputBuffer;
063    private int inputBufferLen;
064    private byte[] inputHoldoverBuffer;
065    private int inputHoldoverBufferPos;
066    private int inputHoldoverBufferLen;
067    private final byte[] outputBuffer;
068    private final lzo_uintp outputBufferLen = new lzo_uintp();  // Also, end, since we base outputBuffer at 0.
069
070    /**
071     * Creates a new compressor using the specified {@link LzoCompressor}.
072     *
073     * @param compressor lzo compression algorithm to use
074     * @param inputBufferSize size of the buffers to be used. If &lt;0, uses the default 64K.
075     */
076    public LzoOutputStream(@Nonnull OutputStream out, @Nonnull LzoCompressor compressor, @CheckForSigned int inputBufferSize) {
077        if (inputBufferSize <= 0)
078            inputBufferSize = 64 * 1024;
079        this.out = out;
080        this.compressor = compressor;
081        this.inputBuffer = new byte[inputBufferSize];
082        this.outputBuffer = new byte[inputBufferSize + compressor.getCompressionOverhead(inputBufferSize)];
083        reset();
084    }
085
086    /**
087     * Creates a new compressor with the specified compression.
088     */
089    public LzoOutputStream(@Nonnull OutputStream out, @Nonnull LzoCompressor compressor) {
090        this(out, compressor, 0);
091    }
092
093    /**
094     * Creates a new compressor with the default lzo1x_1 compression.
095     */
096    public LzoOutputStream(@Nonnull OutputStream out) {
097        this(out, LzoLibrary.getInstance().newCompressor(null, null));
098    }
099
100    @Nonnull
101    public LzoCompressor getCompressor() {
102        return compressor;
103    }
104
105    @Nonnull
106    public LzoAlgorithm getAlgorithm() {
107        return getCompressor().getAlgorithm();
108    }
109
110    @Nonnull
111    public LzoConstraint[] getConstraints() {
112        return getCompressor().getConstraints();
113    }
114
115    private void reset() {
116        inputBufferLen = 0;
117        inputHoldoverBuffer = null;
118        inputHoldoverBufferPos = -1;
119        inputHoldoverBufferLen = -1;
120        outputBufferLen.value = 0;
121    }
122
123    private void logState(@Nonnull String when) {
124        LOG.info("\n");
125        LOG.info(when + " Input buffer length=" + inputBufferLen + "/" + inputBuffer.length);
126        if (inputHoldoverBuffer == null) {
127            LOG.info(when + " Input holdover = null");
128        } else {
129            LOG.info(when + " Input holdover pos=" + inputHoldoverBufferPos + "; length=" + inputHoldoverBufferLen);
130        }
131        LOG.info(when + " Output buffer length=" + outputBufferLen + "/" + outputBuffer.length);
132        // LOG.info(when + " Read=" + inputByteCount + "; Written=" + outputByteCount + "; Finished = " + finished);
133        testInvariants();
134    }
135
136    private boolean testInvariants() {
137        if (inputHoldoverBuffer != null) {
138            if (inputBufferLen != 0 && inputBufferLen != inputBuffer.length)
139                throw new IllegalStateException("Funny input buffer length " + inputBufferLen + " with array size " + inputBuffer.length + " and holdover.");
140            if (inputHoldoverBufferPos < 0)
141                throw new IllegalStateException("Using holdover buffer, but invalid holdover position " + inputHoldoverBufferPos);
142            if (inputHoldoverBufferLen < 0)
143                throw new IllegalStateException("Using holdover buffer, but invalid holdover length " + inputHoldoverBufferLen);
144        } else {
145            if (inputHoldoverBufferPos != -1)
146                throw new IllegalStateException("No holdover buffer, but valid holdover position " + inputHoldoverBufferPos);
147            if (inputHoldoverBufferLen != -1)
148                throw new IllegalStateException("No holdover buffer, but valid holdover length " + inputHoldoverBufferLen);
149        }
150
151        if (outputBufferLen.value < 0)
152            throw new IllegalStateException("Output buffer overrun length=" + outputBufferLen);
153
154        return true;
155    }
156
157    @Override
158    public void write(int b) throws IOException {
159        write(new byte[]{(byte) b});
160    }
161
162    @Override
163    public void write(byte[] b) throws IOException {
164        write(b, 0, b.length);
165    }
166
167    @Override
168    public void write(byte[] b, int off, int len) throws IOException {
169        // logState("Before setInput");
170        if (b == null)
171            throw new NullPointerException();
172        if (off < 0 || len < 0 || off > b.length - len)
173            throw new ArrayIndexOutOfBoundsException("Illegal range in buffer: Buffer length=" + b.length + ", offset=" + off + ", length=" + len);
174        if (inputHoldoverBuffer != null)
175            throw new IllegalStateException("Cannot accept input while holdover is present.");
176
177        inputHoldoverBuffer = Arrays.copyOfRange(b, off, off + len);
178        inputHoldoverBufferPos = off;
179        inputHoldoverBufferLen = len;
180        compact();
181
182        while (inputHoldoverBuffer != null || inputBufferLen == inputBuffer.length)
183            compress();
184
185        // logState("After setInput");
186    }
187
188    @Override
189    public void flush() throws IOException {
190        while (inputHoldoverBuffer != null || inputBufferLen > 0)
191            compress();
192    }
193
194    @Override
195    public void close() throws IOException {
196        flush();
197        out.close();
198    }
199
200    private void compact() {
201        if (inputHoldoverBuffer == null) {
202            assert testInvariants();
203            return;
204        }
205
206        int remaining = inputBuffer.length - inputBufferLen;
207        if (inputHoldoverBufferLen <= remaining) {  // Possibly even 0.
208            // We can put the entire holdover into the input buffer.
209            System.arraycopy(inputHoldoverBuffer, inputHoldoverBufferPos, inputBuffer, inputBufferLen, inputHoldoverBufferLen);
210            inputBufferLen += inputHoldoverBufferLen;
211            inputHoldoverBuffer = null;
212            inputHoldoverBufferPos = -1;
213            inputHoldoverBufferLen = -1;
214        } else if (inputBufferLen == 0) {
215            // We have no input, and will run zero-copy from the holdover buffer.
216        } else {
217            // We need to complete the input buffer block using holdover.
218            System.arraycopy(inputHoldoverBuffer, inputHoldoverBufferPos, inputBuffer, inputBufferLen, remaining);
219            inputBufferLen += remaining;
220            inputHoldoverBufferPos += remaining;
221            inputHoldoverBufferLen -= remaining;
222        }
223        assert testInvariants();
224    }
225
226    private void compress() throws IOException {
227        // logState("Before compress");
228
229        byte[] compressBuffer;
230        int compressBufferPos;
231        int compressBufferLen;
232
233        // Do compression.
234        if (inputBufferLen > 0) {
235            compressBuffer = inputBuffer.clone();
236            compressBufferPos = 0;
237            compressBufferLen = inputBufferLen;
238            inputBufferLen = 0;
239        } else if (inputHoldoverBuffer != null) {
240            compressBuffer = inputHoldoverBuffer.clone();
241            compressBufferPos = inputHoldoverBufferPos;
242            // If this is ever less than inputBuffer.length, then we should have copied it into the input buffer.
243            compressBufferLen = Math.min(inputBuffer.length, inputHoldoverBufferLen);
244            assert compressBufferLen == inputBuffer.length : "Compressing less than one block of holdover.";
245            inputHoldoverBufferPos += compressBufferLen;
246            inputHoldoverBufferLen -= compressBufferLen;
247        } else {
248            throw new IllegalStateException("compress() called with no input.");
249        }
250        compact();
251
252        // A sane implementation would do this here, but Hadoop breaks if we do.
253        // inputByteCount += compressBufferLen;
254        outputBufferLen.value = outputBuffer.length;
255        try {
256            int code = compressor.compress(compressBuffer, compressBufferPos, compressBufferLen, outputBuffer, 0, outputBufferLen);
257            if (code != LzoTransformer.LZO_E_OK) {
258                logState("LZO error: " + code);
259                // FileUtils.writeByteArrayToFile(new File("bytes.out"), Arrays.copyOfRange(compressBuffer, compressBufferPos, compressBufferPos + compressBufferLen));
260                throw new IllegalArgumentException(compressor.toErrorString(code));
261            }
262        } catch (IndexOutOfBoundsException e) {
263            logState("IndexOutOfBoundsException: " + e);
264            // FileUtils.writeByteArrayToFile(new File("bytes.out"), Arrays.copyOfRange(compressBuffer, compressBufferPos, compressBufferPos + compressBufferLen));
265            throw new IOException(e);
266        }
267        // LOG.info(compressBufferLen + "(" + Integer.toHexString(compressBufferLen) + ") -> " + outputBufferLen + "(" + Integer.toHexString(outputBufferLen.value) + ")");
268
269        writeBlock(compressBuffer, compressBufferPos, compressBufferLen, outputBuffer, 0, outputBufferLen.value);
270    }
271
272    protected void writeBlock(@Nonnull byte[] inputData, @Nonnegative int inputPos, @Nonnegative int inputLen, @Nonnull byte[] outputData, @Nonnegative int outputPos, @Nonnegative int outputLen)
273            throws IOException {
274        writeInt(inputLen);
275        writeInt(outputLen);
276        out.write(outputData, outputPos, outputLen);
277    }
278
279    protected void writeInt(int v) throws IOException {
280        out.write((v >>> 24) & 0xFF);
281        out.write((v >>> 16) & 0xFF);
282        out.write((v >>> 8) & 0xFF);
283        out.write(v & 0xFF);
284    }
285}