mapreduce分为几个阶段(各个阶段做了什么)
今天抽时间又重新整体复习了下mapreduce在各个阶段的执行过程,感觉每一次学习都会有更大的收获,今天就把我学习到的东西全都和大家一起分享出来,方便自己梳理知识和记忆的同时,也希望能给其他小伙伴带来收获!每天进步一点点!
一.什么是MapReduce
MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。
MapReduce的核心思想是“分而治之,先分后合”,即将一个大的、复杂的工作或任务,拆分成多个小的任务,并行处理,最终进行合并==。适用于大量复杂的、时效性不高的任务处理场景(大规模离线数据处理场景)。其中Map阶段就是分,Reduce阶段是合。
map:以一条记录为单位做映射~!主要做映射,变换,过滤,1进N出(进来1条记录出去N条记录)
reduce:以一组(相同Key数据为一组)为单位做计算~!主要做分解,缩小,归纳,一组进N出(进来一组记录出去N条记录)
map与reduce之间的衔接:键值对(Key,Value)
(Key,Value):由map映射实现,键值对的键划分数据分组
mapreduce简单的过程
二.MapReduce运行过程
1.MapReduce大致运行过程
MapReduce的大致运行过程
MapReduce分为Map阶段和Reduce阶段,从上图中可以看出所有Map任务完成后才可以进行Reduce阶段,Reduce的输入来自于Map的输出,两者之间是有依赖的,不是并行关系。
3个split(分片)是可以并行运行的,代表3个map任务,一个切片对应一个map计算,默认情况下,以HDFS的一个块(block)的大小(默认128M)为一个分片。
那么思考一下,为什么不直接用一个block直接对应一个map计算?而是要在加一个split呢?
这就是在软件工程学当中经常提到的加一层解耦,block块的大小是固定的,split的大小默认情况下等于block块的大小,当然我们也是可以根据自己的需求来设置split的大小,可以大于块大小,也可以小于块大小,这样就可以满足未来不同项目组的不同需求了,用起来相当灵活了。
可以看出,map的个数(并行度)是由split的数量决定的,每个split对应一个map计算,默认情况下,一个split就是一个块,split是可以复用block偏移量,大小,location信息的,split可以根据block的location相关信息来找到对应副本的机器,然后map程序可以移动到相应的机器上进行计算,很好的实现了计算向数据移动的语义。
接下来介绍下分组和分区的概念:
分组:将相同key的value进行合并,key相等的话,将分到同一个组里面
分区:决定当前的key交给哪个reduce进行处理,默认情况下,根据key的hash值,对reduce个数取余
Reduce的阶段也可以设置多个reduce任务来计算,一个分区由一个reduce任务处理,每个reduce任务可以计算同一分组或多组中的数据。也就是说一个分区包含一个组或多个组中的数据。相同组的数据是不可切割开的,如果切开了,计算结果是不准确的。如果一个reduce任务中有多个组,那么需要开启多个线程来处理不同组中的数据,
reduce的个数(并行度)由人来决定,可以根据不同需求自己手动设置。
2.MapReduce一个map,reduce阶段的详细过程
一个MapReduce的详细过程
如上图所示,MapReduce整体分为Map端任务和Reduce端任务,整个MapReduce程序又分为如下5个阶段。
input
首先读取HDFS上的文件,分出若干个分片(split),split会将文件内容格式化为记录,以记录为单位调用map方法,每一条记录都会调用一次map方法。
map
经过map方法后,map的输出映射为key,value,key,value会参与分区计算,拿着key算出分区号(根据key的hash值,对reduce个数取余),最终map输出key,value,分区(partition)。
shuffle:即为洗牌(包括Map端shuffle和Reduce端shuffle)***重要阶段****
maptask的输出是一个文件(中间数据集),存在本地的文件系统中,也就是说中间数据集并不是存放到HDFS中的。假设这个文件中有50000条记录,那么如何将这50000条记录放到本地文件系统中呢?是要每产出一条就写一条吗?内存进程和磁盘多次进行切换,这样速度会很慢,此时就会出现一个环形内存缓冲区(默认大小为100M)。
1.map端的shuffle:
(1)溢写(spill)
每个map处理之后的结果将会进入环形缓冲区。内存缓冲区是有大小限制的,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为spill,中文可理解为溢写。溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。比例默认是0.8,也就是当缓冲区的数据值已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。map task的输出结果还可以往剩下的20MB内存中写,互不影响。
(2)排序
当溢写线程启动后,需要对这80MB空间内的分区,key做排序(sort)。需要做一个2次排序,分区有序,且分区内key有序,保证未来相同一组key会相邻的排在一起。
(3)往磁盘溢写数据
当环形缓冲区达到阈值80%,开始将分区排序后的数据溢写磁盘变成文件,每次溢写会在磁盘上生成一个溢写文件,如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个溢写文件存在。最终会产生多个小文件。
(4)merge合并
将spill生成的小文件进行合并,将多个溢写文件合并到一个文件,上面的每个小文件都是内部有序,外部无序的,所以在合并时做一个归并排序来达到合并后的文件是有序的。
map task结束,通过appmaster,appmaster通过reduce过来拉取数据。
2.reduce的shuffle:
(1)copy过程
简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过http方式请求map task所在的TaskTracker获取map task的输出文件。因为map task早已结束,这些文件就归TaskTracker管理在本地磁盘中。
(2)merge合并
将每个map task的结果中属于自己分区的数据进行合并,同样做归并排序,使得相同分区的数据的key是有序的。copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle使用。
reduce
将合并后的数据f直接输入到reduce函数,进行相关计算,产出最终结果。
output
将最终结果存放到HDFS上。
分析了MapReduce的运行过程,可以发现MapReduce都是反复地执行排序,合并操作,现在终于明白了有些人为什么会说:排序是hadoop的灵魂。
,免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com