显示标签为“hadoop”的博文。显示所有博文
显示标签为“hadoop”的博文。显示所有博文

2014年7月22日星期二

Hadoop计算框架-MapReduce

介绍

MapReduce是Hadoop中的计算框架,用于分布式计算统计数据;
MapReduce经历了两个版本的演变,从Hadoop0.23.0开始,MapReduce框架演变为MRv2,也可以称作YARN,即第二版的MapReduce框架,主要改进了之前框架中的JobTracker单点问题以及集群节点上限的问题;
先简单看一下旧版本的MapReduce框架的结构图。

MapReduce DataFlow

Map-Reduce data flow
  • JobClient.runJob()创建一个JobClient实例,并调用submitJob函数
    • 向JobTracker获取一个JobId
    • 检测此Job的output配置
    • 计算job的split。每个split作为一个Map任务的输入,被划分了多少个split,就会有相应多个Map任务执行;
      • 划分时,只划分那些大于HDFS block大小的文件,小于等于一个HDFS block大小的文件是不会被划分的;
      • 当hadoop处理很多小文件的时候,会产生很多个Map任务,从而导致效率低下;
    • 将Job运行所需的资源文件拷贝到JobTracker的文件系统中,包括job的jar,配置文件,split信息等;
    • 通知JobTracker Job已经可以运行
    • 每隔一秒轮询一次job进度,并反馈给命令行,知道job运行完成;
  • JobTractor收到任务后,将任务放入队列
    • Job调度器从队列中获取并初始化任务;
    • Job调度器从共享文件系统获取计算好的split信息
    • 为每个split创建一个Map Task,并为每个task分配一个ID
  • 任务分配
    • TaskTracker周期性的向JobTracker发送heartbeat。
    • heartbeat中如果TaskTracker告知JobTracker已准备好运行一个task,那么JobTracker将按优先级分配一个task给TaskTracker
    • TaskTracker拥有固定槽数的map task和reduce task,这个可以在启动TaskTracker时指定;
    • 默认调度器中Map task优先于reduce task

  • Map的过程
    • MapRunnable从split中读取一行行数据记录,并按照mapper的map函数将数据输出;
    • map的输出并不直接写入硬盘,而是先写入缓存;
    • map同时会为写入数据启动一个守护线程,当buffer达到一定的阈值,守护线程开始将内存中的数据写入硬盘;
      • partitioner: 文件在写入内存缓冲区之前,首先要经过partitioner,为写入数据分区;实际上,partitioner主要的作用是将不同的key分配到不同的分区上,从而为reduce任务提供方便;
      • 根据上图,我们可以看到,写入磁盘时有一个spill操作,spill:sort and combination,即排序和合并;
      • 首先在写入磁盘前,数据先要经过排序;
      • 如果客户端设置过combiner,那么会按照combiner的设置来合并相同的key
      • 最后向磁盘写入溢写文件;
    • 每次写都会向磁盘写入一个文件,如果写入了多个文件,最终文件还会有一个合并的过程;一般合并会将相同的key合并成一个group,例如:{“aaa”, [5, 8, 2, …]},如果client设置过Combiner,也会使用Combiner来合并相同的key;
    • reducer可以通过http协议请求map的输出文件。
  • Reduce过程
    • MapTask结束是,MapTask通知JobTracker,TaskTracker通知JobTracker, 从而JobTracker知道了Map输出和TaskTracker的关系;
    • reducer中一个线程周期性的向JobTracker请求map输出的位置,直到其取得了所有的map输出;
    • reduce task中的copy过程即当每个map task结束的时候就开始拷贝输出,因为不同的map task完成时间不同。reduce task有多个copy线程,可以并行拷贝;
    • 多个map输出拷贝到reduce task后,一个守护线程将其合并为一个大的排好序的文件。
    • 当所有的map输出都拷贝到reduce task后,进入sort过程,将所有的map输出合并为大的排好序的文件。
    • 最后进入reduce过程,调用reducer的reduce函数,处理排好序的输出的每个key,最后的结果写入HDFS。

问题

这是一个简单明了的过程,但是这个结构也存在很多问题:
  • JobTractor存在单点故障
  • JobTractor管理了所有的任务,当任务数过多时会导致JobTractor的内存占用非常高,因此一个MapReduce集群由于JobTractor的问题存在一个规模上限的问题,一般是3000-4000个节点;
  • TaskTractor的运行没有考虑到资源开销的问题,如果两个资源消耗非常大的task运行到一个TaskTractor节点,很可能会导致OOM的问题;

