
(1)-input:输入文件路径
(2)-output:输出文件路径
(3)-mapper:用户自己写的mapper程序,可以是可执行文件或者脚本
(4)-reducer:用户自己写的reducer程序,可以是可执行文件或者脚本
(5)-file:打包文件到提交的作业中,可以是mapper或者reducer要用的输入文件,如配置文件,字典等。
(6)-partitioner:用户自定义的partitioner程序
(7)-combiner:用户自定义的combiner程序(必须用java实现)
(8)-D:作业的一些属性(以前用的是-jonconf),具体有:
1)mapred.map.tasks:map task数目
2)mapred.reduce.tasks:reduce task数目
3)stream.map.input.field.separator/stream.map.output.field.separator: map task输入/输出数
据的分隔符,默认均为\t。
4)stream.num.map.output.key.fields:指定map task输出记录中key所占的域数目
5)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task输入/输出数据的分隔符,默认均为\t。
6)stream.num.reduce.output.key.fields:指定reduce task输出记录中key所占的域数目
另外,Hadoop本身还自带一些好用的Mapper和Reducer:
(1)Hadoop聚集功能
Aggregate提供一个特殊的reducer类和一个特殊的combiner类,并且有一系列的“聚合器”(例如“sum”,“max”,“min”等)用于聚合一组value的序列。用户可以使用Aggregate定义一个mapper插件类,这个类用于为mapper输入的每个key/value对产生“可聚合项”。Combiner/reducer利用适当的聚合器聚合这些可聚合项。要使用Aggregate,只需指定“-reducer aggregate”。
(2)字段的选取(类似于Unix中的‘cut’)
Hadoop的工具类org.apache.hadoop.mapred.lib.FieldSelectionMapReduc帮助用户高效处理文本数据,就像unix中的“cut”工具。工具类中的map函数把输入的key/value对看作字段的列表。 用户可以指定字段的分隔符(默认是tab),可以选择字段列表中任意一段(由列表中一个或多个字段组成)作为map输出的key或者value。 同样,工具类中的reduce函数也把输入的key/value对看作字段的列表,用户可以选取任意一段作为reduce输出的key或value。
后来将slaves节点的hostname也修正为IP映射表内对应的名字,解决?
根据一位外国友人的说明,在reduce阶段 ,0-33%阶段是 shuffle 阶段,就是根据键值 来讲本条记录发送到指定的reduce,这个阶段应该是在map还没有完全完成的时候就已经开始了,因为我们会看到map在执行到一个百分比后reduce也启动了,这样做也提高了程序的执行效率。
34%-65%阶段是sort阶段,就是reduce根据收到的键值进行排序。map阶段也会发生排序,map的输出结果是以键值为顺序排序后输出,可以通过只有map阶段处理的输出来验证(以前自己验证过,貌似确有这么回事,大家自己再验证下,免得我误人子弟啊)。
66%-100%阶段是处理阶段,这个阶段才是真正的处理阶段,如果程序卡在这里,估计就是你的reduce程序有问题了。
索性等了一晚上,第二天终于有动静了
和上面的记录对比发现,从%67到%77用了11个小时!这明显是reduce程序效率太慢了。也可能是数据倾斜问题。中间也试过增加reducer的数量,但无果。最终我索性减少了输入文件的行数,使其只有三行:
然后重新运行程序,瞬间得到了结果:
可见,结果是正确的。
令人诧异的是很快就执行完了,难道真的是shell脚本不适合做类似统计这样的事情吗?
大数据的时代, 到处张嘴闭嘴都是Hadoop, MapReduce, 不跟上时代怎么行? 可是对一个hadoop的新手, 写一个属于自己的MapReduce程序还是小有点难度的, 需要建立一个maven项目, 还要搞清楚各种库的依赖, 再加上编译运行, 基本上头大两圈了吧。 这也使得很多只是想简单了解一下MapReduce的人望而却步。本文会教你如何用最快最简单的方法编写和运行一个属于自己的MapReduce程序, let's go!
首先有两个前提:
1. 有一个已经可以运行的hadoop 集群(也可以是伪分布系统), 上面的hdfs和mapreduce工作正常 (这个真的是最基本的了, 不再累述, 不会的请参考 http://hadoop.apache.org/docs/current/)
2. 集群上安装了JDK (编译运行时会用到)
正式开始
1. 首先登入hadoop 集群里面的一个节点, 创建一个java源文件, 偷懒起见, 基本盗用官方的word count (因为本文的目的是教会你如何快编写和运行一个MapReduce程序, 而不是如何写好一个功能齐全的MapReduce程序)
内容如下:
import java.io.IOException
import java.util.StringTokenizer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.Mapper
import org.apache.hadoop.mapreduce.Reducer
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.util.GenericOptionsParser
public class myword {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1)
private Text word = new Text()
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString())
while (itr.hasMoreTokens()) {
word.set(itr.nextToken())
context.write(word, one)
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable>{
private IntWritable result = new IntWritable()
public void reduce(Text key, Iterable<IntWritable>values,
Context context
) throws IOException, InterruptedException {
int sum = 0
for (IntWritable val : values) {
sum += val.get()
}
result.set(sum)
context.write(key, result)
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration()
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs()
if (otherArgs.length != 2) {
System.err.println('Usage: wordcount <in><out>')
System.exit(2)
}
Job job = new Job(conf, 'word count')
job.setJarByClass(myword.class)
job.setMapperClass(TokenizerMapper.class)
job.setCombinerClass(IntSumReducer.class)
job.setReducerClass(IntSumReducer.class)
job.setOutputKeyClass(Text.class)
job.setOutputValueClass(IntWritable.class)
FileInputFormat.addInputPath(job, new Path(otherArgs[0]))
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]))
System.exit(job.waitForCompletion(true) ? 0 : 1)
}
}
与官方版本相比, 主要做了两处修改
1) 为了简单起见,去掉了开头的 package org.apache.hadoop.examples
2) 将类名从 WordCount 改为 myword, 以体现是我们自己的工作成果 :)
2. 拿到hadoop 运行的class path, 主要为编译所用
运行命令
hadoop classpath
保存打出的结果,本文用的hadoop 版本是Pivotal 公司的Pivotal hadoop, 例子:
/etc/gphd/hadoop/conf:/usr/lib/gphd/hadoop/lib/*:/usr/lib/gphd/hadoop/.//*:/usr/lib/gphd/hadoop-hdfs/./:/usr/lib/gphd/hadoop-hdfs/lib/*:/usr/lib/gphd/hadoop-hdfs/.//*:/usr/lib/gphd/hadoop-yarn/lib/*:/usr/lib/gphd/hadoop-yarn/.//*:/usr/lib/gphd/hadoop-mapreduce/lib/*:/usr/lib/gphd/hadoop-mapreduce/.//*::/etc/gphd/pxf/conf::/usr/lib/gphd/pxf/pxf-core.jar:/usr/lib/gphd/pxf/pxf-api.jar:/usr/lib/gphd/publicstage:/usr/lib/gphd/gfxd/lib/gemfirexd.jar::/usr/lib/gphd/zookeeper/zookeeper.jar:/usr/lib/gphd/hbase/lib/hbase-common.jar:/usr/lib/gphd/hbase/lib/hbase-protocol.jar:/usr/lib/gphd/hbase/lib/hbase-client.jar:/usr/lib/gphd/hbase/lib/hbase-thrift.jar:/usr/lib/gphd/hbase/lib/htrace-core-2.01.jar:/etc/gphd/hbase/conf::/usr/lib/gphd/hive/lib/hive-service.jar:/usr/lib/gphd/hive/lib/libthrift-0.9.0.jar:/usr/lib/gphd/hive/lib/hive-metastore.jar:/usr/lib/gphd/hive/lib/libfb303-0.9.0.jar:/usr/lib/gphd/hive/lib/hive-common.jar:/usr/lib/gphd/hive/lib/hive-exec.jar:/usr/lib/gphd/hive/lib/postgresql-jdbc.jar:/etc/gphd/hive/conf::/usr/lib/gphd/sm-plugins/*:
3. 编译
运行命令
javac -classpath xxx ./myword.java
xxx部分就是上一步里面取到的class path
运行完此命令后, 当前目录下会生成一些.class 文件, 例如:
myword.class myword$IntSumReducer.class myword$TokenizerMapper.class
4. 将class文件打包成.jar文件
运行命令
jar -cvf myword.jar ./*.class
至此, 目标jar 文件成功生成
5. 准备一些文本文件, 上传到hdfs, 以做word count的input
例子:
随意创建一些文本文件, 保存到mapred_test 文件夹
运行命令
hadoop fs -put ./mapred_test/
确保此文件夹成功上传到hdfs 当前用户根目录下
6. 运行我们的程序
运行命令
hadoop jar ./myword.jar myword mapred_test output
顺利的话, 此命令会正常进行, 一个MapReduce job 会开始工作, 输出的结果会保存在 hdfs 当前用户根目录下的output 文件夹里面。
至此大功告成!
如果还需要更多的功能, 我们可以修改前面的源文件以达到一个真正有用的MapReduce job。
但是原理大同小异, 练手的话, 基本够了。
一个抛砖引玉的简单例子, 欢迎板砖。
转载
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)