001/* 002 * This file is part of Hadoop-Gpl-Compression. 003 * 004 * Hadoop-Gpl-Compression is free software: you can redistribute it 005 * and/or modify it under the terms of the GNU General Public License 006 * as published by the Free Software Foundation, either version 3 of 007 * the License, or (at your option) any later version. 008 * 009 * Hadoop-Gpl-Compression is distributed in the hope that it will be 010 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty 011 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 012 * GNU General Public License for more details. 013 * 014 * You should have received a copy of the GNU General Public License 015 * along with Hadoop-Gpl-Compression. If not, see 016 * <http://www.gnu.org/licenses/>. 017 */ 018package com.hadoop.mapred; 019 020import com.hadoop.compression.lzo.LzoIndex; 021import com.hadoop.compression.lzo.LzopCodec; 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.HashMap; 026import java.util.Iterator; 027import java.util.List; 028import java.util.Map; 029import org.apache.hadoop.fs.FileStatus; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.io.LongWritable; 033import org.apache.hadoop.io.Text; 034import org.apache.hadoop.mapred.FileInputFormat; 035import org.apache.hadoop.mapred.FileSplit; 036import org.apache.hadoop.mapred.InputSplit; 037import org.apache.hadoop.mapred.JobConf; 038import org.apache.hadoop.mapred.RecordReader; 039import org.apache.hadoop.mapred.Reporter; 040 041/** 042 * This class conforms to the old (org.apache.hadoop.mapred.*) hadoop API style 043 * which is deprecated but still required in places. Streaming, for example, 044 * does a check that the given input format is a descendant of 045 * org.apache.hadoop.mapred.InputFormat, which any InputFormat-derived class 046 * from the new API fails. In order for streaming to work, you must use 047 * com.hadoop.mapred.DeprecatedLzoTextInputFormat, not 048 * com.hadoop.mapreduce.LzoTextInputFormat. The classes attempt to be alike in 049 * every other respect. 050 */ 051@SuppressWarnings("deprecation") 052public class DeprecatedLzoTextInputFormat extends FileInputFormat<LongWritable, Text> { 053 054 public static final String LZO_INDEX_SUFFIX = ".index"; 055 private final Map<Path, LzoIndex> indexes = new HashMap<Path, LzoIndex>(); 056 057 @Override 058 protected FileStatus[] listStatus(JobConf conf) throws IOException { 059 List<FileStatus> files = new ArrayList<FileStatus>(Arrays.asList(super.listStatus(conf))); 060 061 String fileExtension = new LzopCodec().getDefaultExtension(); 062 063 Iterator<FileStatus> it = files.iterator(); 064 while (it.hasNext()) { 065 FileStatus fileStatus = it.next(); 066 Path file = fileStatus.getPath(); 067 068 if (!file.toString().endsWith(fileExtension)) { 069 // Get rid of non-LZO files. 070 it.remove(); 071 } else { 072 FileSystem fs = file.getFileSystem(conf); 073 LzoIndex index = LzoIndex.readIndex(fs, file); 074 indexes.put(file, index); 075 } 076 } 077 078 return files.toArray(new FileStatus[]{}); 079 } 080 081 @Override 082 protected boolean isSplitable(FileSystem fs, Path filename) { 083 LzoIndex index = indexes.get(filename); 084 return !index.isEmpty(); 085 } 086 087 @Override 088 public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException { 089 FileSplit[] splits = (FileSplit[]) super.getSplits(conf, numSplits); 090 // Find new starts/ends of the filesplit that align with the LZO blocks. 091 092 List<FileSplit> result = new ArrayList<FileSplit>(); 093 094 for (FileSplit fileSplit : splits) { 095 Path file = fileSplit.getPath(); 096 FileSystem fs = file.getFileSystem(conf); 097 LzoIndex index = indexes.get(file); 098 if (index == null) { 099 throw new IOException("Index not found for " + file); 100 } 101 if (index.isEmpty()) { 102 // Empty index, keep it as is. 103 result.add(fileSplit); 104 continue; 105 } 106 107 long start = fileSplit.getStart(); 108 long end = start + fileSplit.getLength(); 109 110 long lzoStart = index.alignSliceStartToIndex(start, end); 111 long lzoEnd = index.alignSliceEndToIndex(end, fs.getFileStatus(file).getLen()); 112 113 if (lzoStart != LzoIndex.NOT_FOUND && lzoEnd != LzoIndex.NOT_FOUND) { 114 result.add(new FileSplit(file, lzoStart, lzoEnd - lzoStart, fileSplit.getLocations())); 115 } 116 } 117 118 return result.toArray(new FileSplit[result.size()]); 119 } 120 121 @Override 122 public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, 123 JobConf conf, Reporter reporter) throws IOException { 124 reporter.setStatus(split.toString()); 125 return new DeprecatedLzoLineRecordReader(conf, (FileSplit) split); 126 } 127 128}