YARN

    从 0.23.0 版本开始,Hadoop 的 MapReduce 框架完全重构,发生了根本的变化。新的 Hadoop MapReduce 框架命名为 MapReduceV2 或者叫 Yarn,其架构图如下图所示:
YARN
MRv2最根本的想法是拆分JobTracker的两个最大的功能,资源管理、任务调度和监控;它的思路是有一个全局的资源管理器(Resource Manager)和每个应用一个Application Master。应用可能是一个传统的Map-Reduce作业或者DAG(有向无环图)任务;
资源管理器和每个节点的NodeManager组成了数据计算框架;资源管理器负责最终裁决系统中运行的所有应用使用的资源;
每个应用的ApplicationMaster是一个库框架,主要任务是和ResourceManager协商资源,和NodeManager协作运行和监控任务;

Resource Manager

主要由两部分构成,调度器和应用管理器;

调度器(Scheduler)

主要负责向运行在系统中的应用分配资源。这是一个纯粹意义上的调度器,因为它不会监控或追踪应用的执行状态;而且,它也不会对失败任务重启做任何报障;调度器的调度功能是依据应用执行需要的资源量,它通过一个资源容器来分配内存、cpu、磁盘、网络等资源,初期第一个版本只有内存。
调度器有一个可插拔的策略插件,这个插件负责分区所有的应用和应用队列;例如CapacitySchedulerFairScheduler都算作这个策略插件的一种实例;
CapacityScheduler支持分层级的应用队列,从而能够更好的预测享有的集群资源的多少;

应用管理器(Applications Manager)

主要负责接收任务提交,协商第一个执行这个应用的容器作为应用Master,提供当应用失败后重启Application Master的服务;

NodeManager

是整个框架在每台机器上的代理,它主要负责资源容器,监控资源使用率并向Resource Manager报告;

ApplicationMaster

负责同调度器协商应用所需的资源容器(多个),跟踪任务执行状态,监控执行进度;

YARN和第一版MapReduce框架的区别

  • 降低了JobTracker的资源消耗,分布式的将监控任务的工作交给集群中执行任务的节点负责,相对单点的JobTractor,更加安全,可靠。
  • 新的YARN中,AppMaster是一个可变部分,用户可以根据自己应用的情况自定义AM,使得更多的模型可以运行在hadoop中;
  • 以资源作为分配任务的依据(目前是内存),比旧版的slot更加合理;
  • YARN中,每个应用负责监控应用进度的是ApplicationMaster,而ResourceManager中的ApplicationsMasters负责监控这些AppMst的运行情况并对失败的AppMst重启;
  • Container是为了将来做内存隔离而提出的一个框架;目前这个框架只支持内存隔离,而度量资源采用内存而非slot,也避免了由于map slot和reduce slot分开而造成的资源闲置的问题;

2014年7月18日星期五

Hadoop分布式文件系统-HDFS

简介

        Hadoop分布式文件系统(HDFS)是一个分布式的文件系统,被设计用于运行在商品级的硬件架构上;它和现有的一些分布式文件系统有很多相似点,但也有非常大的差别;它的容错性非常好,并且可以在低廉的硬件上运行;HDFS可以为应用提供高吞吐量的数据访问,并且非常适合那些大数据量的应用;HDFS实现很少的POSIX需求来达到和文件系统的数据交互;HDFS最初是作为Apache Nutch搜索引擎的基础组件被设计的,它也是Hadoop的核心工程之一。

假定和目标

硬件故障

一个HDFS实例可能包括数百甚至数千台服务器设备,每台负责存储整体的一部分数据。事实上,一个包括大量组件组成的系统,包含出问题的组件会有非常大的可能性,也就是说总是会有一些组件处于无法服务的状态;因此,发现这些错误组件并非常快的恢复是HDFS核心设计需求之一;

流式数据获取

在HDFS上运行的服务需要使用流的方式访问它们的数据集合;它们不是运行在普通文件系统上的普通应用;HDFS被设计更多用于批量流式获取数据而非交互式访问;它强调的是高吞吐量,而不是低延迟数据访问;POSIX有很多强制的要求但是并非是HDFS的目标,为了提高吞吐量,在一些关键地方队POSIX语义做了修改;

大数据集合

在HDFS运行的应用都具有非常大的数据集合,典型的文件大小是以GB或TB来度量的。HDFS用来被支持大文件,它应该提供高带宽的数据写入和单机群数百个节点的扩展;它可以在单个实例支持数千万的文件;

简单的一致性模型

