001/*
002 * This file is part of lzo-java, an implementation of LZO in Java.
003 * https://github.com/shevek/lzo-java
004 *
005 * The Java portion of this library is:
006 * Copyright (C) 2011 Shevek <shevek@anarres.org>
007 * All Rights Reserved.
008 *
009 * This file is based on a file from hadoop-gpl-compression.
010 *
011 * This library is free software; you can redistribute it and/or
012 * modify it under the terms of the GNU General Public License 
013 * as published by the Free Software Foundation; either version 
014 * 2 of the License, or (at your option) any later version.
015 *
016 * This library is distributed in the hope that it will be useful, 
017 * but WITHOUT ANY WARRANTY; without even the implied warranty
018 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
019 * See the GNU General Public License for more details.
020 *
021 * You should have received a copy of the GNU General Public
022 * License along with the LZO library; see the file COPYING.
023 * If not, see <http://www.gnu.org/licenses/> or write to the
024 * Free Software Foundation, Inc., 51 Franklin Street, Fifth
025 * Floor, Boston, MA 02110-1301, USA.
026 */
027package org.anarres.lzo.hadoop.codec;
028
029import java.io.IOException;
030import java.io.InputStream;
031import java.io.OutputStream;
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.conf.Configured;
036import org.apache.hadoop.io.compress.BlockCompressorStream;
037import org.apache.hadoop.io.compress.BlockDecompressorStream;
038import org.apache.hadoop.io.compress.CompressionCodec;
039import org.apache.hadoop.io.compress.CompressionInputStream;
040import org.apache.hadoop.io.compress.CompressionOutputStream;
041import org.apache.hadoop.io.compress.Compressor;
042import org.apache.hadoop.io.compress.Decompressor;
043
044/**
045 * A {@link org.apache.hadoop.io.compress.CompressionCodec} for a streaming
046 * <b>lzo</b> compression/decompression pair.
047 * http://www.oberhumer.com/opensource/lzo/
048 * 
049 */
050public class LzoCodec extends Configured implements CompressionCodec {
051
052    private static final Log LOG = LogFactory.getLog(LzoCodec.class.getName());
053    public static final String LZO_COMPRESSOR_KEY = "io.compression.codec.lzo.compressor";
054    public static final String LZO_DECOMPRESSOR_KEY = "io.compression.codec.lzo.decompressor";
055    public static final String LZO_COMPRESSION_LEVEL_KEY = "io.compression.codec.lzo.compression.level";
056    public static final String LZO_BUFFER_SIZE_KEY = "io.compression.codec.lzo.buffersize";
057    public static final int DEFAULT_LZO_BUFFER_SIZE = 256 * 1024;
058    public static final int MAX_BLOCK_SIZE = 64 * 1024 * 1024;
059    public static final int UNDEFINED_COMPRESSION_LEVEL = -999;  // Constant from LzoCompressor.c
060
061    static LzoCompressor.CompressionStrategy getCompressionStrategy(Configuration conf) {
062        assert conf != null : "Configuration cannot be null!";
063        return LzoCompressor.CompressionStrategy.valueOf(
064                conf.get(LZO_COMPRESSOR_KEY,
065                        LzoCompressor.CompressionStrategy.LZO1X_1.name()));
066    }
067
068    static LzoDecompressor.CompressionStrategy getDecompressionStrategy(Configuration conf) {
069        assert conf != null : "Configuration cannot be null!";
070        return LzoDecompressor.CompressionStrategy.valueOf(
071                conf.get(LZO_DECOMPRESSOR_KEY,
072                        LzoDecompressor.CompressionStrategy.LZO1X.name()));
073    }
074
075    static int getCompressionLevel(Configuration conf) {
076        assert conf != null : "Configuration cannot be null!";
077        return conf.getInt(LZO_COMPRESSION_LEVEL_KEY, UNDEFINED_COMPRESSION_LEVEL);
078    }
079
080    static int getBufferSize(Configuration conf) {
081        assert conf != null : "Configuration cannot be null!";
082        return conf.getInt(LZO_BUFFER_SIZE_KEY, DEFAULT_LZO_BUFFER_SIZE);
083    }
084
085    public static void setCompressionStrategy(Configuration conf,
086            LzoCompressor.CompressionStrategy strategy) {
087        assert conf != null : "Configuration cannot be null!";
088        conf.set(LZO_COMPRESSOR_KEY, strategy.name());
089    }
090
091    public static void setDecompressionStrategy(Configuration conf,
092            LzoDecompressor.CompressionStrategy strategy) {
093        assert conf != null : "Configuration cannot be null!";
094        conf.set(LZO_DECOMPRESSOR_KEY, strategy.name());
095    }
096
097    public static void setCompressionLevel(Configuration conf, int compressionLevel) {
098        assert conf != null : "Configuration cannot be null!";
099        conf.setInt(LZO_COMPRESSION_LEVEL_KEY, compressionLevel);
100    }
101
102    public static void setBufferSize(Configuration conf, int bufferSize) {
103        assert conf != null : "Configuration cannot be null!";
104        conf.setInt(LZO_BUFFER_SIZE_KEY, bufferSize);
105    }
106
107    @Override
108    public CompressionOutputStream createOutputStream(OutputStream out)
109            throws IOException {
110        return createOutputStream(out, createCompressor());
111    }
112
113    @Override
114    public CompressionOutputStream createOutputStream(OutputStream out,
115            Compressor compressor) throws IOException {
116        /**
117         * <b>http://www.oberhumer.com/opensource/lzo/lzofaq.php</b>
118         *
119         * How much can my data expand during compression ?
120         * ================================================
121         * LZO will expand incompressible data by a little amount.
122         * I still haven't computed the exact values, but I suggest using
123         * these formulas for a worst-case expansion calculation:
124         *
125         * Algorithm LZO1, LZO1A, LZO1B, LZO1C, LZO1F, LZO1X, LZO1Y, LZO1Z:
126         * ----------------------------------------------------------------
127         * output_block_size = input_block_size + (input_block_size / 16) + 64 + 3
128         *
129         * This is about 106% for a large block size.
130         *
131         * Algorithm LZO2A:
132         * ----------------
133         * output_block_size = input_block_size + (input_block_size / 8) + 128 + 3
134         */
135        // Create the lzo output-stream
136        Configuration conf = getConf();
137        LzoCompressor.CompressionStrategy strategy = getCompressionStrategy(conf);
138        int bufferSize = getBufferSize(conf);
139        int compressionOverhead = strategy.name().contains("LZO1")
140                ? (bufferSize >> 4) + 64 + 3
141                : (bufferSize >> 3) + 128 + 3;
142
143        return new BlockCompressorStream(out, compressor, bufferSize,
144                compressionOverhead);
145    }
146
147    @Override
148    public Class<? extends Compressor> getCompressorType() {
149        return LzoCompressor.class;
150    }
151
152    @Override
153    public Compressor createCompressor() {
154        Configuration conf = getConf();
155        LzoCompressor.CompressionStrategy strategy = getCompressionStrategy(conf);
156        int bufferSize = getBufferSize(conf);
157        return new LzoCompressor(strategy, bufferSize);
158    }
159
160    @Override
161    public CompressionInputStream createInputStream(InputStream in)
162            throws IOException {
163        return createInputStream(in, createDecompressor());
164    }
165
166    @Override
167    public CompressionInputStream createInputStream(InputStream in,
168            Decompressor decompressor)
169            throws IOException {
170        Configuration conf = getConf();
171        return new BlockDecompressorStream(in, decompressor, getBufferSize(conf));
172    }
173
174    @Override
175    public Class<? extends Decompressor> getDecompressorType() {
176        return LzoDecompressor.class;
177    }
178
179    @Override
180    public Decompressor createDecompressor() {
181        Configuration conf = getConf();
182        LzoDecompressor.CompressionStrategy strategy = getDecompressionStrategy(conf);
183        int bufferSize = getBufferSize(conf);
184        return new LzoDecompressor(strategy, bufferSize);
185    }
186
187    /**
188     * Get the default filename extension for this kind of compression.
189     * @return the extension including the '.'
190     */
191    @Override
192    public String getDefaultExtension() {
193        return ".lzo_deflate";
194    }
195}