Hadoop核心组件——MR
Hadoop分布式计算框架(MapReduce)
何为MapReuce,一个例子
你想数出一摞牌中有多少张黑桃。直观方式是一张一张检查并且数出有多少张是黑桃。
MapReduce方法则是:
1.给在座的所有玩家中分配这摞牌
2.让每个玩家数自己手中的牌有几张是黑桃,然后把这个数目汇报给你
3.你把所有玩家告诉你的数字加起来,得到最后的结论
MapReduce设计理念
分布式计算
移动计算,而不是移动数据
Map-reduce的思想就是“分而治之”
Mapper负责“分”,即把复杂的任务分解为若干个“简单的任务”执行
“简单的任务”有几个含义:
数据或计算规模相对于原任务要大大缩小;
就近计算,即会被分配到存放了所需数据的节点进行计算;
这些小任务可以并行计算,彼此间几乎没有依赖关系;
Mapreduce初析
Mapreduce是一个计算框架,
既然是做计算的框架,那么表现形式就是有个输入(input),mapreduce操作这个输入(input),
通过本身定义好的计算模型,得到一个输出(output),这个输出就是我们所需要的结果。
Input——>MapReduce——>HDFS
单词统计例子
MapReduce运行机制,按照时间顺序包括:
输入分片(input split)、map阶段、shuffle阶段和reduce阶段。
1、输入分片(input split)
在进行map计算之前,mapreduce会在客户端根据程序里设置的输入路径,获取该路径下的文件及各个文件的block位置信息
mapreduce会根据输入文件计算输入分片个数与大小,
每个输入分片针对一个map任务,输入分片存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组。
划分Split
max.split(默认Long.MaxValue),min.split(默认1),block(64M , 2.下)
根据设置可得出每个分片大小,上述设置可得
max(min.split,min(max.split,block))
然后根据切片大小,对每个文件划分split , (记录每个split对应的偏移量)
如果我们输入有三个文件,大小分别是3mb、65mb和127mb,那么mapreduce会把3mb文件分为一个输入分片,65mb则是两个输入分片,而127mb也是两个输入分片。
分发MapTask
上面获取了map计算的单位 split,split可能由多个block组成, 前面获取的信息也包括了每个block块的位置,这样map reduce就会直到这个split的数据对应的数据主要在哪个节点上,然后每个split就会分发maptask到对应的节点上。
2、map阶段
mapTask个数由split决定,每个MapTask处理一个split的数据。
上面split阶段就知道了这个MapTask的对应的block块,这个map就会根据传过来的偏移量offset,通过hdfsAPI,对hdfs文件做 fileinput.seek(offset) 读取,这样本地的block文件就不需要传输,其他节点剩下的部分,就会通过网络传输拉取到这个节点。
然后MapTask就会通过 一行一行的读取 这个split的数据,程序员自己编写的map函数处理,然后再以以键值对输出。
3、shuffle阶段
在mapper和reducer中间的一个步骤,将map的输出作为reduce的输入的过程就是shuffle了.
Shuffle Write阶段 (Map节点上):
MapTask Context.Write 时 , 会将写的这行数据序列化先写到缓存数组中 (Buffer环中),
而写之前会根据 分区器Partitioner 计算出这行数据对应的分区号 :
分区数由ReduceTask个数决定,默认时HashPartitioner , 可自定义
根据 这行数据的Key对分区数取模 (key的hash值%上reduce的个数)
随着越来越多的数据写入Buffer环 (最大:100M)中,当达到阈值 ,环的大小的 0.8 时:
会启动一个Spill线程进行溢写,将数据写到磁盘
溢写的同时,会锁住这80%的内存,即MapTask只能往剩下的20%内存写数据
每次溢写的同时,会对这次溢写的数据进行排序:
先根据前面算出的分区号将同一个分区的放在一起,再根据排序比较器对一个分区内的数据进行排序,排序方法为快排
如果设置自定义排序比较器 sortComparator,则按自定义的排序,如果没有则默认是 KeyComparator 即按照Key排序 ,Key为String,按字符排序,Key为int,按照数值排序。
排序完成后溢写到磁盘,每次溢写生成一个小文件
每溢写 3 次会触发一个 Combiner 操作
combiner等同与在Map端的一种reduce操作,主要是在map计算出中间文件前做一个简单的合并重复key值的操作。
来减小Reduce聚合的负担,但是前提的Reduce里的计算逻辑能支持在Map端执行Combiner。
默认没有设置Combiner,如果没有设置,就不会进行Combiner。
待MapTask将所有的数据处理完成后,会将Buffer环触发Flush,将所有数据溢写到磁盘。
这时小文件达到最小spill文件数3 ,也会进行一次Combiner。
然后MapReduce会将所有小文件进行Merge合并,合并成一个大文件,等待Reduce拉取:
合并时将相同的分区的合并在一起,分区内进行归并排序。
记录下每个分区在合并后的这个大文件中的偏移量
Shuffle Read阶段 (Reduce节点上):
每个ReadTask会去Map端去拉取属于自己处理的分区数据,根据前面的偏移量去每个Map端拉取对应大文件中某段
多少个MapTask就会拉取会多少个文件。
4、reduce阶段
前面将各个MapTask的结果文件中属于自己的拉取到对应的Reduce端
然后进行归并排序合并这些文件,在返回一个逻辑迭代器来迭代这批数据(这时数据可能在磁盘,少部分可能在内存)
代码中每执行一个Reduce函数,就是处理一个组的数据:
MapReduce是根据分组比较器groupComparator来判断数据是否为一组的,(并不是一个组内Key都是相同的)
判断过程是每遍历一行,用分组比较器判断是否为一组Key,是则继续遍历,不是则下一行为另外一组的Key,再去遍历下一组
也就是分在一组的的数据在文件中必须连续的排序一起,而排序的是在文件合并时根据 排序比较器sortComparator 排序的
也就是 groupComparator 与 sortComparator中的排序逻辑必须不冲突,groupComparator 应该包含sortComparator
处理完后context.write写入磁盘
MapReduce的架构(Hadoop 1.x)
主多从架构
主 JobTracker
负责调度分配每一个子任务task运行于TaskTracker上,
如果发现有失败的 task就重新分配其任务到其他节点。
每一个hadoop集群中只一个JobTracker, 一般它运行在Master节点上,监控整个集群的资源负载
从 TaskTracker
TaskTracker 主动 与JobTracker通信,接收作业,并负责直接执行每一个任务 ,
为了减少网络带宽TaskTracker最好运行在HDFS的DataNode上,数据本地化。
Client
作业为单位,规划作业计算分布,提交作业资源到HDFS
最终提交作业到JobTracker
架构图
弊端:
JobTracker:负载过重,单点故障
资源管理与计算调度强耦合,其他计算框架需要重复实现资源管理
不同框架对资源不能全局管理