HDFS应用读取文件需要一个一次写入多次读取模型;一个文件经过创建,写入,关闭之后就不会发生变化;这个假设简化了数据一致性的问题并且为高吞吐量提供了保证。Map/Reduce应用或者网络爬虫应用都非常适合这个模型。目前还有计划在将来扩充这个模型,使之支持文件的附加写操作。

“移动计算比移动数据更划算”

一个应用请求的计算,离它操作的数据越近就越高效,在数据达到海量级别的时候更是如此。因为这样就能降低网络阻塞的影响,提高系统数据的吞吐量。将计算移动到数据附近,比之将数据移动到应用所在显然更好。HDFS为应用提供了将它们自己移动到数据附近的接口。

异构软硬件平台间的可移植性

HDFS在设计的时候就考虑到平台的可移植性。这种特性方便了HDFS作为大规模数据应用平台的推广。

Namenode 和 Datanode

HDFS采用master/slave架构。一个HDFS集群是由一个Namenode和一定数目的Datanodes组成。Namenode是一个中心服务器,负责管理文件系统的名字空间(namespace)以及客户端对文件的访问。集群中的Datanode一般是一个节点一个,负责管理它所在节点上的存储。HDFS暴露了文件系统的名字空间,用户能够以文件的形式在上面存储数据。从内部看,一个文件其实被分成一个或多个数据块,这些块存储在一组Datanode上。Namenode执行文件系统的名字空间操作,比如打开、关闭、重命名文件或目录。它也负责确定数据块到具体Datanode节点的映射。Datanode负责处理文件系统客户端的读写请求。在Namenode的统一调度下进行数据块的创建、删除和复制。
图1 HDFS架构图
Namenode和Datanode被设计成可以在普通的商用机器上运行。这些机器一般运行着GNU/Linux操作系统(OS)。HDFS采用Java语言开发,因此任何支持Java的机器都可以部署Namenode或Datanode。由于采用了可移植性极强的Java语言,使得HDFS可以部署到多种类型的机器上。一个典型的部署场景是一台机器上只运行一个Namenode实例,而集群中的其它机器分别运行一个Datanode实例。这种架构并不排斥在一台机器上运行多个Datanode,只不过这样的情况比较少见。
集群中单一Namenode的结构大大简化了系统的架构。Namenode是所有HDFS元数据的仲裁者和管理者,这样,用户数据永远不会流过Namenode。

HBase中的MasterServer-RegionServer和Namenode-Datanode

HBase可以在底层使用HDFS存储,那么HBase中的MasterServer与Namenode关系是怎样的,RegionServer与Datanode的关系又是什么样的?

MasterServer与Namenode

MasterServer一般与Namenode运行在同一台服务器上;除此之外,他们之间是毫无关系的。

RegionServer与Datanode

        HBase中,RegionServer是负责数据存储的节点,在集成HDFS情况下,一般RegionServer和Datanode会处在同一个机器节点上;首先我们假设HDFS的文件备份数为3,那么写入数据时,RegionServer会将数据写入它管理的Region中的Memstore中,当触发Flush操作时(触发条件可以参考之前的HBase架构中的Blog),RegionServer会将数据写入到HDFS中;写入过程是向Namenode发送写入数据的请求,Namenode如果发现RegionServer和HDFS集群的某一个Datanode在同一个机器上,首先第一份数据备份会被写入到RegionServer本地的Datanode节点中,其他两份数据备份会被写入到其他的Datanodes中。由于首先写入本地的Datanode节点,数据在读取时,不会出现从远端读取的情况; 
        当一个RegionServer故障时,RegionMaster会将该regionServer负责的region重新分配给其他存活的regionServer。当一个region被分配给一个regionServer时,它首先会强制从远端读取数据,直到第一个major合并发生后,新的regionServer会重新将数据写入到本地节点;注:当一个regionServer故障时,它负责的region会被分配给多个regionServer;

文件系统的名字空间 (namespace)

HDFS支持传统的层次型文件组织结构。用户或者应用程序可以创建目录,然后将文件保存在这些目录里。文件系统名字空间的层次结构和大多数现有的文件系统类似:用户可以创建、删除、移动或重命名文件。当前,HDFS不支持用户磁盘配额和访问权限控制,也不支持硬链接和软链接。但是HDFS架构并不妨碍实现这些特性。
Namenode负责维护文件系统的名字空间,任何对文件系统名字空间或属性的修改都将被Namenode记录下来。应用程序可以设置HDFS保存的文件的副本数目。文件副本的数目称为文件的副本系数,这个信息也是由Namenode保存的。

