001/*
002 * This file is part of Hadoop-Gpl-Compression.
003 *
004 * Hadoop-Gpl-Compression is free software: you can redistribute it
005 * and/or modify it under the terms of the GNU General Public License
006 * as published by the Free Software Foundation, either version 3 of
007 * the License, or (at your option) any later version.
008 *
009 * Hadoop-Gpl-Compression is distributed in the hope that it will be
010 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
011 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
012 * GNU General Public License for more details.
013 *
014 * You should have received a copy of the GNU General Public License
015 * along with Hadoop-Gpl-Compression.  If not, see
016 * <http://www.gnu.org/licenses/>.
017 */
018package com.hadoop.compression.lzo;
019
020import java.io.EOFException;
021import java.io.IOException;
022import java.util.Arrays;
023import org.anarres.lzo.LzopInputStream;
024import org.apache.hadoop.conf.Configurable;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FSDataInputStream;
027import org.apache.hadoop.fs.FSDataOutputStream;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.io.compress.CompressionCodec;
031import org.apache.hadoop.io.compress.CompressionCodecFactory;
032
033/**
034 * Represents the lzo index.
035 */
036public class LzoIndex {
037
038    public static final String LZO_INDEX_SUFFIX = ".index";
039    public static final String LZO_TMP_INDEX_SUFFIX = ".index.tmp";
040    public static final long NOT_FOUND = -1;
041
042    private long[] blockPositions_;
043
044    /**
045     * Create an empty index, typically indicating no index file exists.
046     */
047    public LzoIndex() {
048    }
049
050    /**
051     * Create an index specifying the number of LZO blocks in the file.
052     * @param blocks The number of blocks in the LZO file the index is representing.
053     */
054    public LzoIndex(int blocks) {
055        blockPositions_ = new long[blocks];
056    }
057
058    /**
059     * Set the position for the block.
060     *
061     * @param blockNumber Block to set pos for.
062     * @param pos Position.
063     */
064    public void set(int blockNumber, long pos) {
065        blockPositions_[blockNumber] = pos;
066    }
067
068    /**
069     * Get the total number of blocks in the index file.
070     */
071    public int getNumberOfBlocks() {
072        return blockPositions_.length;
073    }
074
075    /**
076     * Get the block offset for a given block.
077     * @param block
078     * @return the byte offset into the file where this block starts.  It is the developer's
079     * responsibility to call getNumberOfBlocks() to know appropriate bounds on the parameter.
080     * The argument block should satisfy 0 <= block < getNumberOfBlocks().
081     */
082    public long getPosition(int block) {
083        return blockPositions_[block];
084    }
085
086    /**
087     * Find the next lzo block start from the given position.
088     *
089     * @param pos The position to start looking from.
090     * @return Either the start position of the block or -1 if it couldn't be found.
091     */
092    public long findNextPosition(long pos) {
093        int block = Arrays.binarySearch(blockPositions_, pos);
094
095        if (block >= 0) {
096            // direct hit on a block start position
097            return blockPositions_[block];
098        } else {
099            block = Math.abs(block) - 1;
100            if (block > blockPositions_.length - 1) {
101                return NOT_FOUND;
102            }
103            return blockPositions_[block];
104        }
105    }
106
107    /**
108     * Return true if the index has no blocks set.
109     *
110     * @return true if the index has no blocks set.
111     */
112    public boolean isEmpty() {
113        return blockPositions_ == null || blockPositions_.length == 0;
114    }
115
116    /**
117     * Nudge a given file slice start to the nearest LZO block start no earlier than
118     * the current slice start.
119     *
120     * @param start The current slice start
121     * @param end The current slice end
122     * @return The smallest block offset in the index between [start, end), or
123     *         NOT_FOUND if there is none such.
124     */
125    public long alignSliceStartToIndex(long start, long end) {
126        if (start != 0) {
127      // find the next block position from
128            // the start of the split
129            long newStart = findNextPosition(start);
130            if (newStart == NOT_FOUND || newStart >= end) {
131                return NOT_FOUND;
132            }
133            start = newStart;
134        }
135        return start;
136    }
137
138    /**
139     * Nudge a given file slice end to the nearest LZO block end no earlier than
140     * the current slice end.
141     *
142     * @param end The current slice end
143     * @param fileSize The size of the file, i.e. the max end position.
144     * @return The smallest block offset in the index between [end, fileSize].
145     */
146    public long alignSliceEndToIndex(long end, long fileSize) {
147        long newEnd = findNextPosition(end);
148        if (newEnd != NOT_FOUND) {
149            end = newEnd;
150        } else {
151      // didn't find the next position
152            // we have hit the end of the file
153            end = fileSize;
154        }
155        return end;
156    }
157
158    /**
159     * Read the index of the lzo file.
160
161     * @param fs The index file is on this file system.
162     * @param lzoFile the file whose index we are reading -- NOT the index file itself.  That is,
163     * pass in filename.lzo, not filename.lzo.index, for this parameter.
164     * @throws IOException
165     */
166    public static LzoIndex readIndex(FileSystem fs, Path lzoFile) throws IOException {
167        FSDataInputStream indexIn = null;
168        try {
169            Path indexFile = lzoFile.suffix(LZO_INDEX_SUFFIX);
170            if (!fs.exists(indexFile)) {
171                // return empty index, fall back to the unsplittable mode
172                return new LzoIndex();
173            }
174
175            long indexLen = fs.getFileStatus(indexFile).getLen();
176            int blocks = (int) (indexLen / 8);
177            LzoIndex index = new LzoIndex(blocks);
178            indexIn = fs.open(indexFile);
179            for (int i = 0; i < blocks; i++) {
180                index.set(i, indexIn.readLong());
181            }
182            return index;
183        } finally {
184            if (indexIn != null) {
185                indexIn.close();
186            }
187        }
188    }
189
190    /**
191     * Index an lzo file to allow the input format to split them into separate map
192     * jobs.
193     *
194     * @param fs File system that contains the file.
195     * @param lzoFile the lzo file to index.  For filename.lzo, the created index file will be
196     * filename.lzo.index.
197     * @throws IOException
198     */
199    public static void createIndex(FileSystem fs, Path lzoFile)
200            throws IOException {
201
202        Configuration conf = fs.getConf();
203        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
204        CompressionCodec codec = factory.getCodec(lzoFile);
205        if (null == codec) {
206            throw new IOException("Could not find codec for file " + lzoFile
207                    + " - you may need to add the LZO codec to your io.compression.codecs "
208                    + "configuration in core-site.xml");
209        }
210        ((Configurable) codec).setConf(conf);
211
212        FSDataInputStream is = null;
213        FSDataOutputStream os = null;
214        Path outputFile = lzoFile.suffix(LZO_INDEX_SUFFIX);
215        Path tmpOutputFile = lzoFile.suffix(LZO_TMP_INDEX_SUFFIX);
216
217    // Track whether an exception was thrown or not, so we know to either
218        // delete the tmp index file on failure, or rename it to the new index file on success.
219        boolean indexingSucceeded = false;
220        try {
221            is = fs.open(lzoFile);
222            os = fs.create(tmpOutputFile);
223            // Solely for reading the header
224            LzopInputStream lzis = new LzopInputStream(is);
225            int numCompressedChecksums = lzis.getCompressedChecksumCount();
226            int numDecompressedChecksums = lzis.getUncompressedChecksumCount();
227
228            while (true) {
229                // read and ignore, we just want to get to the next int
230                int uncompressedBlockSize = is.readInt();
231                if (uncompressedBlockSize == 0) {
232                    break;
233                } else if (uncompressedBlockSize < 0) {
234                    throw new EOFException();
235                }
236
237                int compressedBlockSize = is.readInt();
238                if (compressedBlockSize <= 0) {
239                    throw new IOException("Could not read compressed block size");
240                }
241
242                // See LzopInputStream.getCompressedData
243                boolean isUncompressedBlock = (uncompressedBlockSize == compressedBlockSize);
244                int numChecksumsToSkip = isUncompressedBlock
245                        ? numDecompressedChecksums : numDecompressedChecksums + numCompressedChecksums;
246                long pos = is.getPos();
247                // write the pos of the block start
248                os.writeLong(pos - 8);
249                // seek to the start of the next block, skip any checksums
250                is.seek(pos + compressedBlockSize + (4 * numChecksumsToSkip));
251            }
252            // If we're here, indexing was successful.
253            indexingSucceeded = true;
254        } finally {
255            // Close any open streams.
256            if (is != null) {
257                is.close();
258            }
259
260            if (os != null) {
261                os.close();
262            }
263
264            if (!indexingSucceeded) {
265                // If indexing didn't succeed (i.e. an exception was thrown), clean up after ourselves.
266                fs.delete(tmpOutputFile, false);
267            } else {
268                // Otherwise, rename filename.lzo.index.tmp to filename.lzo.index.
269                fs.rename(tmpOutputFile, outputFile);
270            }
271        }
272    }
273}