/*
 * This file is part of Hadoop-Gpl-Compression.
 *
 * Hadoop-Gpl-Compression is free software: you can redistribute it
 * and/or modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 *
 * Hadoop-Gpl-Compression is distributed in the hope that it will be
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Hadoop-Gpl-Compression.  If not, see
 * <http://www.gnu.org/licenses/>.
 */
package com.hadoop.mapreduce;

import com.hadoop.compression.lzo.LzoIndex;
import com.hadoop.compression.lzo.LzopCodec;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * An {@link InputFormat} for lzop compressed text files. Files are broken into
 * lines. Either linefeed or carriage-return is used to signal end of line.
 * Keys are the position in the file, and values are the line of text.
 */
public class LzoTextInputFormat extends FileInputFormat<LongWritable, Text> {

  /**
   * Indexes read by {@link #listStatus(JobContext)}, keyed by the path of the
   * lzo file they belong to. Consulted by {@link #isSplitable} and
   * {@link #getSplits}.
   */
  private final Map<Path, LzoIndex> indexes = new HashMap<Path, LzoIndex>();

  /**
   * Lists the input files, drops every file whose name does not end with the
   * default lzop extension, and reads the index of each remaining file into
   * {@link #indexes}.
   *
   * @param job the job whose input paths are listed
   * @return the listed files with the non-lzo files removed
   * @throws IOException if listing the files, resolving a file system, or
   *         reading an index fails
   */
  @Override
  protected List<FileStatus> listStatus(JobContext job) throws IOException {
    List<FileStatus> files = super.listStatus(job);

    String fileExtension = new LzopCodec().getDefaultExtension();
    Configuration conf = job.getConfiguration();

    for (Iterator<FileStatus> iterator = files.iterator(); iterator.hasNext();) {
      Path file = iterator.next().getPath();

      if (!file.toString().endsWith(fileExtension)) {
        // get rid of non lzo files
        iterator.remove();
        continue;
      }

      // Resolve the file system only for files we actually keep, then read
      // the index so the split logic can consult it later.
      FileSystem fs = file.getFileSystem(conf);
      indexes.put(file, LzoIndex.readIndex(fs, file));
    }

    return files;
  }

  /**
   * Reports whether a file may be split: only files that have a non-empty
   * index in {@link #indexes} are splittable.
   *
   * <p>A file with no recorded index is reported as not splittable instead of
   * failing with a {@link NullPointerException}; {@link #getSplits} still
   * raises an {@link IOException} for such a file.
   *
   * @param context the job context (not used here)
   * @param filename the file to check
   * @return {@code true} if an index is known for the file and it is not empty
   */
  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    LzoIndex index = indexes.get(filename);
    return index != null && !index.isEmpty();
  }

  /**
   * Computes the splits of the parent class and then moves the start and end
   * of each split so that they align with the lzo blocks recorded in the
   * file's index.
   *
   * <p>Splits of files with an empty index are kept unchanged. A split for
   * which the index reports {@link LzoIndex#NOT_FOUND} for the aligned start
   * or end is left out of the result.
   *
   * @param job the job to compute splits for
   * @return the block-aligned splits
   * @throws IOException if the parent computation fails, if no index is known
   *         for a split's file, or if the file length cannot be looked up
   */
  @Override
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    List<InputSplit> splits = super.getSplits(job);
    Configuration conf = job.getConfiguration();

    List<InputSplit> result = new ArrayList<InputSplit>(splits.size());
    // A file usually contributes several splits; look its length up once
    // instead of asking the file system again for every split.
    Map<Path, Long> fileLengths = new HashMap<Path, Long>();

    for (InputSplit genericSplit : splits) {
      FileSplit fileSplit = (FileSplit) genericSplit;
      Path file = fileSplit.getPath();

      // load the index
      LzoIndex index = indexes.get(file);
      if (index == null) {
        throw new IOException("Index not found for " + file);
      }

      if (index.isEmpty()) {
        // empty index, keep as is
        result.add(fileSplit);
        continue;
      }

      Long fileLength = fileLengths.get(file);
      if (fileLength == null) {
        FileSystem fs = file.getFileSystem(conf);
        fileLength = Long.valueOf(fs.getFileStatus(file).getLen());
        fileLengths.put(file, fileLength);
      }

      long start = fileSplit.getStart();
      long end = start + fileSplit.getLength();

      // find new start/end of the split that align with the lzo blocks
      long lzoStart = index.alignSliceStartToIndex(start, end);
      long lzoEnd = index.alignSliceEndToIndex(end, fileLength.longValue());

      // NOTE(review): NOT_FOUND presumably means no lzo block boundary falls
      // inside this slice, so the slice carries no data of its own - confirm
      // against LzoIndex.
      if (lzoStart != LzoIndex.NOT_FOUND && lzoEnd != LzoIndex.NOT_FOUND) {
        result.add(new FileSplit(file, lzoStart, lzoEnd - lzoStart,
            fileSplit.getLocations()));
      }
    }

    return result;
  }

  /**
   * Creates the record reader for a split.
   *
   * @param split the split to read (not used here)
   * @param taskAttempt the task attempt context (not used here)
   * @return a new {@link LzoLineRecordReader}
   */
  @Override
  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
      TaskAttemptContext taskAttempt) throws IOException, InterruptedException {

    return new LzoLineRecordReader();
  }
}