数据复制

HDFS被设计成能够在一个大集群中跨机器可靠地存储超大文件。它将每个文件存储成一系列的数据块,除了最后一个,所有的数据块都是同样大小的。为了容错,文件的所有数据块都会有副本。每个文件的数据块大小和副本系数都是可配置的。应用程序可以指定某个文件的副本数目。副本系数可以在文件创建的时候指定,也可以在之后改变。HDFS中的文件都是一次性写入的,并且严格要求在任何时候只能有一个写入者。
Namenode全权管理数据块的复制,它周期性地从集群中的每个Datanode接收心跳信号和块状态报告(Blockreport)。接收到心跳信号意味着该Datanode节点工作正常。块状态报告包含了一个该Datanode上所有数据块的列表。
图2 Datanodes

副本存放: 最最开始的一步

副本的存放是HDFS可靠性和性能的关键。优化的副本存放策略是HDFS区分于其他大部分分布式文件系统的重要特性。这种特性需要做大量的调优,并需要经验的积累。HDFS采用一种称为机架感知(rack-aware)的策略来改进数据的可靠性、可用性和网络带宽的利用率。目前实现的副本存放策略只是在这个方向上的第一步。实现这个策略的短期目标是验证它在生产环境下的有效性,观察它的行为,为实现更先进的策略打下测试和研究的基础。
大型HDFS实例一般运行在跨越多个机架的计算机组成的集群上,不同机架上的两台机器之间的通讯需要经过交换机。在大多数情况下,同一个机架内的两台机器间的带宽会比不同机架的两台机器间的带宽大。
通过一个机架感知的过程,Namenode可以确定每个Datanode所属的机架id。一个简单但没有优化的策略就是将副本存放在不同的机架上。这样可以有效防止当整个机架失效时数据的丢失,并且允许读数据的时候充分利用多个机架的带宽。这种策略设置可以将副本均匀分布在集群中,有利于当组件失效情况下的负载均衡。但是,因为这种策略的一个写操作需要传输数据块到多个机架,这增加了写的代价。
在大多数情况下,副本系数是3,HDFS的存放策略是将一个副本存放在本地机架的节点上,一个副本放在同一机架的另一个节点上,最后一个副本放在不同机架的节点上。这种策略减少了机架间的数据传输,这就提高了写操作的效率。机架的错误远远比节点的错误少,所以这个策略不会影响到数据的可靠性和可用性。于此同时,因为数据块只放在两个(不是三个)不同的机架上,所以此策略减少了读取数据时需要的网络传输总带宽。在这种策略下,副本并不是均匀分布在不同的机架上。三分之一的副本在一个节点上,三分之二的副本在一个机架上,其他副本均匀分布在剩下的机架中,这一策略在不损害数据可靠性和读取性能的情况下改进了写的性能。

副本选择

为了降低整体的带宽消耗和读取延时,HDFS会尽量让读取程序读取离它最近的副本。如果在读取程序的同一个机架上有一个副本,那么就读取该副本。如果一个HDFS集群跨越多个数据中心,那么客户端也将首先读本地数据中心的副本。

安全模式

Namenode启动后会进入一个称为安全模式的特殊状态。处于安全模式的Namenode是不会进行数据块的复制的。Namenode从所有的 Datanode接收心跳信号和块状态报告。块状态报告包括了某个Datanode所有的数据块列表。每个数据块都有一个指定的最小副本数。当Namenode检测确认某个数据块的副本数目达到这个最小值,那么该数据块就会被认为是副本安全(safely replicated)的;在一定百分比(这个参数可配置)的数据块被Namenode检测确认是安全之后(加上一个额外的30秒等待时间),Namenode将退出安全模式状态。接下来它会确定还有哪些数据块的副本没有达到指定数目,并将这些数据块复制到其他Datanode上。

文件系统元数据的持久化

