
这是我为您准备的实现:
import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.FileSplit;import org.apache.hadoop.mapred.InputSplit;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.LineRecordReader;import org.apache.hadoop.mapred.RecordReader;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.lib.CombineFileInputFormat;import org.apache.hadoop.mapred.lib.CombineFileRecordReader;import org.apache.hadoop.mapred.lib.CombineFileSplit;@SuppressWarnings("deprecation")public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> { @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException { return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class); } public static class myCombineFileRecordReader implements RecordReader<LongWritable, Text> { private final LineRecordReader linerecord; public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException { FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations()); linerecord = new LineRecordReader(conf, filesplit); } @Override public void close() throws IOException { linerecord.close(); } @Override public LongWritable createKey() { // TODO Auto-generated method stub return linerecord.createKey(); } @Override public Text createvalue() { // TODO Auto-generated method stub return linerecord.createvalue(); } @Override public long getPos() throws IOException { // TODO Auto-generated method stub return linerecord.getPos(); } @Override public float getProgress() throws IOException { // TODO Auto-generated method stub return linerecord.getProgress(); } @Override 
public boolean next(LongWritable key, Text value) throws IOException { // TODO Auto-generated method stub return linerecord.next(key, value); } }}在您的作业中,首先
根据您希望将输入文件合并成的大小来设置参数 mapred.max.split.size。在您的 run() 方法中执行以下操作:
... if (argument != null) { conf.set("mapred.max.split.size", argument); } else { conf.set("mapred.max.split.size", "134217728"); // 128 MB }... conf.setInputFormat(CombinedInputFormat.class);...欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)