/*
 * This file is part of lzo-java, an implementation of LZO in Java.
 * https://github.com/shevek/lzo-java
 *
 * The Java portion of this library is:
 * Copyright (C) 2011 Shevek <shevek@anarres.org>
 * All Rights Reserved.
 *
 * This file is based on a file from hadoop-gpl-compression.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 * See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with the LZO library; see the file COPYING.
 * If not, see <http://www.gnu.org/licenses/> or write to the
 * Free Software Foundation, Inc., 51 Franklin Street, Fifth
 * Floor, Boston, MA 02110-1301, USA.
*/
package org.anarres.lzo.hadoop.codec;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;

/**
 * A {@link org.apache.hadoop.io.compress.CompressionCodec} for a streaming
 * <b>lzo</b> compression/decompression pair.
 * http://www.oberhumer.com/opensource/lzo/
 *
 * <p>All tunables (compressor strategy, decompressor strategy, compression
 * level and buffer size) are read from the {@link Configuration} returned by
 * {@link #getConf()}, using the {@code LZO_*_KEY} property names below.
 */
public class LzoCodec extends Configured implements CompressionCodec {

    private static final Log LOG = LogFactory.getLog(LzoCodec.class.getName());

    /** Property holding the name of the {@link LzoCompressor.CompressionStrategy} to use. */
    public static final String LZO_COMPRESSOR_KEY = "io.compression.codec.lzo.compressor";

    /** Property holding the name of the {@link LzoDecompressor.CompressionStrategy} to use. */
    public static final String LZO_DECOMPRESSOR_KEY = "io.compression.codec.lzo.decompressor";

    /** Property holding the integer compression level. */
    public static final String LZO_COMPRESSION_LEVEL_KEY = "io.compression.codec.lzo.compression.level";

    /** Property holding the integer buffer size. */
    public static final String LZO_BUFFER_SIZE_KEY = "io.compression.codec.lzo.buffersize";

    /** Buffer size reported by {@link #getBufferSize(Configuration)} when the property is unset. */
    public static final int DEFAULT_LZO_BUFFER_SIZE = 256 * 1024;

    /** 64 * 1024 * 1024; not referenced anywhere inside this class. */
    public static final int MAX_BLOCK_SIZE = 64 * 1024 * 1024;

    /** Level reported by {@link #getCompressionLevel(Configuration)} when the property is unset. */
    public static final int UNDEFINED_COMPRESSION_LEVEL = -999; // Constant from LzoCompressor.c

    // ---------------------------------------------------------------
    // Configuration readers
    // ---------------------------------------------------------------

    /**
     * Reads the compressor strategy from {@link #LZO_COMPRESSOR_KEY}.
     *
     * @param conf the configuration to read; must not be null.
     * @return the strategy named by the property, or {@code LZO1X_1} if the property is unset.
     */
    static LzoCompressor.CompressionStrategy getCompressionStrategy(Configuration conf) {
        assert conf != null : "Configuration cannot be null!";
        String fallbackName = LzoCompressor.CompressionStrategy.LZO1X_1.name();
        String strategyName = conf.get(LZO_COMPRESSOR_KEY, fallbackName);
        return LzoCompressor.CompressionStrategy.valueOf(strategyName);
    }

    /**
     * Reads the decompressor strategy from {@link #LZO_DECOMPRESSOR_KEY}.
     *
     * @param conf the configuration to read; must not be null.
     * @return the strategy named by the property, or {@code LZO1X} if the property is unset.
     */
    static LzoDecompressor.CompressionStrategy getDecompressionStrategy(Configuration conf) {
        assert conf != null : "Configuration cannot be null!";
        String fallbackName = LzoDecompressor.CompressionStrategy.LZO1X.name();
        String strategyName = conf.get(LZO_DECOMPRESSOR_KEY, fallbackName);
        return LzoDecompressor.CompressionStrategy.valueOf(strategyName);
    }