Namenode上保存着HDFS的名字空间。对于任何对文件系统元数据产生修改的操作,Namenode都会使用一种称为EditLog的事务日志记录下来。例如,在HDFS中创建一个文件,Namenode就会在Editlog中插入一条记录来表示;同样地,修改文件的副本系数也将往Editlog插入一条记录。Namenode在本地操作系统的文件系统中存储这个Editlog。整个文件系统的名字空间,包括数据块到文件的映射、文件的属性等,都存储在一个称为FsImage的文件中,这个文件也是放在Namenode所在的本地文件系统上。
Namenode在内存中保存着整个文件系统的名字空间和文件数据块映射(Blockmap)的映像。这个关键的元数据结构设计得很紧凑,因而一个有4G内存的Namenode足够支撑大量的文件和目录。当Namenode启动时,它从硬盘中读取Editlog和FsImage,将所有Editlog中的事务作用在内存中的FsImage上,并将这个新版本的FsImage从内存中保存到本地磁盘上,然后删除旧的Editlog,因为这个旧的Editlog的事务都已经作用在FsImage上了。这个过程称为一个检查点(checkpoint)。在当前实现中,检查点只发生在Namenode启动时,在不久的将来将实现支持周期性的检查点。
Datanode将HDFS数据以文件的形式存储在本地的文件系统中,它并不知道有关HDFS文件的信息。它把每个HDFS数据块存储在本地文件系统的一个单独的文件中。Datanode并不在同一个目录创建所有的文件,实际上,它用试探的方法来确定每个目录的最佳文件数目,并且在适当的时候创建子目录。在同一个目录中创建所有的本地文件并不是最优的选择,这是因为本地文件系统可能无法高效地在单个目录中支持大量的文件。当一个Datanode启动时,它会扫描本地文件系统,产生一个这些本地文件对应的所有HDFS数据块的列表,然后作为报告发送到Namenode,这个报告就是块状态报告。

通讯协议

所有的HDFS通讯协议都是建立在TCP/IP协议之上。客户端通过一个可配置的TCP端口连接到Namenode,通过ClientProtocol协议与Namenode交互。而Datanode使用DatanodeProtocol协议与Namenode交互。一个远程过程调用(RPC)模型被抽象出来封装ClientProtocol和Datanodeprotocol协议。在设计上,Namenode不会主动发起RPC,而是响应来自客户端或 Datanode 的RPC请求。

健壮性

HDFS的主要目标就是即使在出错的情况下也要保证数据存储的可靠性。常见的三种出错情况是:Namenode出错, Datanode出错和网络割裂(network partitions)。

磁盘数据错误,心跳检测和重新复制

每个Datanode节点周期性地向Namenode发送心跳信号。网络割裂可能导致一部分Datanode跟Namenode失去联系。Namenode通过心跳信号的缺失来检测这一情况,并将这些近期不再发送心跳信号Datanode标记为宕机,不会再将新的IO请求发给它们。任何存储在宕机Datanode上的数据将不再有效。Datanode的宕机可能会引起一些数据块的副本系数低于指定值,Namenode不断地检测这些需要复制的数据块,一旦发现就启动复制操作。在下列情况下,可能需要重新复制:某个Datanode节点失效,某个副本遭到损坏,Datanode上的硬盘错误,或者文件的副本系数增大。

集群均衡

HDFS的架构支持数据均衡策略。如果某个Datanode节点上的空闲空间低于特定的临界点,按照均衡策略系统就会自动地将数据从这个Datanode移动到其他空闲的Datanode。当对某个文件的请求突然增加,那么也可能启动一个计划创建该文件新的副本,并且同时重新平衡集群中的其他数据。这些均衡策略目前还没有实现。

数据完整性

从某个Datanode获取的数据块有可能是损坏的,损坏可能是由Datanode的存储设备错误、网络错误或者软件bug造成的。HDFS客户端软件实现了对HDFS文件内容的校验和(checksum)检查。当客户端创建一个新的HDFS文件,会计算这个文件每个数据块的校验和,并将校验和作为一个单独的隐藏文件保存在同一个HDFS名字空间下。当客户端获取文件内容后,它会检验从Datanode获取的数据跟相应的校验和文件中的校验和是否匹配,如果不匹配,客户端可以选择从其他Datanode获取该数据块的副本。

元数据磁盘错误

FsImage和Editlog是HDFS的核心数据结构。如果这些文件损坏了,整个HDFS实例都将失效。因而,Namenode可以配置成支持维护多个FsImage和Editlog的副本。任何对FsImage或者Editlog的修改,都将同步到它们的副本上。这种多副本的同步操作可能会降低Namenode每秒处理的名字空间事务数量。然而这个代价是可以接受的,因为即使HDFS的应用是数据密集的,它们也非元数据密集的。当Namenode重启的时候,它会选取最近的完整的FsImage和Editlog来使用。
Namenode是HDFS集群中的单点故障(single point of failure)所在。如果Namenode机器故障,是需要手工干预的。目前,自动重启或在另一台机器上做Namenode故障转移的功能还没实现。

快照

