/*
 * This file is part of Hadoop-Gpl-Compression.
 *
 * Hadoop-Gpl-Compression is free software: you can redistribute it
 * and/or modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 *
 * Hadoop-Gpl-Compression is distributed in the hope that it will be
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Hadoop-Gpl-Compression. If not, see
 * <http://www.gnu.org/licenses/>.
 */
package com.hadoop.mapreduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

/**
 * Reads lines from an LZO-compressed text file.
 *
 * <p>Each key is the position of the underlying file stream
 * ({@code fileIn.getPos()}) recorded before the line was read; each value is
 * the line itself. The same {@link LongWritable} and {@link Text} instances
 * are reused for every record.
 */
public class LzoLineRecordReader extends RecordReader<LongWritable, Text> {

  /** Position in the underlying file at which this reader starts producing records. */
  private long start;
  /** Position of the underlying file stream after the most recent read. */
  private long pos;
  /** Position in the underlying file at which the split ends. */
  private long end;
  /** Line reader layered on top of the codec's decompressing stream. */
  private LineReader in;
  /** Raw stream over the file; used for seeking and for position tracking. */
  private FSDataInputStream fileIn;

  private final LongWritable key = new LongWritable();
  private final Text value = new Text();

  /**
   * Get the progress within the split.
   *
   * @return {@code 0.0f} for an empty split, otherwise the fraction of the
   *         split consumed so far, capped at {@code 1.0f}
   */
  @Override
  public float getProgress() {
    if (start == end) {
      return 0.0f;
    } else {
      // pos may run past end (see nextKeyValue), hence the cap at 1.0f.
      return Math.min(1.0f, (pos - start) / (float) (end - start));
    }
  }

  /**
   * Returns the position of the underlying file stream after the most recent
   * read.
   *
   * @return the current position
   * @throws IOException declared for callers; not thrown by this implementation
   */
  public synchronized long getPos() throws IOException {
    return pos;
  }

  /**
   * Releases the streams opened by {@link #initialize}.
   *
   * @throws IOException if closing a stream fails
   */
  @Override
  public synchronized void close() throws IOException {
    if (in != null) {
      // LineReader.close() closes the stream it wraps (the codec stream,
      // which in turn is expected to close fileIn).
      in.close();
    } else if (fileIn != null) {
      // initialize() opened the file but failed before the line reader was
      // created; close the raw stream so it is not leaked.
      fileIn.close();
    }
  }

  /**
   * Returns the key of the current record.
   *
   * @return the reused key instance
   */
  @Override
  public LongWritable getCurrentKey() throws IOException, InterruptedException {
    return key;
  }

  /**
   * Returns the value (line) of the current record.
   *
   * @return the reused value instance
   */
  @Override
  public Text getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  /**
   * Opens the file behind the split and positions the reader on the first
   * record it should return.
   *
   * @param genericSplit the split to read; must be a {@link FileSplit}
   * @param context the task context supplying the job configuration
   * @throws IOException if no compression codec is registered for the file,
   *         or if opening, seeking or reading the file fails
   */
  @Override
  public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
    FileSplit split = (FileSplit) genericSplit;
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();
    Configuration job = context.getConfiguration();

    FileSystem fs = file.getFileSystem(job);
    CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(job);
    final CompressionCodec codec = compressionCodecs.getCodec(file);
    if (codec == null) {
      throw new IOException("Codec for file " + file + " not found, cannot run");
    }

    // open the file; seeking to the start of the split happens below
    fileIn = fs.open(file);

    boolean ready = false;
    try {
      // creates input stream and also reads the file header
      in = new LineReader(codec.createInputStream(fileIn), job);

      if (start != 0) {
        fileIn.seek(start);

        // read and ignore the first line
        in.readLine(new Text());
        start = fileIn.getPos();
      }
      ready = true;
    } finally {
      if (!ready) {
        // Do not leak the open file when set-up fails part-way through.
        try {
          close();
        } catch (IOException ignored) {
          // The failure that brought us here is the one worth reporting.
        }
      }
    }

    this.pos = start;
  }

  /**
   * Advances to the next line.
   *
   * @return {@code true} if a line was read into the current key/value,
   *         {@code false} once the split or the file is exhausted
   * @throws IOException if reading from the file fails
   */
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    //since the lzop codec reads everything in lzo blocks
    //we can't stop if the pos == end
    //instead we wait for the next block to be read in when
    //pos will be > end
    if (pos > end) {
      return false;
    }

    key.set(pos);

    // readLine reports 0 bytes consumed at end of stream
    int newSize = in.readLine(value);
    if (newSize == 0) {
      return false;
    }
    pos = fileIn.getPos();

    return true;
  }
}