    /**
     * Reads the compression level from {@link #LZO_COMPRESSION_LEVEL_KEY}.
     *
     * @param conf the configuration to read; must not be null.
     * @return the configured level, or {@link #UNDEFINED_COMPRESSION_LEVEL} if the property is unset.
     */
    static int getCompressionLevel(Configuration conf) {
        assert conf != null : "Configuration cannot be null!";
        int level = conf.getInt(LZO_COMPRESSION_LEVEL_KEY, UNDEFINED_COMPRESSION_LEVEL);
        return level;
    }

    /**
     * Reads the buffer size from {@link #LZO_BUFFER_SIZE_KEY}.
     *
     * @param conf the configuration to read; must not be null.
     * @return the configured size, or {@link #DEFAULT_LZO_BUFFER_SIZE} if the property is unset.
     */
    static int getBufferSize(Configuration conf) {
        assert conf != null : "Configuration cannot be null!";
        int size = conf.getInt(LZO_BUFFER_SIZE_KEY, DEFAULT_LZO_BUFFER_SIZE);
        return size;
    }

    // ---------------------------------------------------------------
    // Configuration writers
    // ---------------------------------------------------------------

    /**
     * Stores the name of {@code strategy} under {@link #LZO_COMPRESSOR_KEY}.
     *
     * @param conf the configuration to modify; must not be null.
     * @param strategy the compressor strategy to record.
     */
    public static void setCompressionStrategy(Configuration conf,
            LzoCompressor.CompressionStrategy strategy) {
        assert conf != null : "Configuration cannot be null!";
        String strategyName = strategy.name();
        conf.set(LZO_COMPRESSOR_KEY, strategyName);
    }

    /**
     * Stores the name of {@code strategy} under {@link #LZO_DECOMPRESSOR_KEY}.
     *
     * @param conf the configuration to modify; must not be null.
     * @param strategy the decompressor strategy to record.
     */
    public static void setDecompressionStrategy(Configuration conf,
            LzoDecompressor.CompressionStrategy strategy) {
        assert conf != null : "Configuration cannot be null!";
        String strategyName = strategy.name();
        conf.set(LZO_DECOMPRESSOR_KEY, strategyName);
    }

    /**
     * Stores {@code compressionLevel} under {@link #LZO_COMPRESSION_LEVEL_KEY}.
     *
     * @param conf the configuration to modify; must not be null.
     * @param compressionLevel the level to record.
     */
    public static void setCompressionLevel(Configuration conf, int compressionLevel) {
        assert conf != null : "Configuration cannot be null!";
        conf.setInt(LZO_COMPRESSION_LEVEL_KEY, compressionLevel);
    }

    /**
     * Stores {@code bufferSize} under {@link #LZO_BUFFER_SIZE_KEY}.
     *
     * @param conf the configuration to modify; must not be null.
     * @param bufferSize the buffer size to record.
     */
    public static void setBufferSize(Configuration conf, int bufferSize) {
        assert conf != null : "Configuration cannot be null!";
        conf.setInt(LZO_BUFFER_SIZE_KEY, bufferSize);
    }

    // ---------------------------------------------------------------
    // Compression side
    // ---------------------------------------------------------------

    /**
     * Wraps {@code out} in a compressing stream backed by a freshly created compressor.
     *
     * @param out the stream that receives the compressed data.
     * @return the compressing stream.
     * @throws IOException declared by the codec interface.
     */
    @Override
    public CompressionOutputStream createOutputStream(OutputStream out)
            throws IOException {
        Compressor freshCompressor = createCompressor();
        return createOutputStream(out, freshCompressor);
    }

    /**
     * Wraps {@code out} in a {@link BlockCompressorStream} that uses the given compressor,
     * the configured buffer size, and the expansion allowance for the configured strategy.
     *
     * @param out the stream that receives the compressed data.
     * @param compressor the compressor the stream should drive.
     * @return the compressing stream.
     * @throws IOException declared by the codec interface.
     */
    @Override
    public CompressionOutputStream createOutputStream(OutputStream out,
            Compressor compressor) throws IOException {
        // Create the lzo output-stream
        Configuration settings = getConf();
        LzoCompressor.CompressionStrategy configuredStrategy = getCompressionStrategy(settings);
        int blockBufferSize = getBufferSize(settings);
        int expansionAllowance = worstCaseOverhead(configuredStrategy, blockBufferSize);
        return new BlockCompressorStream(out, compressor, blockBufferSize, expansionAllowance);
    }