快照支持某一特定时刻的数据的复制备份。利用快照,可以让HDFS在数据损坏时恢复到过去一个已知正确的时间点。HDFS目前还不支持快照功能,但计划在将来的版本进行支持。

数据组织

数据块

HDFS被设计成支持大文件,适用HDFS的是那些需要处理大规模的数据集的应用。这些应用都是只写入数据一次,但却读取一次或多次,并且读取速度应能满足流式读取的需要。HDFS支持文件的“一次写入多次读取”语义。一个典型的数据块大小是64MB。因而,HDFS中的文件总是按照64M被切分成不同的块,每个块尽可能地存储于不同的Datanode中。

Staging

客户端创建文件的请求其实并没有立即发送给Namenode,事实上,在刚开始阶段HDFS客户端会先将文件数据缓存到本地的一个临时文件。应用程序的写操作被透明地重定向到这个临时文件。当这个临时文件累积的数据量超过一个数据块的大小,客户端才会联系Namenode。Namenode将文件名插入文件系统的层次结构中,并且分配一个数据块给它。然后返回Datanode的标识符和目标数据块给客户端。接着客户端将这块数据从本地临时文件上传到指定的Datanode上。当文件关闭时,在临时文件中剩余的没有上传的数据也会传输到指定的Datanode上。然后客户端告诉Namenode文件已经关闭。此时Namenode才将文件创建操作提交到日志里进行存储。如果Namenode在文件关闭前宕机了,则该文件将丢失。
上述方法是对在HDFS上运行的目标应用进行认真考虑后得到的结果。这些应用需要进行文件的流式写入。如果不采用客户端缓存,由于网络速度和网络堵塞会对吞估量造成比较大的影响。这种方法并不是没有先例的,早期的文件系统,比如AFS,就用客户端缓存来提高性能。为了达到更高的数据上传效率,已经放松了POSIX标准的要求。

流水线复制

当客户端向HDFS文件写入数据的时候,一开始是写到本地临时文件中。假设该文件的副本系数设置为3,当本地临时文件累积到一个数据块的大小时,客户端会从Namenode获取一个Datanode列表用于存放副本。然后客户端开始向第一个Datanode传输数据,第一个Datanode一小部分一小部分(4 KB)地接收数据,将每一部分写入本地仓库,并同时传输该部分到列表中第二个Datanode节点。第二个Datanode也是这样,一小部分一小部分地接收数据,写入本地仓库,并同时传给第三个Datanode。最后,第三个Datanode接收数据并存储在本地。因此,Datanode能流水线式地从前一个节点接收数据,并在同时转发给下一个节点,数据以流水线的方式从前一个Datanode复制到下一个。

存储空间回收

文件的删除和恢复

当用户或应用程序删除某个文件时,这个文件并没有立刻从HDFS中删除。实际上,HDFS会将这个文件重命名转移到/trash目录。只要文件还在/trash目录中,该文件就可以被迅速地恢复。文件在/trash中保存的时间是可配置的,当超过这个时间时,Namenode就会将该文件从名字空间中删除。删除文件会使得该文件相关的数据块被释放。注意,从用户删除文件到HDFS空闲空间的增加之间会有一定时间的延迟。
只要被删除的文件还在/trash目录中,用户就可以恢复这个文件。如果用户想恢复被删除的文件,他/她可以浏览/trash目录找回该文件。/trash目录仅仅保存被删除文件的最后副本。/trash目录与其他的目录没有什么区别,除了一点:在该目录上HDFS会应用一个特殊策略来自动删除文件。目前的默认策略是删除/trash中保留时间超过6小时的文件。将来,这个策略可以通过一个被良好定义的接口配置。

减少副本系数

当一个文件的副本系数被减小后,Namenode会选择过剩的副本删除。下次心跳检测时会将该信息传递给Datanode。Datanode遂即移除相应的数据块,集群中的空闲空间加大。同样,在调用setReplication API结束和集群中空闲空间增加间会有一定的延迟。

2014年6月18日星期三

[翻译]Hadoop中的SequenceFile

