001/*
002 * This file is part of lzo-java, an implementation of LZO in Java.
003 * https://github.com/shevek/lzo-java
004 *
005 * The Java portion of this library is:
006 * Copyright (C) 2011 Shevek <shevek@anarres.org>
007 * All Rights Reserved.
008 *
009 * This file is based on a file from hadoop-gpl-compression.
010 *
011 * This library is free software; you can redistribute it and/or
012 * modify it under the terms of the GNU General Public License 
013 * as published by the Free Software Foundation; either version 
014 * 2 of the License, or (at your option) any later version.
015 *
016 * This library is distributed in the hope that it will be useful, 
017 * but WITHOUT ANY WARRANTY; without even the implied warranty
018 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
019 * See the GNU General Public License for more details.
020 *
021 * You should have received a copy of the GNU General Public
022 * License along with the LZO library; see the file COPYING.
023 * If not, see <http://www.gnu.org/licenses/> or write to the
024 * Free Software Foundation, Inc., 51 Franklin Street, Fifth
025 * Floor, Boston, MA 02110-1301, USA.
026 */
027package org.anarres.lzo.hadoop.codec;
028
029import java.io.IOException;
030import org.anarres.lzo.LzoAlgorithm;
031import org.anarres.lzo.LzoConstraint;
032import org.anarres.lzo.LzoLibrary;
033import org.anarres.lzo.LzoTransformer;
034import org.anarres.lzo.lzo_uintp;
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.apache.hadoop.io.compress.Decompressor;
038
039/**
040 * A {@link Decompressor} based on the lzo algorithm.
041 * http://www.oberhumer.com/opensource/lzo/
042 *
043 */
044public class LzoDecompressor implements Decompressor {
045
046    private static final Log LOG = LogFactory.getLog(LzoDecompressor.class);
047
048    public static enum CompressionStrategy {
049
050        /**
051         * lzo1 algorithms.
052         */
053        LZO1(LzoAlgorithm.LZO1),
054        /**
055         * lzo1a algorithms.
056         */
057        LZO1A(LzoAlgorithm.LZO1A),
058        /**
059         * lzo1b algorithms.
060         */
061        LZO1B(LzoAlgorithm.LZO1B),
062        LZO1B_SAFE(LzoAlgorithm.LZO1B, LzoConstraint.SAFETY),
063        /**
064         * lzo1c algorithms.
065         */
066        LZO1C(LzoAlgorithm.LZO1C),
067        LZO1C_SAFE(LzoAlgorithm.LZO1C, LzoConstraint.SAFETY),
068        LZO1C_ASM(LzoAlgorithm.LZO1C),
069        LZO1C_ASM_SAFE(LzoAlgorithm.LZO1C, LzoConstraint.SAFETY),
070        /**
071         * lzo1f algorithms.
072         */
073        LZO1F(LzoAlgorithm.LZO1F),
074        LZO1F_SAFE(LzoAlgorithm.LZO1F, LzoConstraint.SAFETY),
075        LZO1F_ASM_FAST(LzoAlgorithm.LZO1F),
076        LZO1F_ASM_FAST_SAFE(LzoAlgorithm.LZO1F, LzoConstraint.SAFETY),
077        /**
078         * lzo1x algorithms.
079         */
080        LZO1X(LzoAlgorithm.LZO1X),
081        LZO1X_SAFE(LzoAlgorithm.LZO1X, LzoConstraint.SAFETY),
082        LZO1X_ASM(LzoAlgorithm.LZO1X),
083        LZO1X_ASM_SAFE(LzoAlgorithm.LZO1X, LzoConstraint.SAFETY),
084        LZO1X_ASM_FAST(LzoAlgorithm.LZO1X, LzoConstraint.SPEED),
085        LZO1X_ASM_FAST_SAFE(LzoAlgorithm.LZO1X, LzoConstraint.SAFETY),
086        /**
087         * lzo1y algorithms.
088         */
089        LZO1Y(LzoAlgorithm.LZO1Y),
090        LZO1Y_SAFE(LzoAlgorithm.LZO1Y, LzoConstraint.SAFETY),
091        LZO1Y_ASM(LzoAlgorithm.LZO1Y),
092        LZO1Y_ASM_SAFE(LzoAlgorithm.LZO1Y, LzoConstraint.SAFETY),
093        LZO1Y_ASM_FAST(LzoAlgorithm.LZO1Y, LzoConstraint.SPEED),
094        LZO1Y_ASM_FAST_SAFE(LzoAlgorithm.LZO1Y, LzoConstraint.SAFETY),
095        /**
096         * lzo1z algorithms.
097         */
098        LZO1Z(LzoAlgorithm.LZO1Z),
099        LZO1Z_SAFE(LzoAlgorithm.LZO1Z, LzoConstraint.SAFETY),
100        /**
101         * lzo2a algorithms.
102         */
103        LZO2A(LzoAlgorithm.LZO2A),
104        LZO2A_SAFE(LzoAlgorithm.LZO2A, LzoConstraint.SAFETY);
105        private final LzoAlgorithm algorithm;
106        private final LzoConstraint constraint;
107
108        private CompressionStrategy(LzoAlgorithm algorithm, LzoConstraint constraint) {
109            this.algorithm = algorithm;
110            this.constraint = constraint;
111        }
112
113        private CompressionStrategy(LzoAlgorithm algorithm) {
114            this(algorithm, null);
115        }
116
117        public org.anarres.lzo.LzoDecompressor newDecompressor() {
118            return LzoLibrary.getInstance().newDecompressor(algorithm, constraint);
119        }
120    }; // CompressionStrategy
121    private final org.anarres.lzo.LzoDecompressor decompressor;
122    private byte[] outputBuffer;
123    private int outputBufferPos;
124    private final lzo_uintp outputBufferLen = new lzo_uintp();  // Also, end, since we base outputBuffer at 0.
125    // private boolean finished;   // We need this because BlockCompressorStream's state machine doesn't distinguish between no-data, and all-data-decoded.
126
127    /**
128     * Creates a new lzo decompressor.
129     *
130     * @param strategy lzo decompression algorithm
131     * @param outputBufferSize size of the output buffer
132     */
133    public LzoDecompressor(CompressionStrategy strategy, int outputBufferSize) {
134        this.decompressor = strategy.newDecompressor();
135        setOutputBufferSize(outputBufferSize);
136    }
137
138    public void setOutputBufferSize(int outputBufferSize) {
139        if (outputBuffer == null || outputBufferSize > outputBuffer.length)
140            outputBuffer = new byte[outputBufferSize];
141    }
142
143    /**
144     * Creates a new lzo decompressor.
145     */
146    public LzoDecompressor() {
147        this(CompressionStrategy.LZO1X, 64 * 1024);
148    }
149
150    private void logState(String when) {
151        LOG.info("\n");
152        LOG.info(when + " Output buffer pos=" + outputBufferPos + "; length=" + outputBufferLen);
153        // testInvariants();
154    }
155
156    @Override
157    public void setInput(byte[] b, int off, int len) {
158        if (b == null)
159            throw new NullPointerException();
160        if (off < 0 || len < 0 || off > b.length - len)
161            throw new ArrayIndexOutOfBoundsException("Illegal range in buffer: Buffer length=" + b.length + ", offset=" + off + ", length=" + len);
162        if (!needsInput())
163            throw new IllegalStateException("I don't need input: pos=" + outputBufferPos + "; len=" + outputBufferLen);
164        // logState("Before setInput");
165        // LOG.info("Decompressing " + len + " bytes at " + off);
166        outputBufferLen.value = outputBuffer.length;
167        // try {
168        try {
169            outputBufferPos = 0;
170            int code = decompressor.decompress(b, off, len, outputBuffer, outputBufferPos, outputBufferLen);
171            if (code != LzoTransformer.LZO_E_OK) {
172                logState("LZO error: " + code);
173                // FileUtils.writeByteArrayToFile(new File("bytes.out"), Arrays.copyOfRange(b, off, off + len));
174                throw new IllegalArgumentException(decompressor.toErrorString(code));
175            }
176        } catch (IndexOutOfBoundsException e) {
177            logState("IndexOutOfBoundsException: " + e);
178            // FileUtils.writeByteArrayToFile(new File("bytes.out"), Arrays.copyOfRange(b, off, off + len));
179            throw e;
180        }
181        // } catch (IOException _e) {
182        // throw new RuntimeException(_e);
183        // }
184        // LOG.info(len + " -> " + outputBufferLen);
185        // logState("After setInput");
186    }
187
188    @Override
189    public void setDictionary(byte[] b, int off, int len) {
190        // nop
191    }
192
193    @Override
194    public boolean needsInput() {
195        // logState("Before needsInput");
196        return outputBufferLen.value <= 0;
197    }
198
199    @Override
200    public boolean needsDictionary() {
201        return false;
202    }
203
204    @Override
205    public boolean finished() {
206        // logState("Before finished");
207// https://github.com/hortonworks/hadoop-lzo/commit/729bcc3d0d86fefb5a9b0a76fbcdbc20bc497db8
208//              if (outputBufferLen.value == 0 && outputBufferPos == 0)
209//                      return false;
210        return outputBufferLen.value <= 0;
211        // return false;
212    }
213
214    @Override
215    public int decompress(byte[] b, int off, int len)
216            throws IOException {
217        if (b == null)
218            throw new NullPointerException();
219        if (off < 0 || len < 0 || off > b.length - len)
220            throw new ArrayIndexOutOfBoundsException("Illegal range in buffer: Buffer length=" + b.length + ", offset=" + off + ", length=" + len);
221
222        // logState("Before decompress");
223        len = Math.min(len, outputBufferLen.value);
224        System.arraycopy(outputBuffer, outputBufferPos, b, off, len);
225        outputBufferPos += len;
226        outputBufferLen.value -= len;
227
228        return len;
229    }
230
231    @Override
232    public int getRemaining() {
233        return outputBufferLen.value;
234    }
235
236    @Override
237    public void reset() {
238        outputBufferPos = 0;
239        outputBufferLen.value = 0;
240    }
241
242    @Override
243    public void end() {
244    }
245}