
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Composite MapReduce key that sorts records by {@code first} (String, ascending)
 * and breaks ties with {@code second} (int, ascending).
 *
 * <p>Serialized form: {@code writeUTF(first)} followed by {@code writeInt(second)}.
 * {@link #toString()} emits the tab-separated form {@code first\tsecond}, matching
 * the input format parsed by the mapper.
 */
public class SortWritable implements WritableComparable<SortWritable> {

    private String first;
    private int second;

    /**
     * Orders by {@code first}; when equal, by {@code second}.
     * Uses {@link Integer#compare(int, int)} to avoid boxing and the
     * overflow risk of subtraction-based comparison.
     */
    @Override
    public int compareTo(SortWritable o) {
        int cmp = this.first.compareTo(o.first);
        if (cmp != 0) {
            return cmp;
        }
        return Integer.compare(this.second, o.second);
    }

    /** Serializes this key: UTF string then int, mirrored by {@link #readFields}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.first);
        out.writeInt(this.second);
    }

    /** Deserializes this key in the exact order written by {@link #write}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    /** Equality consistent with {@link #compareTo}; required for correct key grouping. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof SortWritable)) {
            return false;
        }
        SortWritable other = (SortWritable) obj;
        return this.second == other.second
                && (this.first == null ? other.first == null : this.first.equals(other.first));
    }

    /** Hash consistent with {@link #equals}; used by the default hash partitioner. */
    @Override
    public int hashCode() {
        int result = (first == null) ? 0 : first.hashCode();
        return 31 * result + second;
    }

    /** Tab-separated text form, matching the mapper's input format. */
    @Override
    public String toString() {
        return this.first + "\t" + this.second;
    }
}
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class SortMap extends Mapper{ @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { //定义计数器Map输入多少条 Counter counter = context.getCounter("MR_COUNT", "MapRecordCounter"); counter.increment(1L); String[] strings = value.toString().split("t"); SortWritable sortWritable = new SortWritable(); sortWritable.setFirst(strings[0]); sortWritable.setSecond(Integer.parseInt(strings[1])); context.write(sortWritable,NullWritable.get()); System.out.println(sortWritable.toString()); } }
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

/**
 * Reducer that writes each already-sorted {@link SortWritable} key once per
 * occurrence (duplicates arrive as multiple NullWritable values in one group),
 * while tracking input-group and output-record counts via a custom counter enum.
 */
public class SortReduce extends Reducer<SortWritable, NullWritable, SortWritable, NullWritable> {

    /** Custom counter groups reported in the job's counter summary. */
    public enum Counter {
        /** Number of distinct key groups the reducer received. */
        REDUCE_INPUT_RECORDS,
        /** Number of records the reducer emitted (one per duplicate key). */
        REDUCE_OUTPUT_RECORDS,
    }

    /**
     * Emits the key once for every value in its group, preserving duplicates
     * that the shuffle collapsed into a single reduce call.
     */
    @Override
    protected void reduce(SortWritable key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // One increment per group: counts distinct keys, not total records.
        context.getCounter(Counter.REDUCE_INPUT_RECORDS).increment(1L);

        for (NullWritable ignored : values) {
            context.write(key, NullWritable.get());
            context.getCounter(Counter.REDUCE_OUTPUT_RECORDS).increment(1L);
        }
    }
}
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)