【原文】http://blog.cloudera.com/blog/2011/01/hadoop-io-sequence-map-set-array-bloommap-files/ hadoop中的SequenceFile提供了一种持久存储二进制k-v键值对的数据结构。和B-tree不同,SequenceFile不能支持对指定key的修改,增加或删除。整个文件只能以追加的方式写入数据。 SequenceFile有三种存储格式:非压缩格式,记录压缩格式和分块压缩格式; 每种格式都包含一个Header,这个Header可以帮助读取方识别存储格式; 1. 包括key值和value的Class信息,读取方可以通过反射机制实例化这些类。 2. 版本 version number 3. 存储数据的格式,如果是压缩的格式,则还会提供压缩编码的class name信息; SequenceFile还可以包含一个二级k-v结构的列表,作为整个SequenceFile的metadata; SequenceFile的元数据信息中只能存储text-text类型的数据,元数据是在SequenceFile的Writer初始化的时候被写入的,因此元数据是不能被更改的。 元数据(摘自维基百科):其它文件保存信息常常伴随着文件自身保存在文件系统中。文件长度可能是分配给这个文件的区块数,也可能是这个文件实际的字节数。文件最后修改时间也许记录在文件的时间戳中。有的文件系统还保存文件的创建时间,最后访问时间及属性修改时间。(不过大多数早期的文件系统不记录文件的时间信息)其它信息还包括文件设备类型(如:区块数,字符集,套接口,子目录等等),文件所有者的ID,组ID,还有访问权限(如:只读,可执行等等)。 SequenceFile的文件存储有三种格式,其中非压缩和记录压缩两个格式非常类似;调用append方法来添加记录时,会append一个记录(record)到SequenceFile,Record包括: 整体record的长度(key长度+value长度);key长度;key;value;(key,value都是raw格式数据) 记录压缩和非压缩的区别仅在于,value是否是被压缩的,并且压缩时会提供一个压缩编码类; Block压缩格式相对比较复杂;数据在达到一个阈值前不会被写入,当达到阈值之后,所有的key和value都会被集中压缩,并生成keys的长度的list以及values的长度;两个块记录中间存在一个同步标记(sync marker) 如上图,每个块记录会记录buffer的记录数,还包含key长度的列表,keys列表,value长度列表,value列表; Hadoop的SequenceFile是其他文件的基础,例如:MapFile, SetFile, ArrayFile and BloomMapFile. 关于Sync标记: a sync point is a point in the stream which can used by to resynchronize with a record boundary if the reader is "lost" - for example after seeking to an arbitrary position on the stream. 其他: MapFile:实际上是一个有序的sequenceFile。它是一个包含两个sequenceFile的目录,data文件(/data)和index文件(/index); data包含所有的k-v记录,但是key(n+1)必须大于等于key(n), 当进行append()操作时会检查,如果检查失败,会返回一个IOException “Key out of order”.
index记录了所有key值和记录的偏移位置;index的数据会被加载到内存中,通过存储的映射关系快速定位到文件中的记录; 需要注意的是,index不会将所有的key都加载到内存中,默认情况,没个128条记录会有一条index的映射值存储到内存中,可以通过setIndexInterval来改变这个数值; SetFile和ArrayFile都是基于MapFile的,通过增加很少的几行代码实现的。 setFile只会append(key), value是一个NullWritable 的实例; ArrayFile只会append(value), key是一个LongWritable实例的记录编号,内容是count+1; BloomFile实现了MapFile,并且增加了另外一个bloom file(/bloom), 这个文件中存储的是序列化的DynamicBloomFilter变量,存储了新增加的key值;

2013年2月7日星期四

KAFKA在统计系统中的应用

Kafka

kafka是LinkedIn开源的一款分布式的发布-订阅消息系统,它具有:
1. 通过O(1)的磁盘结构持久化存储消息,即使TB级的数据也能保持长期稳定;
2. 高吞吐率:即使非常普通的硬件,kafka也能支持每秒数十万的消息;
3. 支持通过kafka服务器和消费集群来分区消息;
4. 支持Hadoop并行加载;

设计流程

通过集成kafka和log4j在各个需要采集日志的系统进行日志采集,日志统一发送到kafka的消息队列。再通过定时运行kafka-hadoop的作业,将数据从KAFKA同步到hadoop的HDFS中。hadoop的各类map-reduce作业可以对数据进行统计,存储到db或缓存中。

安装

首先安装kafka,下载kafka到本地,然后执行:
> tar xzf kafka-<VERSION>.tgz
> cd kafka-<VERSION>
> ./sbt update
> ./sbt package
启动zookeeper
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2010-11-21 23:45:02,335] INFO Reading configuration from: config/zookeeper.properties...
启动kafka
> bin/kafka-server-start.sh config/server.properties
jkreps-mn-2:kafka-trunk jkreps$ bin/kafka-server-start.sh config/server.properties 
[2010-11-21 23:51:39,608] INFO starting log cleaner every 60000 ms (kafka.log.LogManager)
[2010-11-21 23:51:39,628] INFO connecting to ZK: localhost:2181 (kafka.server.KafkaZooKeeper)...
发送一些消息
> bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic test 
This is a message
This is another message
查看消息
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message
至此,kafka已经启动成功。