    /**
     * Computes the expansion allowance handed to the block output stream.
     *
     * <p>From <b>http://www.oberhumer.com/opensource/lzo/lzofaq.php</b>:
     *
     * <pre>
     * How much can my data expand during compression ?
     * ================================================
     * LZO will expand incompressible data by a little amount.
     * I still haven't computed the exact values, but I suggest using
     * these formulas for a worst-case expansion calculation:
     *
     * Algorithm LZO1, LZO1A, LZO1B, LZO1C, LZO1F, LZO1X, LZO1Y, LZO1Z:
     * ----------------------------------------------------------------
     * output_block_size = input_block_size + (input_block_size / 16) + 64 + 3
     *
     * This is about 106% for a large block size.
     *
     * Algorithm LZO2A:
     * ----------------
     * output_block_size = input_block_size + (input_block_size / 8) + 128 + 3
     * </pre>
     *
     * <p>Any strategy whose name contains {@code "LZO1"} gets the first formula;
     * every other strategy gets the LZO2A formula.
     *
     * @param strategy the compressor strategy in use.
     * @param bufferSize the configured buffer size.
     * @return the allowance computed from {@code bufferSize}.
     */
    private static int worstCaseOverhead(LzoCompressor.CompressionStrategy strategy, int bufferSize) {
        boolean lzo1Family = strategy.name().contains("LZO1");
        if (lzo1Family) {
            return (bufferSize >> 4) + 64 + 3;
        }
        return (bufferSize >> 3) + 128 + 3;
    }

    /**
     * @return the compressor class this codec creates, {@link LzoCompressor}.
     */
    @Override
    public Class<? extends Compressor> getCompressorType() {
        return LzoCompressor.class;
    }

    /**
     * Builds a new {@link LzoCompressor} from the configured strategy and buffer size.
     *
     * @return the new compressor.
     */
    @Override
    public Compressor createCompressor() {
        Configuration settings = getConf();
        // Arguments are evaluated left to right: strategy first, then buffer size.
        return new LzoCompressor(getCompressionStrategy(settings), getBufferSize(settings));
    }

    // ---------------------------------------------------------------
    // Decompression side
    // ---------------------------------------------------------------

    /**
     * Wraps {@code in} in a decompressing stream backed by a freshly created decompressor.
     *
     * @param in the stream supplying compressed data.
     * @return the decompressing stream.
     * @throws IOException declared by the codec interface.
     */
    @Override
    public CompressionInputStream createInputStream(InputStream in)
            throws IOException {
        Decompressor freshDecompressor = createDecompressor();
        return createInputStream(in, freshDecompressor);
    }

    /**
     * Wraps {@code in} in a {@link BlockDecompressorStream} that uses the given decompressor
     * and the configured buffer size.
     *
     * @param in the stream supplying compressed data.
     * @param decompressor the decompressor the stream should drive.
     * @return the decompressing stream.
     * @throws IOException declared by the codec interface.
     */
    @Override
    public CompressionInputStream createInputStream(InputStream in,
            Decompressor decompressor)
            throws IOException {
        int blockBufferSize = getBufferSize(getConf());
        return new BlockDecompressorStream(in, decompressor, blockBufferSize);
    }

    /**
     * @return the decompressor class this codec creates, {@link LzoDecompressor}.
     */
    @Override
    public Class<? extends Decompressor> getDecompressorType() {
        return LzoDecompressor.class;
    }

    /**
     * Builds a new {@link LzoDecompressor} from the configured strategy and buffer size.
     *
     * @return the new decompressor.
     */
    @Override
    public Decompressor createDecompressor() {
        Configuration settings = getConf();
        // Arguments are evaluated left to right: strategy first, then buffer size.
        return new LzoDecompressor(getDecompressionStrategy(settings), getBufferSize(settings));
    }

    /**
     * Get the default filename extension for this kind of compression.
     * @return the extension including the '.'
     */
    @Override
    public String getDefaultExtension() {
        return ".lzo_deflate";
    }
}