Hadoop 0.20.205的CombineFileInputFormat的实现

Hadoop 0.20.205的CombineFileInputFormat的实现,第1张

Hadoop 0.20.205的CombineFileInputFormat的实现

这是我为您准备的实现:

import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.FileSplit;import org.apache.hadoop.mapred.InputSplit;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.LineRecordReader;import org.apache.hadoop.mapred.RecordReader;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.lib.CombineFileInputFormat;import org.apache.hadoop.mapred.lib.CombineFileRecordReader;import org.apache.hadoop.mapred.lib.CombineFileSplit;@SuppressWarnings("deprecation")public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {    @SuppressWarnings({ "unchecked", "rawtypes" })    @Override    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException {        return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class);    }    public static class myCombineFileRecordReader implements RecordReader<LongWritable, Text> {        private final LineRecordReader linerecord;        public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException { FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations()); linerecord = new LineRecordReader(conf, filesplit);        }        @Override        public void close() throws IOException { linerecord.close();        }        @Override        public LongWritable createKey() { // TODO Auto-generated method stub return linerecord.createKey();        }        @Override        public Text createvalue() { // TODO Auto-generated method stub return linerecord.createvalue();        }        @Override        public long getPos() throws IOException { // TODO Auto-generated method stub return linerecord.getPos();        }        @Override        public float getProgress() throws IOException { // TODO Auto-generated method stub return linerecord.getProgress();        }        @Override        public boolean next(LongWritable key, Text value) throws IOException { // TODO Auto-generated method stub return linerecord.next(key, value);        }    }}

在您的作业中,首先

mapred.max.split.size
根据您想要合并输入文件的大小来设置参数。在您的 run()中执行以下 *** 作

... if (argument != null) {     conf.set("mapred.max.split.size", argument); } else {     conf.set("mapred.max.split.size", "134217728"); // 128 MB }... conf.setInputFormat(CombinedInputFormat.class);...


欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5488257.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-12
下一篇2022-12-12

发表评论

登录后才能评论

评论列表(0条)

    保存