介绍
MapReduce是Hadoop中的计算框架,用于分布式计算统计数据;MapReduce经历了两个版本的演变,从Hadoop0.23.0开始,MapReduce框架演变为MRv2,也可以称作YARN,即第二版的MapReduce框架,主要改进了之前框架中的JobTracker单点问题以及集群节点上限的问题;
先简单看一下旧版本的MapReduce框架的结构图。
MapReduce DataFlow
Map-Reduce data flow |
- JobClient.runJob()创建一个JobClient实例,并调用submitJob函数
- 向JobTracker获取一个JobId
- 检测此Job的output配置
- 计算job的split。每个split作为一个Map任务的输入,被划分了多少个split,就会有相应多个Map任务执行;
- 划分时,只划分那些大于HDFS block大小的文件,小于等于一个HDFS block大小的文件是不会被划分的;
- 当hadoop处理很多小文件的时候,会产生很多个Map任务,从而导致效率低下;
- 将Job运行所需的资源文件拷贝到JobTracker的文件系统中,包括job的jar,配置文件,split信息等;
- 通知JobTracker Job已经可以运行
- 每隔一秒轮询一次job进度,并反馈给命令行,知道job运行完成;
- JobTractor收到任务后,将任务放入队列
- Job调度器从队列中获取并初始化任务;
- Job调度器从共享文件系统获取计算好的split信息
- 为每个split创建一个Map Task,并为每个task分配一个ID
- 任务分配
- TaskTracker周期性的向JobTracker发送heartbeat。
- heartbeat中如果TaskTracker告知JobTracker已准备好运行一个task,那么JobTracker将按优先级分配一个task给TaskTracker
- TaskTracker拥有固定槽数的map task和reduce task,这个可以在启动TaskTracker时指定;
- 默认调度器中Map task优先于reduce task
- Map的过程
- MapRunnable从split中读取一行行数据记录,并按照mapper的map函数将数据输出;
- map的输出并不直接写入硬盘,而是先写入缓存;
- map同时会为写入数据启动一个守护线程,当buffer达到一定的阈值,守护线程开始将内存中的数据写入硬盘;
- partitioner: 文件在写入内存缓冲区之前,首先要经过partitioner,为写入数据分区;实际上,partitioner主要的作用是将不同的key分配到不同的分区上,从而为reduce任务提供方便;
- 根据上图,我们可以看到,写入磁盘时有一个spill操作,spill:sort and combination,即排序和合并;
- 首先在写入磁盘前,数据先要经过排序;
- 如果客户端设置过combiner,那么会按照combiner的设置来合并相同的key
- 最后向磁盘写入溢写文件;
- 每次写都会向磁盘写入一个文件,如果写入了多个文件,最终文件还会有一个合并的过程;一般合并会将相同的key合并成一个group,例如:{“aaa”, [5, 8, 2, …]},如果client设置过Combiner,也会使用Combiner来合并相同的key;
- reducer可以通过http协议请求map的输出文件。
- Reduce过程
- MapTask结束是,MapTask通知JobTracker,TaskTracker通知JobTracker, 从而JobTracker知道了Map输出和TaskTracker的关系;
- reducer中一个线程周期性的向JobTracker请求map输出的位置,直到其取得了所有的map输出;
- reduce task中的copy过程即当每个map task结束的时候就开始拷贝输出,因为不同的map task完成时间不同。reduce task有多个copy线程,可以并行拷贝;
- 多个map输出拷贝到reduce task后,一个守护线程将其合并为一个大的排好序的文件。
- 当所有的map输出都拷贝到reduce task后,进入sort过程,将所有的map输出合并为大的排好序的文件。
- 最后进入reduce过程,调用reducer的reduce函数,处理排好序的输出的每个key,最后的结果写入HDFS。
问题
这是一个简单明了的过程,但是这个结构也存在很多问题:- JobTractor存在单点故障
- JobTractor管理了所有的任务,当任务数过多时会导致JobTractor的内存占用非常高,因此一个MapReduce集群由于JobTractor的问题存在一个规模上限的问题,一般是3000-4000个节点;
- TaskTractor的运行没有考虑到资源开销的问题,如果两个资源消耗非常大的task运行到一个TaskTractor节点,很可能会导致OOM的问题;
YARN
从 0.23.0 版本开始,Hadoop 的 MapReduce 框架完全重构,发生了根本的变化。新的 Hadoop MapReduce 框架命名为 MapReduceV2 或者叫 Yarn,其架构图如下图所示:YARN |
资源管理器和每个节点的NodeManager组成了数据计算框架;资源管理器负责最终裁决系统中运行的所有应用使用的资源;
每个应用的ApplicationMaster是一个库框架,主要任务是和ResourceManager协商资源,和NodeManager协作运行和监控任务;
Resource Manager
主要由两部分构成,调度器和应用管理器;调度器(Scheduler)
主要负责向运行在系统中的应用分配资源。这是一个纯粹意义上的调度器,因为它不会监控或追踪应用的执行状态;而且,它也不会对失败任务重启做任何报障;调度器的调度功能是依据应用执行需要的资源量,它通过一个资源容器来分配内存、cpu、磁盘、网络等资源,初期第一个版本只有内存。
调度器有一个可插拔的策略插件,这个插件负责分区所有的应用和应用队列;例如CapacityScheduler和FairScheduler都算作这个策略插件的一种实例;
CapacityScheduler支持分层级的应用队列,从而能够更好的预测享有的集群资源的多少;
应用管理器(Applications Manager)
主要负责接收任务提交,协商第一个执行这个应用的容器作为应用Master,提供当应用失败后重启Application Master的服务;
NodeManager
是整个框架在每台机器上的代理,它主要负责资源容器,监控资源使用率并向Resource Manager报告;ApplicationMaster
负责同调度器协商应用所需的资源容器(多个),跟踪任务执行状态,监控执行进度;
YARN和第一版MapReduce框架的区别
- 降低了JobTracker的资源消耗,分布式的将监控任务的工作交给集群中执行任务的节点负责,相对单点的JobTractor,更加安全,可靠。
- 新的YARN中,AppMaster是一个可变部分,用户可以根据自己应用的情况自定义AM,使得更多的模型可以运行在hadoop中;
- 以资源作为分配任务的依据(目前是内存),比旧版的slot更加合理;
- YARN中,每个应用负责监控应用进度的是ApplicationMaster,而ResourceManager中的ApplicationsMasters负责监控这些AppMst的运行情况并对失败的AppMst重启;
- Container是为了将来做内存隔离而提出的一个框架;目前这个框架只支持内存隔离,而度量资源采用内存而非slot,也避免了由于map slot和reduce slot分开而造成的资源闲置的问题;