
- Partitioning
- Sorting
- Combiner
- Grouping
- Main class code
Partitioning
Splits the data into several partitions; each partition produces its own output file.
Steps:
1. Extend the Partitioner class and override the getPartition method.
2. Enable the partitioner in the main class: job.setPartitionerClass(OrderParition.class);
Notes:
1. The two type parameters given when extending Partitioner correspond to K2 and V2, i.e. the Mapper's output types.
2. The parameter i in getPartition is the number of ReduceTasks, set in the main class with job.setNumReduceTasks(1);
3. However many ReduceTasks setNumReduceTasks configures, that many result files are produced.
Code: the Partitioner class

package ordersTop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class OrderParition extends Partitioner<OrderBean, Text> {
    @Override
    public int getPartition(OrderBean orderBean, Text text, int i) {
        return (orderBean.getName().hashCode() & Integer.MAX_VALUE) % i;
    }
}
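The hash-and-mask expression above can be checked outside Hadoop. This is a minimal standalone sketch of the same logic (the sample names and the `PartitionDemo`/`partitionFor` names are made up for illustration):

```java
// Standalone check of the partition expression used in OrderParition:
// (hashCode() & Integer.MAX_VALUE) clears the sign bit, so the modulo
// result is always in [0, numReduceTasks), even for negative hash codes.
public class PartitionDemo {
    static int partitionFor(String name, int numReduceTasks) {
        return (name.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        for (String n : new String[]{"apple", "banana", "pear"}) {
            // Same name always lands in the same partition.
            System.out.println(n + " -> partition " + partitionFor(n, 3));
        }
    }
}
```

Because the mapping depends only on the key's hash, every record with the same name is routed to the same ReduceTask.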
Sorting
- Steps:
- 1. Create a key entity class that implements the WritableComparable interface.
- 2. Sorting is driven by the interface's compareTo method; define your own ordering rule there.
- 3. write and readFields follow a standard pattern based on the field types.
- Notes:
- 1. The sort fields must be part of the key.
- 2. No other settings are needed in the main class.
Key entity class code

package ordersTop;

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {
    private String order;
    private String name;
    private int amount;

    @Override
    public String toString() {
        return order + ',' + name + ',' + amount + ',';
    }

    public String getOrder() { return order; }
    public void setOrder(String order) { this.order = order; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAmount() { return amount; }
    public void setAmount(int amount) { this.amount = amount; }

    @Override
    public int compareTo(OrderBean o) {
        if (this.name.compareTo(o.getName()) == 0) {
            return o.getAmount() - this.amount; // same name: descending by amount
        }
        return this.name.compareTo(o.getName());
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(order);
        dataOutput.writeUTF(name);
        dataOutput.writeInt(amount);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.order = dataInput.readUTF();
        this.name = dataInput.readUTF();
        this.amount = dataInput.readInt();
    }
}
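The ordering rule in compareTo (ascending by name, descending by amount within the same name) can be verified without the Hadoop dependencies. This sketch uses a simplified stand-in bean (`SortDemo.Bean` is made up; only the comparison logic mirrors OrderBean):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Standalone sketch of OrderBean's compareTo rule: sort ascending by name,
// and descending by amount when the names are equal.
public class SortDemo {
    static class Bean implements Comparable<Bean> {
        final String name;
        final int amount;
        Bean(String name, int amount) { this.name = name; this.amount = amount; }

        @Override
        public int compareTo(Bean o) {
            if (this.name.compareTo(o.name) == 0) {
                return o.amount - this.amount; // same name: larger amount first
            }
            return this.name.compareTo(o.name);
        }
    }

    public static void main(String[] args) {
        List<Bean> beans = new ArrayList<>();
        beans.add(new Bean("b", 10));
        beans.add(new Bean("a", 5));
        beans.add(new Bean("a", 20));
        Collections.sort(beans);
        for (Bean b : beans) {
            System.out.println(b.name + "," + b.amount);
        }
        // prints: a,20  then  a,5  then  b,10
    }
}
```

Because the shuffle phase sorts by this key, each reducer sees a name's records with the largest amount first.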
Combiner
A combiner is essentially a Reduce: it is written exactly like a Reducer; only the main-class setting differs:
job.setCombinerClass(OrderReduce.class);
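Conceptually, setting a combiner runs the reduce logic on each map task's local output before the shuffle, so less data crosses the network. The sketch below illustrates that idea with a simple sum-per-key rule; the sum is a made-up stand-in (it is not what OrderReduce actually computes), and `CombinerDemo` is a hypothetical name:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration of what job.setCombinerClass achieves: reduce logic applied
// locally on one map task's output, shrinking the data before the shuffle.
public class CombinerDemo {
    // Sum values per key -- a stand-in for the reused reduce logic.
    static Map<String, Integer> combine(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> out = new HashMap<>();
        for (Map.Entry<String, Integer> r : records) {
            out.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        // One map task emitted four records; after combining, only two
        // key/value pairs travel through the shuffle.
        List<Map.Entry<String, Integer>> mapOutput = List.of(
            Map.entry("a", 1), Map.entry("b", 2),
            Map.entry("a", 3), Map.entry("b", 4));
        System.out.println(combine(mapOutput));
    }
}
```

A combiner is only safe when the reduce logic is associative and commutative, which is why the framework treats it as an optional optimization.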
Grouping
- Steps:
- 1. Create a custom grouping comparator class that extends WritableComparator.
- 2. Override the compare method with the grouping rule you need.
package ordersTop;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroup extends WritableComparator {
    public OrderGroup() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean bean1 = (OrderBean) a;
        OrderBean bean2 = (OrderBean) b;
        return bean1.getName().compareTo(bean2.getName());
    }
}
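The effect of the grouping comparator can be sketched in plain Java: records are sorted by (name ascending, amount descending), then all records that compare equal on name alone are fed to a single reduce call, so the first record of each group carries the largest amount. The `GroupDemo`/`topAmountPerName` names below are hypothetical; only the comparison and grouping rules mirror the classes above:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the sort + grouping-comparator combination: after sorting by
// (name asc, amount desc), keeping only the first record of each name run
// yields the top amount per name.
public class GroupDemo {
    static Map<String, Integer> topAmountPerName(List<String[]> records) {
        // Sort order mirrors OrderBean.compareTo.
        records.sort((a, b) -> {
            int byName = a[0].compareTo(b[0]);
            return byName != 0 ? byName : Integer.parseInt(b[1]) - Integer.parseInt(a[1]);
        });
        // "Group by name": the first record seen per name wins,
        // as it would be the first value in that reduce call.
        Map<String, Integer> top = new LinkedHashMap<>();
        for (String[] r : records) {
            top.putIfAbsent(r[0], Integer.parseInt(r[1]));
        }
        return top;
    }

    public static void main(String[] args) {
        List<String[]> records = new ArrayList<>();
        records.add(new String[]{"a", "5"});
        records.add(new String[]{"b", "7"});
        records.add(new String[]{"a", "20"});
        System.out.println(topAmountPerName(records)); // {a=20, b=7}
    }
}
```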
Main class code
package ordersTop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;

public class Main {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "ordertop");
        // Input path and input format. Note: backslashes are not valid
        // unescaped in a Java string literal, so forward slashes are used.
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("file:///C:/Users/Administrator/Desktop/Hadoop学习/orders.txt"));
        job.setMapperClass(OrderMapper.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(Text.class);
        // job.setPartitionerClass(OrderParition.class);     // partitioning: uncomment to enable
        // job.setCombinerClass(OrderReduce.class);          // combiner: uncomment to enable
        // job.setGroupingComparatorClass(OrderGroup.class); // grouping: uncomment to enable
        job.setReducerClass(OrderReduce.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        // job.setNumReduceTasks(3);                         // number of ReduceTasks: uncomment to set
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///C:/Users/Administrator/Desktop/Hadoop学习/ordertop"));
        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}