001/*
002 * This file is part of lzo-java, an implementation of LZO in Java.
003 * https://github.com/shevek/lzo-java
004 *
005 * The Java portion of this library is:
006 * Copyright (C) 2011 Shevek <shevek@anarres.org>
007 * All Rights Reserved.
008 *
009 * This file is based on a file from hadoop-gpl-compression.
010 *
011 * This library is free software; you can redistribute it and/or
012 * modify it under the terms of the GNU General Public License 
013 * as published by the Free Software Foundation; either version 
014 * 2 of the License, or (at your option) any later version.
015 *
016 * This library is distributed in the hope that it will be useful, 
017 * but WITHOUT ANY WARRANTY; without even the implied warranty
018 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
019 * See the GNU General Public License for more details.
020 *
021 * You should have received a copy of the GNU General Public
022 * License along with the LZO library; see the file COPYING.
023 * If not, see <http://www.gnu.org/licenses/> or write to the
024 * Free Software Foundation, Inc., 51 Franklin Street, Fifth
025 * Floor, Boston, MA 02110-1301, USA.
026 */
027package org.anarres.lzo.hadoop.codec;
028
029import java.io.IOException;
030import org.anarres.lzo.LzoAlgorithm;
031import org.anarres.lzo.LzoConstraint;
032import org.anarres.lzo.LzoLibrary;
033import org.anarres.lzo.LzoTransformer;
034import org.anarres.lzo.SuppressWarnings;
035import org.anarres.lzo.lzo_uintp;
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.io.compress.Compressor;
040
041/**
042 * A {@link Compressor} based on the lzo algorithm.
043 * http://www.oberhumer.com/opensource/lzo/
044 *
045 */
046public class LzoCompressor implements Compressor {
047
048    private static final Log LOG = LogFactory.getLog(LzoCompressor.class);
049
050    /**
051     * The compression algorithm for lzo library.
052     */
053    public static enum CompressionStrategy {
054
055        /**
056         * lzo1 algorithms.
057         */
058        LZO1(LzoAlgorithm.LZO1),
059        LZO1_99(LzoAlgorithm.LZO1, LzoConstraint.COMPRESSION),
060        /**
061         * lzo1a algorithms.
062         */
063        LZO1A(LzoAlgorithm.LZO1),
064        LZO1A_99(LzoAlgorithm.LZO1, LzoConstraint.COMPRESSION),
065        /**
066         * lzo1b algorithms.
067         */
068        LZO1B(LzoAlgorithm.LZO1),
069        LZO1B_BEST_COMPRESSION(LzoAlgorithm.LZO1, LzoConstraint.COMPRESSION),
070        LZO1B_BEST_SPEED(LzoAlgorithm.LZO1, LzoConstraint.SPEED),
071        LZO1B_1(LzoAlgorithm.LZO1B),
072        LZO1B_2(LzoAlgorithm.LZO1B),
073        LZO1B_3(LzoAlgorithm.LZO1B),
074        LZO1B_4(LzoAlgorithm.LZO1B),
075        LZO1B_5(LzoAlgorithm.LZO1B),
076        LZO1B_6(LzoAlgorithm.LZO1B),
077        LZO1B_7(LzoAlgorithm.LZO1B),
078        LZO1B_8(LzoAlgorithm.LZO1B),
079        LZO1B_9(LzoAlgorithm.LZO1B),
080        LZO1B_99(LzoAlgorithm.LZO1B, LzoConstraint.COMPRESSION),
081        LZO1B_999(LzoAlgorithm.LZO1B, LzoConstraint.COMPRESSION),
082        /**
083         * lzo1c algorithms.
084         */
085        LZO1C(LzoAlgorithm.LZO1C),
086        LZO1C_BEST_COMPRESSION(LzoAlgorithm.LZO1C, LzoConstraint.COMPRESSION),
087        LZO1C_BEST_SPEED(LzoAlgorithm.LZO1C, LzoConstraint.SPEED),
088        LZO1C_1(LzoAlgorithm.LZO1C),
089        LZO1C_2(LzoAlgorithm.LZO1C),
090        LZO1C_3(LzoAlgorithm.LZO1C),
091        LZO1C_4(LzoAlgorithm.LZO1C),
092        LZO1C_5(LzoAlgorithm.LZO1C),
093        LZO1C_6(LzoAlgorithm.LZO1C),
094        LZO1C_7(LzoAlgorithm.LZO1C),
095        LZO1C_8(LzoAlgorithm.LZO1C),
096        LZO1C_9(LzoAlgorithm.LZO1C),
097        LZO1C_99(LzoAlgorithm.LZO1C, LzoConstraint.COMPRESSION),
098        LZO1C_999(LzoAlgorithm.LZO1C, LzoConstraint.COMPRESSION),
099        /**
100         * lzo1f algorithms.
101         */
102        LZO1F_1(LzoAlgorithm.LZO1F),
103        LZO1F_999(LzoAlgorithm.LZO1F, LzoConstraint.COMPRESSION),
104        /**
105         * lzo1x algorithms.
106         */
107        LZO1X_1(LzoAlgorithm.LZO1X),
108        LZO1X_11(LzoAlgorithm.LZO1X, LzoConstraint.MEMORY),
109        LZO1X_12(LzoAlgorithm.LZO1X),
110        LZO1X_15(LzoAlgorithm.LZO1X),
111        LZO1X_999(LzoAlgorithm.LZO1X, LzoConstraint.COMPRESSION),
112        /**
113         * lzo1y algorithms.
114         */
115        LZO1Y_1(LzoAlgorithm.LZO1Y),
116        LZO1Y_999(LzoAlgorithm.LZO1Y, LzoConstraint.COMPRESSION),
117        /**
118         * lzo1z algorithms.
119         */
120        LZO1Z_999(LzoAlgorithm.LZO1Z, LzoConstraint.COMPRESSION),
121        /**
122         * lzo2a algorithms.
123         */
124        LZO2A_999(LzoAlgorithm.LZO2A, LzoConstraint.COMPRESSION);
125        private final LzoAlgorithm algorithm;
126        private final LzoConstraint constraint;
127
128        private CompressionStrategy(LzoAlgorithm algorithm, LzoConstraint constraint) {
129            this.algorithm = algorithm;
130            this.constraint = constraint;
131        }
132
133        private CompressionStrategy(LzoAlgorithm algorithm) {
134            this(algorithm, null);
135        }
136
137        public org.anarres.lzo.LzoCompressor newCompressor() {
138            return LzoLibrary.getInstance().newCompressor(algorithm, constraint);
139        }
140    }; // CompressionStrategy
141    private final org.anarres.lzo.LzoCompressor compressor; // The lzo compression algorithm.
142    private final byte[] inputBuffer;
143    private int inputBufferLen;
144    private byte[] inputHoldoverBuffer;
145    private int inputHoldoverBufferPos;
146    private int inputHoldoverBufferLen;
147    private final byte[] outputBuffer;
148    private int outputBufferPos;
149    private final lzo_uintp outputBufferLen = new lzo_uintp();  // Also, end, since we base outputBuffer at 0.
150    private int inputByteCount;
151    private int outputByteCount;
152    private boolean finished;   // Interaction with a brain-damaged contract from BlockCompressorStream.
153
154    /**
155     * Creates a new compressor using the specified {@link CompressionStrategy}.
156     *
157     * @param strategy lzo compression algorithm to use
158     * @param outputBufferSize size of the output buffer to be used.
159     */
160    public LzoCompressor(CompressionStrategy strategy, int outputBufferSize) {
161        this.compressor = strategy.newCompressor();
162        this.inputBuffer = new byte[outputBufferSize];
163        this.outputBuffer = new byte[outputBufferSize + (outputBufferSize >> 3) + 256];
164        reset();
165    }
166
167    /**
168     * Creates a new compressor with the default lzo1x_1 compression.
169     */
170    public LzoCompressor() {
171        this(CompressionStrategy.LZO1X_1, 64 * 1024);
172    }
173
174    private void logState(String when) {
175        LOG.info("\n");
176        LOG.info(when + " Input buffer length=" + inputBufferLen + "/" + inputBuffer.length);
177        if (inputHoldoverBuffer == null) {
178            LOG.info(when + " Input holdover = null");
179        } else {
180            LOG.info(when + " Input holdover pos=" + inputHoldoverBufferPos + "; length=" + inputHoldoverBufferLen);
181        }
182        LOG.info(when + " Output buffer pos=" + outputBufferPos + "; length=" + outputBufferLen + "/" + outputBuffer.length);
183        // LOG.info(when + " Read=" + inputByteCount + "; Written=" + outputByteCount + "; Finished = " + finished);
184        testInvariants();
185    }
186
187    private boolean testInvariants() {
188        if (inputHoldoverBuffer != null) {
189            if (inputBufferLen != 0 && inputBufferLen != inputBuffer.length)
190                throw new IllegalStateException("Funny input buffer length " + inputBufferLen + " with array size " + inputBuffer.length + " and holdover.");
191            if (inputHoldoverBufferPos < 0)
192                throw new IllegalStateException("Using holdover buffer, but invalid holdover position " + inputHoldoverBufferPos);
193            if (inputHoldoverBufferLen < 0)
194                throw new IllegalStateException("Using holdover buffer, but invalid holdover length " + inputHoldoverBufferLen);
195        } else {
196            if (inputHoldoverBufferPos != -1)
197                throw new IllegalStateException("No holdover buffer, but valid holdover position " + inputHoldoverBufferPos);
198            if (inputHoldoverBufferLen != -1)
199                throw new IllegalStateException("No holdover buffer, but valid holdover length " + inputHoldoverBufferLen);
200        }
201
202        if (outputBufferLen.value < 0)
203            throw new IllegalStateException("Output buffer overrun pos=" + outputBufferPos + "; len=" + outputBufferLen);
204
205        return true;
206    }
207
208    /**
209     * {@inheritDoc}
210     *
211     * WARNING: This method retains a pointer to the user's buffer.
212     */
213    @Override
214    @SuppressWarnings("EI_EXPOSE_REP2")
215    public void setInput(byte[] b, int off, int len) {
216        // logState("Before setInput");
217        if (b == null)
218            throw new NullPointerException();
219        if (off < 0 || len < 0 || off > b.length - len)
220            throw new ArrayIndexOutOfBoundsException("Illegal range in buffer: Buffer length=" + b.length + ", offset=" + off + ", length=" + len);
221        if (inputHoldoverBuffer != null)
222            throw new IllegalStateException("Cannot accept input while holdover is present.");
223
224        inputHoldoverBuffer = b;
225        inputHoldoverBufferPos = off;
226        inputHoldoverBufferLen = len;
227        compact();
228
229        inputByteCount += len;  // Unfortunately, we have to do this here. This is so, so, so wrong.
230
231        // logState("After setInput");
232    }
233
234    @Override
235    public void setDictionary(byte[] b, int off, int len) {
236        // nop
237    }
238
239    /** {@inheritDoc} */
240    @Override
241    public boolean needsInput() {
242        compact();
243        if (inputHoldoverBuffer != null)
244            return false;
245        return inputBufferLen < inputBuffer.length;
246    }
247
248    @Override
249    public void finish() {
250        finished = true;
251    }
252
253    @Override
254    public boolean finished() {
255        assert testInvariants();
256        return finished && outputBufferLen.value == 0 && inputBufferLen == 0 && inputHoldoverBuffer == null;
257    }
258
259    private void compact() {
260        if (inputHoldoverBuffer == null) {
261            assert testInvariants();
262            return;
263        }
264
265        int remaining = inputBuffer.length - inputBufferLen;
266        if (inputHoldoverBufferLen <= remaining) {
267            // We can put the entire holdover into the input buffer.
268            System.arraycopy(inputHoldoverBuffer, inputHoldoverBufferPos, inputBuffer, inputBufferLen, inputHoldoverBufferLen);
269            inputBufferLen += inputHoldoverBufferLen;
270            inputHoldoverBuffer = null;
271            inputHoldoverBufferPos = -1;
272            inputHoldoverBufferLen = -1;
273        } else if (inputBufferLen == 0) {
274            // We have no input, and will run zero-copy from the holdover buffer.
275        } else {
276            // We need to complete the input buffer block using holdover.
277            System.arraycopy(inputHoldoverBuffer, inputHoldoverBufferPos, inputBuffer, inputBufferLen, remaining);
278            inputBufferLen += remaining;
279            inputHoldoverBufferPos += remaining;
280            inputHoldoverBufferLen -= remaining;
281        }
282        assert testInvariants();
283    }
284
285    @Override
286    public int compress(byte[] b, int off, int len) throws IOException {
287        // logState("Before compress");
288        if (b == null)
289            throw new NullPointerException();
290        if (off < 0 || len < 0 || off > b.length - len)
291            throw new ArrayIndexOutOfBoundsException("Illegal range in buffer: Buffer length=" + b.length + ", offset=" + off + ", length=" + len);
292
293        if (outputBufferLen.value == 0) {
294            byte[] compressBuffer;
295            int compressBufferPos;
296            int compressBufferLen;
297
298            // Do compression.
299            if (inputBufferLen > 0) {
300                compressBuffer = inputBuffer;
301                compressBufferPos = 0;
302                compressBufferLen = inputBufferLen;
303                inputBufferLen = 0;
304            } else if (inputHoldoverBuffer != null) {
305                compressBuffer = inputHoldoverBuffer;
306                compressBufferPos = inputHoldoverBufferPos;
307                // If this is ever less than inputBuffer.length, then we should have copied it into the input buffer.
308                compressBufferLen = Math.min(inputBuffer.length, inputHoldoverBufferLen);
309                assert compressBufferLen == inputBuffer.length : "Compressing less than one block of holdover.";
310                inputHoldoverBufferPos += compressBufferLen;
311                inputHoldoverBufferLen -= compressBufferLen;
312            } else {
313                throw new IllegalStateException("compress() called with no input.");
314            }
315            compact();
316
317            // A sane implementation would do this here, but Hadoop breaks if we do.
318            // inputByteCount += compressBufferLen;
319            outputBufferPos = 0;
320            outputBufferLen.value = outputBuffer.length;
321            try {
322                int code = compressor.compress(compressBuffer, compressBufferPos, compressBufferLen, outputBuffer, outputBufferPos, outputBufferLen);
323                if (code != LzoTransformer.LZO_E_OK) {
324                    logState("LZO error: " + code);
325                    // FileUtils.writeByteArrayToFile(new File("bytes.out"), Arrays.copyOfRange(compressBuffer, compressBufferPos, compressBufferPos + compressBufferLen));
326                    throw new IllegalArgumentException(compressor.toErrorString(code));
327                }
328            } catch (IndexOutOfBoundsException e) {
329                logState("IndexOutOfBoundsException: " + e);
330                // FileUtils.writeByteArrayToFile(new File("bytes.out"), Arrays.copyOfRange(compressBuffer, compressBufferPos, compressBufferPos + compressBufferLen));
331                throw new IOException(e);
332            }
333            // LOG.info(compressBufferLen + "(" + Integer.toHexString(compressBufferLen) + ") -> " + outputBufferLen + "(" + Integer.toHexString(outputBufferLen.value) + ")");
334        }
335
336        len = Math.min(len, outputBufferLen.value);
337        System.arraycopy(outputBuffer, outputBufferPos, b, off, len);
338        outputBufferPos += len;
339        outputBufferLen.value -= len;
340
341        outputByteCount += len;
342
343        // logState("After compress; len=" + len);
344        return len;
345    }
346
347    // This method is called from the constructor, and must not be overridden.
348    private void _reset() {
349        inputByteCount = 0;
350        outputByteCount = 0;
351        inputBufferLen = 0;
352        inputHoldoverBuffer = null;
353        inputHoldoverBufferPos = -1;
354        inputHoldoverBufferLen = -1;
355        outputBufferPos = 0;
356        outputBufferLen.value = 0;
357        finished = false;
358    }
359
360    @Override
361    public void reset() {
362        _reset();
363    }
364
365    @Override
366    public synchronized void reinit(Configuration conf) {
367        _reset();
368    }
369
370    /**
371     * Return number of bytes given to this compressor since last reset.
372     */
373    @Override
374    public synchronized long getBytesRead() {
375        return inputByteCount;
376    }
377
378    /**
379     * Return number of bytes consumed by callers of compress since last reset.
380     */
381    @Override
382    public long getBytesWritten() {
383        return outputByteCount;
384    }
385
386    /**
387     * Noop.
388     */
389    @Override
390    public void end() {
391        // nop
392    }
393}