split和block的区别以及maptask和reducetask个数设定

split和block的区别以及maptask和reducetask个数设定,第1张

job.split中包含split的个数由FileInputFormat.getSplits计算出,方法的逻辑如下:

1. 读取参数mapred.map.tasks,这个参数默认设置为0,生产系统中很少修改。

2. 计算input文件的总字节数,总字节数/(mapred.map.tasks==0 ? 1: mapred.map.tasks )=goalsize

3. 每个split的最小值minSize由mapred.min.split.size参数设置,这个参数默认设置为0,生产系统中很少修改。

4. 调用computeSplitSize方法,计算出splitsize= Math.max(minSize, Math.min(goalSize, blockSize)),通常这个值=blockSize,输入的文件较小,文件字节数之和小于blocksize时,splitsize=输入文件字节数之和。

5. 对于input的每个文件,计算split的个数。

a) 文件大小/splitsize>1.1,创建一个split,这个split的字节数=splitsize,文件剩余字节数=文件大小-splitsize

b) 文件剩余字节数/splitsize<1.1,剩余的部分作为一个split

举例说明:

1. input只有一个文件,大小为100M,splitsize=blocksize,则split数为2,第一个split为64M,第二个为36M

2. input只有一个文件,大小为65M,splitsize=blocksize,则split数为1,split大小为65M

3. input只有一个文件,大小为129M,splitsize=blocksize,则split数为2,第一个split为64M,第二个为65M(最后一个split的大小可能超过splitsize)

4. input只有一个文件,大小为20M ,splitsize=blocksize,则split数为1,split大小为20M

5. input有两个文件,大小为100M和20M,splitsize=blocksize,则split数为3,第一个文件分为两个split,第一个split为64M,第二个为36M,第二个文件为一个split,大小为20M

6. input有两个文件,大小为25M和20M,splitsize=blocksize,则split数为2,第一个文件为一个split,大小为25M,第二个文件为一个split,大小为20M

假设一个job的input大小固定为100M,当只包含一个文件时,split个数为2,maptask数为2,但当包含10个10M的文件时,maptask数为10。

下面来分析reducetask,纯粹的mapreduce task的reduce task数很简单,就是参数mapred.reduce.tasks的值,hadoop-site.xml文件中和mapreduce job运行时不设置的话默认为1。

在HIVE中运行sql的情况又不同,hive会估算reduce task的数量,估算方法如下:

通常是ceil(input文件大小/1024*1024*1024),每1GB大小的输入文件对应一个reduce task。

特殊的情况是当sql只查询count(*)时,reduce task数被设置成1。

1、我们知道map的数量和文件数、文件大小、块大小、以及split大小有关,而reduce的数量跟哪些因素有关呢?

设置mapred.tasktracker.reduce.tasks.maximum的大小可以决定单个tasktracker一次性启动reduce的数目,但是不能决定总的reduce数目。

conf.setNumReduceTasks(4)JobConf对象的这个方法可以用来设定总的reduce的数目,看下Job Counters的统计:

确实启动了4个reduce:看下输出:

只有2个reduce在干活。为什么呢?

shuffle的过程,需要根据key的值决定将这条<K,V>(map的输出),送到哪一个reduce中去。送到哪一个reduce中去靠调用默认的org.apache.hadoop.mapred.lib.HashPartitioner的getPartition()方法来实现。

HashPartitioner类:

numReduceTasks的值在JobConf中可以设置。默认的是1:显然太小。

这也是为什么默认的设置中总启动一个reduce的原因。

返回与运算的结果和numReduceTasks求余。

Mapreduce根据这个返回结果决定将这条<K,V>,送到哪一个reduce中去。

key传入的是LongWritable类型,看下这个LongWritable类的hashcode()方法:

简简单单的返回了原值的整型值。

因为getPartition(K2 key, V2 value,int numReduceTask)返回的结果只有2个不同的值,所以最终只有2个reduce在干活。

HashPartitioner是默认的partition类,我们也可以自定义partition类 :

仅仅需要覆盖getPartition()方法就OK。通过:

conf.setPartitionerClass(MyPartitioner.class)

可以设置自定义的partition类。

同样由于之返回2个不同的值0,1,不管conf.setNumReduceTasks(4)设置多少个reduce,也同样只会有2个reduce在干活。

由于每个reduce的输出key都是经过排序的,上述自定义的Partitioner还可以达到排序结果集的目的:

part-00000和part-00001是这2个reduce的输出,由于使用了自定义的MyPartitioner,所有key小于20051210的的<K,V>都会放到第一个reduce中处理,key大于20051210就会被放到第二个reduce中处理。

每个reduce的输出key又是经过key排序的,所以最终的结果集降序排列。

但是如果使用上面自定义的partition类,又conf.setNumReduceTasks(1)的话,会怎样? 看下Job Counters:

只启动了一个reduce。

(1)、 当setNumReduceTasks( int a) a=1(即默认值),不管Partitioner返回不同值的个数b为多少,只启动1个reduce,这种情况下自定义的Partitioner类没有起到任何作用。

(2)、 若a!=1:

a、当setNumReduceTasks( int a)里 a设置小于Partitioner返回不同值的个数b的话:

同时设置setNumReduceTasks( 2)。

于是抛出异常:

某些key没有找到所对应的reduce去处。原因是只启动了a个reduce。

b、当setNumReduceTasks( int a)里 a设置大于Partitioner返回不同值的个数b的话,同样会启动a个reduce,但是只有b个redurce上会得到数据。启动的其他的a-b个reduce浪费了。

c、理想状况是a=b,这样可以合理利用资源,负载更均衡。


欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/tougao/11279004.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-05-14
下一篇2023-05-14

发表评论

登录后才能评论

评论列表(0条)

    保存