2014年7月22日星期二

Hadoop计算框架-MapReduce

介绍

MapReduce是Hadoop中的计算框架,用于分布式计算统计数据;
MapReduce经历了两个版本的演变,从Hadoop0.23.0开始,MapReduce框架演变为MRv2,也可以称作YARN,即第二版的MapReduce框架,主要改进了之前框架中的JobTracker单点问题以及集群节点上限的问题;
先简单看一下旧版本的MapReduce框架的结构图。

MapReduce DataFlow

Map-Reduce data flow
  • JobClient.runJob()创建一个JobClient实例,并调用submitJob函数
    • 向JobTracker获取一个JobId
    • 检测此Job的output配置
    • 计算job的split。每个split作为一个Map任务的输入,被划分了多少个split,就会有相应多个Map任务执行;
      • 划分时,只划分那些大于HDFS block大小的文件,小于等于一个HDFS block大小的文件是不会被划分的;
      • 当hadoop处理很多小文件的时候,会产生很多个Map任务,从而导致效率低下;
    • 将Job运行所需的资源文件拷贝到JobTracker的文件系统中,包括job的jar,配置文件,split信息等;
    • 通知JobTracker Job已经可以运行
    • 每隔一秒轮询一次job进度,并反馈给命令行,知道job运行完成;
  • JobTractor收到任务后,将任务放入队列
    • Job调度器从队列中获取并初始化任务;
    • Job调度器从共享文件系统获取计算好的split信息
    • 为每个split创建一个Map Task,并为每个task分配一个ID
  • 任务分配
    • TaskTracker周期性的向JobTracker发送heartbeat。
    • heartbeat中如果TaskTracker告知JobTracker已准备好运行一个task,那么JobTracker将按优先级分配一个task给TaskTracker
    • TaskTracker拥有固定槽数的map task和reduce task,这个可以在启动TaskTracker时指定;
    • 默认调度器中Map task优先于reduce task

  • Map的过程
    • MapRunnable从split中读取一行行数据记录,并按照mapper的map函数将数据输出;
    • map的输出并不直接写入硬盘,而是先写入缓存;
    • map同时会为写入数据启动一个守护线程,当buffer达到一定的阈值,守护线程开始将内存中的数据写入硬盘;
      • partitioner: 文件在写入内存缓冲区之前,首先要经过partitioner,为写入数据分区;实际上,partitioner主要的作用是将不同的key分配到不同的分区上,从而为reduce任务提供方便;
      • 根据上图,我们可以看到,写入磁盘时有一个spill操作,spill:sort and combination,即排序和合并;
      • 首先在写入磁盘前,数据先要经过排序;
      • 如果客户端设置过combiner,那么会按照combiner的设置来合并相同的key
      • 最后向磁盘写入溢写文件;
    • 每次写都会向磁盘写入一个文件,如果写入了多个文件,最终文件还会有一个合并的过程;一般合并会将相同的key合并成一个group,例如:{“aaa”, [5, 8, 2, …]},如果client设置过Combiner,也会使用Combiner来合并相同的key;
    • reducer可以通过http协议请求map的输出文件。
  • Reduce过程
    • MapTask结束是,MapTask通知JobTracker,TaskTracker通知JobTracker, 从而JobTracker知道了Map输出和TaskTracker的关系;
    • reducer中一个线程周期性的向JobTracker请求map输出的位置,直到其取得了所有的map输出;
    • reduce task中的copy过程即当每个map task结束的时候就开始拷贝输出,因为不同的map task完成时间不同。reduce task有多个copy线程,可以并行拷贝;
    • 多个map输出拷贝到reduce task后,一个守护线程将其合并为一个大的排好序的文件。
    • 当所有的map输出都拷贝到reduce task后,进入sort过程,将所有的map输出合并为大的排好序的文件。
    • 最后进入reduce过程,调用reducer的reduce函数,处理排好序的输出的每个key,最后的结果写入HDFS。

问题

这是一个简单明了的过程,但是这个结构也存在很多问题:
  • JobTractor存在单点故障
  • JobTractor管理了所有的任务,当任务数过多时会导致JobTractor的内存占用非常高,因此一个MapReduce集群由于JobTractor的问题存在一个规模上限的问题,一般是3000-4000个节点;
  • TaskTractor的运行没有考虑到资源开销的问题,如果两个资源消耗非常大的task运行到一个TaskTractor节点,很可能会导致OOM的问题;

YARN

    从 0.23.0 版本开始,Hadoop 的 MapReduce 框架完全重构,发生了根本的变化。新的 Hadoop MapReduce 框架命名为 MapReduceV2 或者叫 Yarn,其架构图如下图所示:
YARN
MRv2最根本的想法是拆分JobTracker的两个最大的功能,资源管理、任务调度和监控;它的思路是有一个全局的资源管理器(Resource Manager)和每个应用一个Application Master。应用可能是一个传统的Map-Reduce作业或者DAG(有向无环图)任务;
资源管理器和每个节点的NodeManager组成了数据计算框架;资源管理器负责最终裁决系统中运行的所有应用使用的资源;
每个应用的ApplicationMaster是一个库框架,主要任务是和ResourceManager协商资源,和NodeManager协作运行和监控任务;

Resource Manager

主要由两部分构成,调度器和应用管理器;

调度器(Scheduler)

主要负责向运行在系统中的应用分配资源。这是一个纯粹意义上的调度器,因为它不会监控或追踪应用的执行状态;而且,它也不会对失败任务重启做任何报障;调度器的调度功能是依据应用执行需要的资源量,它通过一个资源容器来分配内存、cpu、磁盘、网络等资源,初期第一个版本只有内存。
调度器有一个可插拔的策略插件,这个插件负责分区所有的应用和应用队列;例如CapacitySchedulerFairScheduler都算作这个策略插件的一种实例;
CapacityScheduler支持分层级的应用队列,从而能够更好的预测享有的集群资源的多少;

应用管理器(Applications Manager)

主要负责接收任务提交,协商第一个执行这个应用的容器作为应用Master,提供当应用失败后重启Application Master的服务;

NodeManager

是整个框架在每台机器上的代理,它主要负责资源容器,监控资源使用率并向Resource Manager报告;

ApplicationMaster

负责同调度器协商应用所需的资源容器(多个),跟踪任务执行状态,监控执行进度;

YARN和第一版MapReduce框架的区别

  • 降低了JobTracker的资源消耗,分布式的将监控任务的工作交给集群中执行任务的节点负责,相对单点的JobTractor,更加安全,可靠。
  • 新的YARN中,AppMaster是一个可变部分,用户可以根据自己应用的情况自定义AM,使得更多的模型可以运行在hadoop中;
  • 以资源作为分配任务的依据(目前是内存),比旧版的slot更加合理;
  • YARN中,每个应用负责监控应用进度的是ApplicationMaster,而ResourceManager中的ApplicationsMasters负责监控这些AppMst的运行情况并对失败的AppMst重启;
  • Container是为了将来做内存隔离而提出的一个框架;目前这个框架只支持内存隔离,而度量资源采用内存而非slot,也避免了由于map slot和reduce slot分开而造成的资源闲置的问题;

没有评论:

发表评论