安装hadoop,下载hadoop0.20.2,按照hadoop的操作手册安装,此处省略安装过程。

配置log4j和kafka-log

#配置KAFKA
#Hostname表示应用的名称
#Topic表示打印日志生成的目录名
log4j.appender.KAFKA=com.comp.kafka.AsyncKafkaAppender
log4j.appender.KAFKA.topic=TOPIC
log4j.appender.KAFKA.bufferSize = 10
log4j.appender.KAFKA.brokerList=0:kafka-server-ip:9092
log4j.appender.KAFKA.serializerClass=kafka.serializer.StringEncoder
log4j.appender.KAFKA.hostname=APP-NAME
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
log4j.logger.com.comp= DEBUG, KAFKA
配置中,配置了kafka的ip和端口,并设置了topic和序列化类和pattern。
AsyncKafkaAppender是继承了log4j的AsyncAppender,是log4j异步发送日志的模式,当log达到bufferSize的大小时,会统一由log4j异步执行打印操作。代码如下:
public class AsyncKafkaAppender extends AsyncAppender {
    private java.lang.String topic;
    private java.lang.String serializerClass;
    private java.lang.String zkConnect;
    private java.lang.String brokerList;
    private java.lang.String hostname;

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getSerializerClass() {
        return serializerClass;
    }

    public void setSerializerClass(String serializerClass) {
        this.serializerClass = serializerClass;
    }

    public String getZkConnect() {
        return zkConnect;
    }

    public void setZkConnect(String zkConnect) {
        this.zkConnect = zkConnect;
    }

    public String getBrokerList() {
        return brokerList;
    }

    public void setBrokerList(String brokerList) {
        this.brokerList = brokerList;
    }

    public String getHostname() {
        return hostname;
    }

    public void setHostname(String hostname) {
        this.hostname = hostname;
    }
    @Override
    public void activateOptions() {
        super.activateOptions();
        synchronized (this) {
            KafkaLog4jAppender kafka = new KafkaLog4jAppender();
            kafka.setLayout(getLayout());
            kafka.setHostname(getHostname());
            kafka.setBrokerList(getBrokerList());
            kafka.setSerializerClass(getSerializerClass());
            kafka.setZkConnect(getZkConnect());
            kafka.setTopic(getTopic());
            kafka.activateOptions();
            addAppender(kafka);
        }
    }

    @Override
    public boolean requiresLayout() {
        return true;
    }
}
这时我们就可以撰写一个测试代码,测试log是否已经发送到了kafka。

KAFKA消息发送到Hadoop

在kafka的contrib目录的hadoop-consumer中有一系列的文件,包括脚本,jar,配置文件等。我们可以直接使用这个目录下的脚本进行定时数据同步。
需要先修改test目录下的test.properties:
# kafka的topic名称
kafka.etl.topic=TOPIC

# hdfs location of jars
hdfs.default.classpath.dir=/tmp/kafka/lib

# number of test events to be generated
event.count=1000

# hadoop id and group
hadoop.job.ugi=kafka,hadoop

# kafka server uri
kafka.server.uri=tcp://localhost:9092

# hdfs location of input directory
input=hdfs://localhost:9000/tmp/kafka/data

# hdfs location of output directory
output=hdfs://localhost:9000/tmp/kafka/output

# limit the number of events to be fetched;
# value -1 means no limitation
kafka.request.limit=-1

# kafka parameters
client.buffer.size=1048576
client.so.timeout=60000
修改topic名称以及对应hadoop的input,output目录。
首先生成offset,执行:
./run-class.sh kafka.etl.impl.DataGenerator test/test.properties
然后拷贝依赖的jar文件:
./copy-jars.sh ${hdfs.default.classpath.dir}
最后执行hadoop任务:
./run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties
执行完成后,运行hadoop脚本查看是否已经在output目录生成数据:
bin/hadoop fs -cat /tmp/kafka/output/part-00000 | wc -l
至此,从日志从各个系统已经成功收集到hadoop的HDFS文件系统中。
现在只需要撰写一些map-reduce作业,就可以利用hadoop进行数据统计了。

KAFKA消息清除

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.file.size=536870912
在配置中,如果一个log file距离上一次写入时间达到168小时,也就是一周,会自动清除这个日志文件;日志文件的上限大小是536870912,超过这个大小会创建新的日志文件。