2014年7月22日星期二

Hadoop计算框架-MapReduce

介绍

MapReduce是Hadoop中的计算框架,用于分布式计算统计数据;
MapReduce经历了两个版本的演变,从Hadoop0.23.0开始,MapReduce框架演变为MRv2,也可以称作YARN,即第二版的MapReduce框架,主要改进了之前框架中的JobTracker单点问题以及集群节点上限的问题;
先简单看一下旧版本的MapReduce框架的结构图。

MapReduce DataFlow

Map-Reduce data flow
  • JobClient.runJob()创建一个JobClient实例,并调用submitJob函数
    • 向JobTracker获取一个JobId
    • 检测此Job的output配置
    • 计算job的split。每个split作为一个Map任务的输入,被划分了多少个split,就会有相应多个Map任务执行;
      • 划分时,只划分那些大于HDFS block大小的文件,小于等于一个HDFS block大小的文件是不会被划分的;
      • 当hadoop处理很多小文件的时候,会产生很多个Map任务,从而导致效率低下;
    • 将Job运行所需的资源文件拷贝到JobTracker的文件系统中,包括job的jar,配置文件,split信息等;
    • 通知JobTracker Job已经可以运行
    • 每隔一秒轮询一次job进度,并反馈给命令行,知道job运行完成;
  • JobTractor收到任务后,将任务放入队列
    • Job调度器从队列中获取并初始化任务;
    • Job调度器从共享文件系统获取计算好的split信息
    • 为每个split创建一个Map Task,并为每个task分配一个ID
  • 任务分配
    • TaskTracker周期性的向JobTracker发送heartbeat。
    • heartbeat中如果TaskTracker告知JobTracker已准备好运行一个task,那么JobTracker将按优先级分配一个task给TaskTracker
    • TaskTracker拥有固定槽数的map task和reduce task,这个可以在启动TaskTracker时指定;
    • 默认调度器中Map task优先于reduce task

  • Map的过程
    • MapRunnable从split中读取一行行数据记录,并按照mapper的map函数将数据输出;
    • map的输出并不直接写入硬盘,而是先写入缓存;
    • map同时会为写入数据启动一个守护线程,当buffer达到一定的阈值,守护线程开始将内存中的数据写入硬盘;
      • partitioner: 文件在写入内存缓冲区之前,首先要经过partitioner,为写入数据分区;实际上,partitioner主要的作用是将不同的key分配到不同的分区上,从而为reduce任务提供方便;
      • 根据上图,我们可以看到,写入磁盘时有一个spill操作,spill:sort and combination,即排序和合并;
      • 首先在写入磁盘前,数据先要经过排序;
      • 如果客户端设置过combiner,那么会按照combiner的设置来合并相同的key
      • 最后向磁盘写入溢写文件;
    • 每次写都会向磁盘写入一个文件,如果写入了多个文件,最终文件还会有一个合并的过程;一般合并会将相同的key合并成一个group,例如:{“aaa”, [5, 8, 2, …]},如果client设置过Combiner,也会使用Combiner来合并相同的key;
    • reducer可以通过http协议请求map的输出文件。
  • Reduce过程
    • MapTask结束是,MapTask通知JobTracker,TaskTracker通知JobTracker, 从而JobTracker知道了Map输出和TaskTracker的关系;
    • reducer中一个线程周期性的向JobTracker请求map输出的位置,直到其取得了所有的map输出;
    • reduce task中的copy过程即当每个map task结束的时候就开始拷贝输出,因为不同的map task完成时间不同。reduce task有多个copy线程,可以并行拷贝;
    • 多个map输出拷贝到reduce task后,一个守护线程将其合并为一个大的排好序的文件。
    • 当所有的map输出都拷贝到reduce task后,进入sort过程,将所有的map输出合并为大的排好序的文件。
    • 最后进入reduce过程,调用reducer的reduce函数,处理排好序的输出的每个key,最后的结果写入HDFS。

问题

这是一个简单明了的过程,但是这个结构也存在很多问题:
  • JobTractor存在单点故障
  • JobTractor管理了所有的任务,当任务数过多时会导致JobTractor的内存占用非常高,因此一个MapReduce集群由于JobTractor的问题存在一个规模上限的问题,一般是3000-4000个节点;
  • TaskTractor的运行没有考虑到资源开销的问题,如果两个资源消耗非常大的task运行到一个TaskTractor节点,很可能会导致OOM的问题;

YARN

    从 0.23.0 版本开始,Hadoop 的 MapReduce 框架完全重构,发生了根本的变化。新的 Hadoop MapReduce 框架命名为 MapReduceV2 或者叫 Yarn,其架构图如下图所示:
YARN
MRv2最根本的想法是拆分JobTracker的两个最大的功能,资源管理、任务调度和监控;它的思路是有一个全局的资源管理器(Resource Manager)和每个应用一个Application Master。应用可能是一个传统的Map-Reduce作业或者DAG(有向无环图)任务;
资源管理器和每个节点的NodeManager组成了数据计算框架;资源管理器负责最终裁决系统中运行的所有应用使用的资源;
每个应用的ApplicationMaster是一个库框架,主要任务是和ResourceManager协商资源,和NodeManager协作运行和监控任务;

Resource Manager

主要由两部分构成,调度器和应用管理器;

调度器(Scheduler)

主要负责向运行在系统中的应用分配资源。这是一个纯粹意义上的调度器,因为它不会监控或追踪应用的执行状态;而且,它也不会对失败任务重启做任何报障;调度器的调度功能是依据应用执行需要的资源量,它通过一个资源容器来分配内存、cpu、磁盘、网络等资源,初期第一个版本只有内存。
调度器有一个可插拔的策略插件,这个插件负责分区所有的应用和应用队列;例如CapacitySchedulerFairScheduler都算作这个策略插件的一种实例;
CapacityScheduler支持分层级的应用队列,从而能够更好的预测享有的集群资源的多少;

应用管理器(Applications Manager)

主要负责接收任务提交,协商第一个执行这个应用的容器作为应用Master,提供当应用失败后重启Application Master的服务;

NodeManager

是整个框架在每台机器上的代理,它主要负责资源容器,监控资源使用率并向Resource Manager报告;

ApplicationMaster

负责同调度器协商应用所需的资源容器(多个),跟踪任务执行状态,监控执行进度;

YARN和第一版MapReduce框架的区别

  • 降低了JobTracker的资源消耗,分布式的将监控任务的工作交给集群中执行任务的节点负责,相对单点的JobTractor,更加安全,可靠。
  • 新的YARN中,AppMaster是一个可变部分,用户可以根据自己应用的情况自定义AM,使得更多的模型可以运行在hadoop中;
  • 以资源作为分配任务的依据(目前是内存),比旧版的slot更加合理;
  • YARN中,每个应用负责监控应用进度的是ApplicationMaster,而ResourceManager中的ApplicationsMasters负责监控这些AppMst的运行情况并对失败的AppMst重启;
  • Container是为了将来做内存隔离而提出的一个框架;目前这个框架只支持内存隔离,而度量资源采用内存而非slot,也避免了由于map slot和reduce slot分开而造成的资源闲置的问题;

2014年7月18日星期五

Hadoop分布式文件系统-HDFS

简介

        Hadoop分布式文件系统(HDFS)是一个分布式的文件系统,被设计用于运行在商品级的硬件架构上;它和现有的一些分布式文件系统有很多相似点,但也有非常大的差别;它的容错性非常好,并且可以在低廉的硬件上运行;HDFS可以为应用提供高吞吐量的数据访问,并且非常适合那些大数据量的应用;HDFS实现很少的POSIX需求来达到和文件系统的数据交互;HDFS最初是作为Apache Nutch搜索引擎的基础组件被设计的,它也是Hadoop的核心工程之一。

假定和目标

硬件故障

一个HDFS实例可能包括数百甚至数千台服务器设备,每台负责存储整体的一部分数据。事实上,一个包括大量组件组成的系统,包含出问题的组件会有非常大的可能性,也就是说总是会有一些组件处于无法服务的状态;因此,发现这些错误组件并非常快的恢复是HDFS核心设计需求之一;

流式数据获取

在HDFS上运行的服务需要使用流的方式访问它们的数据集合;它们不是运行在普通文件系统上的普通应用;HDFS被设计更多用于批量流式获取数据而非交互式访问;它强调的是高吞吐量,而不是低延迟数据访问;POSIX有很多强制的要求但是并非是HDFS的目标,为了提高吞吐量,在一些关键地方队POSIX语义做了修改;

大数据集合

在HDFS运行的应用都具有非常大的数据集合,典型的文件大小是以GB或TB来度量的。HDFS用来被支持大文件,它应该提供高带宽的数据写入和单机群数百个节点的扩展;它可以在单个实例支持数千万的文件;

简单的一致性模型

HDFS应用读取文件需要一个一次写入多次读取模型;一个文件经过创建,写入,关闭之后就不会发生变化;这个假设简化了数据一致性的问题并且为高吞吐量提供了保证。Map/Reduce应用或者网络爬虫应用都非常适合这个模型。目前还有计划在将来扩充这个模型,使之支持文件的附加写操作。

“移动计算比移动数据更划算”

一个应用请求的计算,离它操作的数据越近就越高效,在数据达到海量级别的时候更是如此。因为这样就能降低网络阻塞的影响,提高系统数据的吞吐量。将计算移动到数据附近,比之将数据移动到应用所在显然更好。HDFS为应用提供了将它们自己移动到数据附近的接口。

异构软硬件平台间的可移植性

HDFS在设计的时候就考虑到平台的可移植性。这种特性方便了HDFS作为大规模数据应用平台的推广。

Namenode 和 Datanode

HDFS采用master/slave架构。一个HDFS集群是由一个Namenode和一定数目的Datanodes组成。Namenode是一个中心服务器,负责管理文件系统的名字空间(namespace)以及客户端对文件的访问。集群中的Datanode一般是一个节点一个,负责管理它所在节点上的存储。HDFS暴露了文件系统的名字空间,用户能够以文件的形式在上面存储数据。从内部看,一个文件其实被分成一个或多个数据块,这些块存储在一组Datanode上。Namenode执行文件系统的名字空间操作,比如打开、关闭、重命名文件或目录。它也负责确定数据块到具体Datanode节点的映射。Datanode负责处理文件系统客户端的读写请求。在Namenode的统一调度下进行数据块的创建、删除和复制。
图1 HDFS架构图
Namenode和Datanode被设计成可以在普通的商用机器上运行。这些机器一般运行着GNU/Linux操作系统(OS)。HDFS采用Java语言开发,因此任何支持Java的机器都可以部署Namenode或Datanode。由于采用了可移植性极强的Java语言,使得HDFS可以部署到多种类型的机器上。一个典型的部署场景是一台机器上只运行一个Namenode实例,而集群中的其它机器分别运行一个Datanode实例。这种架构并不排斥在一台机器上运行多个Datanode,只不过这样的情况比较少见。
集群中单一Namenode的结构大大简化了系统的架构。Namenode是所有HDFS元数据的仲裁者和管理者,这样,用户数据永远不会流过Namenode。

HBase中的MasterServer-RegionServer和Namenode-Datanode

HBase可以在底层使用HDFS存储,那么HBase中的MasterServer与Namenode关系是怎样的,RegionServer与Datanode的关系又是什么样的?

MasterServer与Namenode

MasterServer一般与Namenode运行在同一台服务器上;除此之外,他们之间是毫无关系的。

RegionServer与Datanode

        HBase中,RegionServer是负责数据存储的节点,在集成HDFS情况下,一般RegionServer和Datanode会处在同一个机器节点上;首先我们假设HDFS的文件备份数为3,那么写入数据时,RegionServer会将数据写入它管理的Region中的Memstore中,当触发Flush操作时(触发条件可以参考之前的HBase架构中的Blog),RegionServer会将数据写入到HDFS中;写入过程是向Namenode发送写入数据的请求,Namenode如果发现RegionServer和HDFS集群的某一个Datanode在同一个机器上,首先第一份数据备份会被写入到RegionServer本地的Datanode节点中,其他两份数据备份会被写入到其他的Datanodes中。由于首先写入本地的Datanode节点,数据在读取时,不会出现从远端读取的情况; 
        当一个RegionServer故障时,RegionMaster会将该regionServer负责的region重新分配给其他存活的regionServer。当一个region被分配给一个regionServer时,它首先会强制从远端读取数据,直到第一个major合并发生后,新的regionServer会重新将数据写入到本地节点;注:当一个regionServer故障时,它负责的region会被分配给多个regionServer;

文件系统的名字空间 (namespace)

HDFS支持传统的层次型文件组织结构。用户或者应用程序可以创建目录,然后将文件保存在这些目录里。文件系统名字空间的层次结构和大多数现有的文件系统类似:用户可以创建、删除、移动或重命名文件。当前,HDFS不支持用户磁盘配额和访问权限控制,也不支持硬链接和软链接。但是HDFS架构并不妨碍实现这些特性。
Namenode负责维护文件系统的名字空间,任何对文件系统名字空间或属性的修改都将被Namenode记录下来。应用程序可以设置HDFS保存的文件的副本数目。文件副本的数目称为文件的副本系数,这个信息也是由Namenode保存的。

数据复制

HDFS被设计成能够在一个大集群中跨机器可靠地存储超大文件。它将每个文件存储成一系列的数据块,除了最后一个,所有的数据块都是同样大小的。为了容错,文件的所有数据块都会有副本。每个文件的数据块大小和副本系数都是可配置的。应用程序可以指定某个文件的副本数目。副本系数可以在文件创建的时候指定,也可以在之后改变。HDFS中的文件都是一次性写入的,并且严格要求在任何时候只能有一个写入者。
Namenode全权管理数据块的复制,它周期性地从集群中的每个Datanode接收心跳信号和块状态报告(Blockreport)。接收到心跳信号意味着该Datanode节点工作正常。块状态报告包含了一个该Datanode上所有数据块的列表。
图2 Datanodes

副本存放: 最最开始的一步

副本的存放是HDFS可靠性和性能的关键。优化的副本存放策略是HDFS区分于其他大部分分布式文件系统的重要特性。这种特性需要做大量的调优,并需要经验的积累。HDFS采用一种称为机架感知(rack-aware)的策略来改进数据的可靠性、可用性和网络带宽的利用率。目前实现的副本存放策略只是在这个方向上的第一步。实现这个策略的短期目标是验证它在生产环境下的有效性,观察它的行为,为实现更先进的策略打下测试和研究的基础。
大型HDFS实例一般运行在跨越多个机架的计算机组成的集群上,不同机架上的两台机器之间的通讯需要经过交换机。在大多数情况下,同一个机架内的两台机器间的带宽会比不同机架的两台机器间的带宽大。
通过一个机架感知的过程,Namenode可以确定每个Datanode所属的机架id。一个简单但没有优化的策略就是将副本存放在不同的机架上。这样可以有效防止当整个机架失效时数据的丢失,并且允许读数据的时候充分利用多个机架的带宽。这种策略设置可以将副本均匀分布在集群中,有利于当组件失效情况下的负载均衡。但是,因为这种策略的一个写操作需要传输数据块到多个机架,这增加了写的代价。
在大多数情况下,副本系数是3,HDFS的存放策略是将一个副本存放在本地机架的节点上,一个副本放在同一机架的另一个节点上,最后一个副本放在不同机架的节点上。这种策略减少了机架间的数据传输,这就提高了写操作的效率。机架的错误远远比节点的错误少,所以这个策略不会影响到数据的可靠性和可用性。于此同时,因为数据块只放在两个(不是三个)不同的机架上,所以此策略减少了读取数据时需要的网络传输总带宽。在这种策略下,副本并不是均匀分布在不同的机架上。三分之一的副本在一个节点上,三分之二的副本在一个机架上,其他副本均匀分布在剩下的机架中,这一策略在不损害数据可靠性和读取性能的情况下改进了写的性能。

副本选择

为了降低整体的带宽消耗和读取延时,HDFS会尽量让读取程序读取离它最近的副本。如果在读取程序的同一个机架上有一个副本,那么就读取该副本。如果一个HDFS集群跨越多个数据中心,那么客户端也将首先读本地数据中心的副本。

安全模式

Namenode启动后会进入一个称为安全模式的特殊状态。处于安全模式的Namenode是不会进行数据块的复制的。Namenode从所有的 Datanode接收心跳信号和块状态报告。块状态报告包括了某个Datanode所有的数据块列表。每个数据块都有一个指定的最小副本数。当Namenode检测确认某个数据块的副本数目达到这个最小值,那么该数据块就会被认为是副本安全(safely replicated)的;在一定百分比(这个参数可配置)的数据块被Namenode检测确认是安全之后(加上一个额外的30秒等待时间),Namenode将退出安全模式状态。接下来它会确定还有哪些数据块的副本没有达到指定数目,并将这些数据块复制到其他Datanode上。

文件系统元数据的持久化

Namenode上保存着HDFS的名字空间。对于任何对文件系统元数据产生修改的操作,Namenode都会使用一种称为EditLog的事务日志记录下来。例如,在HDFS中创建一个文件,Namenode就会在Editlog中插入一条记录来表示;同样地,修改文件的副本系数也将往Editlog插入一条记录。Namenode在本地操作系统的文件系统中存储这个Editlog。整个文件系统的名字空间,包括数据块到文件的映射、文件的属性等,都存储在一个称为FsImage的文件中,这个文件也是放在Namenode所在的本地文件系统上。
Namenode在内存中保存着整个文件系统的名字空间和文件数据块映射(Blockmap)的映像。这个关键的元数据结构设计得很紧凑,因而一个有4G内存的Namenode足够支撑大量的文件和目录。当Namenode启动时,它从硬盘中读取Editlog和FsImage,将所有Editlog中的事务作用在内存中的FsImage上,并将这个新版本的FsImage从内存中保存到本地磁盘上,然后删除旧的Editlog,因为这个旧的Editlog的事务都已经作用在FsImage上了。这个过程称为一个检查点(checkpoint)。在当前实现中,检查点只发生在Namenode启动时,在不久的将来将实现支持周期性的检查点。
Datanode将HDFS数据以文件的形式存储在本地的文件系统中,它并不知道有关HDFS文件的信息。它把每个HDFS数据块存储在本地文件系统的一个单独的文件中。Datanode并不在同一个目录创建所有的文件,实际上,它用试探的方法来确定每个目录的最佳文件数目,并且在适当的时候创建子目录。在同一个目录中创建所有的本地文件并不是最优的选择,这是因为本地文件系统可能无法高效地在单个目录中支持大量的文件。当一个Datanode启动时,它会扫描本地文件系统,产生一个这些本地文件对应的所有HDFS数据块的列表,然后作为报告发送到Namenode,这个报告就是块状态报告。

通讯协议

所有的HDFS通讯协议都是建立在TCP/IP协议之上。客户端通过一个可配置的TCP端口连接到Namenode,通过ClientProtocol协议与Namenode交互。而Datanode使用DatanodeProtocol协议与Namenode交互。一个远程过程调用(RPC)模型被抽象出来封装ClientProtocol和Datanodeprotocol协议。在设计上,Namenode不会主动发起RPC,而是响应来自客户端或 Datanode 的RPC请求。

健壮性

HDFS的主要目标就是即使在出错的情况下也要保证数据存储的可靠性。常见的三种出错情况是:Namenode出错, Datanode出错和网络割裂(network partitions)。

磁盘数据错误,心跳检测和重新复制

每个Datanode节点周期性地向Namenode发送心跳信号。网络割裂可能导致一部分Datanode跟Namenode失去联系。Namenode通过心跳信号的缺失来检测这一情况,并将这些近期不再发送心跳信号Datanode标记为宕机,不会再将新的IO请求发给它们。任何存储在宕机Datanode上的数据将不再有效。Datanode的宕机可能会引起一些数据块的副本系数低于指定值,Namenode不断地检测这些需要复制的数据块,一旦发现就启动复制操作。在下列情况下,可能需要重新复制:某个Datanode节点失效,某个副本遭到损坏,Datanode上的硬盘错误,或者文件的副本系数增大。

集群均衡

HDFS的架构支持数据均衡策略。如果某个Datanode节点上的空闲空间低于特定的临界点,按照均衡策略系统就会自动地将数据从这个Datanode移动到其他空闲的Datanode。当对某个文件的请求突然增加,那么也可能启动一个计划创建该文件新的副本,并且同时重新平衡集群中的其他数据。这些均衡策略目前还没有实现。

数据完整性

从某个Datanode获取的数据块有可能是损坏的,损坏可能是由Datanode的存储设备错误、网络错误或者软件bug造成的。HDFS客户端软件实现了对HDFS文件内容的校验和(checksum)检查。当客户端创建一个新的HDFS文件,会计算这个文件每个数据块的校验和,并将校验和作为一个单独的隐藏文件保存在同一个HDFS名字空间下。当客户端获取文件内容后,它会检验从Datanode获取的数据跟相应的校验和文件中的校验和是否匹配,如果不匹配,客户端可以选择从其他Datanode获取该数据块的副本。

元数据磁盘错误

FsImage和Editlog是HDFS的核心数据结构。如果这些文件损坏了,整个HDFS实例都将失效。因而,Namenode可以配置成支持维护多个FsImage和Editlog的副本。任何对FsImage或者Editlog的修改,都将同步到它们的副本上。这种多副本的同步操作可能会降低Namenode每秒处理的名字空间事务数量。然而这个代价是可以接受的,因为即使HDFS的应用是数据密集的,它们也非元数据密集的。当Namenode重启的时候,它会选取最近的完整的FsImage和Editlog来使用。
Namenode是HDFS集群中的单点故障(single point of failure)所在。如果Namenode机器故障,是需要手工干预的。目前,自动重启或在另一台机器上做Namenode故障转移的功能还没实现。

快照

快照支持某一特定时刻的数据的复制备份。利用快照,可以让HDFS在数据损坏时恢复到过去一个已知正确的时间点。HDFS目前还不支持快照功能,但计划在将来的版本进行支持。

数据组织

数据块

HDFS被设计成支持大文件,适用HDFS的是那些需要处理大规模的数据集的应用。这些应用都是只写入数据一次,但却读取一次或多次,并且读取速度应能满足流式读取的需要。HDFS支持文件的“一次写入多次读取”语义。一个典型的数据块大小是64MB。因而,HDFS中的文件总是按照64M被切分成不同的块,每个块尽可能地存储于不同的Datanode中。

Staging

客户端创建文件的请求其实并没有立即发送给Namenode,事实上,在刚开始阶段HDFS客户端会先将文件数据缓存到本地的一个临时文件。应用程序的写操作被透明地重定向到这个临时文件。当这个临时文件累积的数据量超过一个数据块的大小,客户端才会联系Namenode。Namenode将文件名插入文件系统的层次结构中,并且分配一个数据块给它。然后返回Datanode的标识符和目标数据块给客户端。接着客户端将这块数据从本地临时文件上传到指定的Datanode上。当文件关闭时,在临时文件中剩余的没有上传的数据也会传输到指定的Datanode上。然后客户端告诉Namenode文件已经关闭。此时Namenode才将文件创建操作提交到日志里进行存储。如果Namenode在文件关闭前宕机了,则该文件将丢失。
上述方法是对在HDFS上运行的目标应用进行认真考虑后得到的结果。这些应用需要进行文件的流式写入。如果不采用客户端缓存,由于网络速度和网络堵塞会对吞估量造成比较大的影响。这种方法并不是没有先例的,早期的文件系统,比如AFS,就用客户端缓存来提高性能。为了达到更高的数据上传效率,已经放松了POSIX标准的要求。

流水线复制

当客户端向HDFS文件写入数据的时候,一开始是写到本地临时文件中。假设该文件的副本系数设置为3,当本地临时文件累积到一个数据块的大小时,客户端会从Namenode获取一个Datanode列表用于存放副本。然后客户端开始向第一个Datanode传输数据,第一个Datanode一小部分一小部分(4 KB)地接收数据,将每一部分写入本地仓库,并同时传输该部分到列表中第二个Datanode节点。第二个Datanode也是这样,一小部分一小部分地接收数据,写入本地仓库,并同时传给第三个Datanode。最后,第三个Datanode接收数据并存储在本地。因此,Datanode能流水线式地从前一个节点接收数据,并在同时转发给下一个节点,数据以流水线的方式从前一个Datanode复制到下一个。

存储空间回收

文件的删除和恢复

当用户或应用程序删除某个文件时,这个文件并没有立刻从HDFS中删除。实际上,HDFS会将这个文件重命名转移到/trash目录。只要文件还在/trash目录中,该文件就可以被迅速地恢复。文件在/trash中保存的时间是可配置的,当超过这个时间时,Namenode就会将该文件从名字空间中删除。删除文件会使得该文件相关的数据块被释放。注意,从用户删除文件到HDFS空闲空间的增加之间会有一定时间的延迟。
只要被删除的文件还在/trash目录中,用户就可以恢复这个文件。如果用户想恢复被删除的文件,他/她可以浏览/trash目录找回该文件。/trash目录仅仅保存被删除文件的最后副本。/trash目录与其他的目录没有什么区别,除了一点:在该目录上HDFS会应用一个特殊策略来自动删除文件。目前的默认策略是删除/trash中保留时间超过6小时的文件。将来,这个策略可以通过一个被良好定义的接口配置。

减少副本系数

当一个文件的副本系数被减小后,Namenode会选择过剩的副本删除。下次心跳检测时会将该信息传递给Datanode。Datanode遂即移除相应的数据块,集群中的空闲空间加大。同样,在调用setReplication API结束和集群中空闲空间增加间会有一定的延迟。

2014年7月17日星期四

HBase架构(四) 表结构设计

创建

        HBase的表结构创建和更新可以通过HBase shell或者在JAVA API中使用HBaseAdmin
        当修改ColumnFamily时,Table要被置为无效,例如:
Configuration config = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
String table = "myTable";

admin.disableTable(table);

HColumnDescriptor cf1 = ...;
admin.addColumn(table, cf1);      // adding new ColumnFamily
HColumnDescriptor cf2 = ...;
admin.modifyColumn(table, cf2);    // modifying existing ColumnFamily

admin.enableTable(table);

更新结构

        当更新表或者列族的结构之后,更新会在下一次大型(Major)合并之后或StoreFile被重写之后生效。

列族的数量

        由于HBase目前对任何大于2-3个列族的表处理的不是非常好,所以最好保持列族的数量比较小。目前Flushing和合并操作都是在一个Region范围内的,因此如果一个列族的数据写入导致一次Flush操作,那么相邻的其他列族也会进行Flash,即使它们携带的数据非常小。当存在过多的列族的时候,会导致一系列无用IO负载。
        试着尽量在设计时只使用一个列族。只有在查询数据仅仅是在单一的列族上时,才引入第二个、第三个列族。你的查询一次只查询一个列族,而不是两个或更多。

列族的基数

        当多个列族在同一个表中存在时,一定要注意它们的基数(行数);如果一个表中某一个列族有一百万行记录,而另外一个列族包含十亿行记录,那么前一个列族的的数据会被分布到非常多的region和regionServer中。这会导致前一个列族的大量scan操作是非常低效的。

RowKey设计

自增/时间戳 作为rowkey

HBase中的rowkey在存储时的排序是按照字典(byte order)序排序的,如果一个rowkey被设计成以自增序列(1,2,3)或时间戳,那么当大量写入时,所有的client端都会向同一个region写入数据(当然也是在同一个node上),随后会集中向下一个region写入数据,如此往复;
如果真的需要时间戳数据,那么我们可以通过另外一种rowkey设计来达到目的,例如[metric_type][time_stamp],这样设计的好处是保留的时间数据,但是时间并不作为起始的rowkey值,通过metric_type对时间进行散列,使得数据被散列到不同节点的region上。又保持了数据局部性,相同的type的相邻时间的数据在同一处存储,读取时能更加有效。

尽量降低row和clomun的大小

在HBase中,数据值(values)在系统中传输时,会携带着他的行row, 列column和时间戳信息。如果你的rows或者columns的值过大,尤其是相对它们的值来说,HBase的StoreFile中存储的索引文件会占用很大的空间,因为每个cell的值的坐标(row+column)非常大。

列族

尽量保持列族名称非常短,最好只有一个字符;

Attributes

尽量短,虽然myVeryImportantAttribute非常容易读出意思,但是存储到HBase最好非常短,例如:via.

RowKey长度

在保持rowkey的意义基础上做到越短越好,因为get, scan还需要通过rowkey查找数据;无意义的短的key并不比带有良好的get/scan意义的稍长的一些的key。长度和意义之间有一种平衡和取舍。

字节类型

long型在计算机存储中占8个字节,你可以存储一个无符号的长整数到18,446,744,073,709,551,615。如果你以string类型存储这个数字,假设一个字节一个字符,你需要3倍的字节数;

反序时间戳

  • 反序scanAPI

HBASE-4811实现类scan全表或部分数据反序的API,避免了需要为正序或反序优化你的表设计,这个特性在0.98和之后的版本中提供;
数据库中一个常见的问题是如何能快速查找最近版本的数据;一个使用反序时间戳作为key的一部分的技术可以非常有效的解决这个问题;在key之后附件一个反序时间戳(Long.MAX_VALUE-timestamp)
最近的插入表中的数据可以通过一个scan操作获取第一个记录来实现;因为HBase的keys是排序的,这个key比任何之前插入的key都靠前,因此排在第一个。

Rowkeys和列族

rowkey的作用域是列族,相同的rowkey可能存在于每一个列族中。

RowKey的不变性

rowkey是不变的,只有当删除或重新插入的时候才会改变。这在HBase是一个常识,因此HBase才能在查询的时候准确的命中目标;

Rowkey和Region分裂的关系

在分裂表之前,一个非常关键的问题是,你要了解你的表中的rowkey在分裂边界是如何分布的。下面是一个例子演示这个关键问题,假设我们有一张表,表的rowkey是以16进制数开始的,比如("0000000000000000" 到 "ffffffffffffffff"),我们通过Bytes.split (是创建分区时HBaseAdmin.createTable(byte[] startKey, byte[] endKey, numRegions)的分裂策略),会生成下面的分裂结果:
48 48 48 48 48 48 48 48 48 48 48 48 48 48 48 48                                // 0
54 -10 -10 -10 -10 -10 -10 -10 -10 -10 -10 -10 -10 -10 -10 -10                 // 6
61 -67 -67 -67 -67 -67 -67 -67 -67 -67 -67 -67 -67 -67 -67 -68                 // =
68 -124 -124 -124 -124 -124 -124 -124 -124 -124 -124 -124 -124 -124 -124 -126  // D
75 75 75 75 75 75 75 75 75 75 75 75 75 75 75 72                                // K
82 18 18 18 18 18 18 18 18 18 18 18 18 18 18 14                                // R
88 -40 -40 -40 -40 -40 -40 -40 -40 -40 -40 -40 -40 -40 -40 -44                 // X
95 -97 -97 -97 -97 -97 -97 -97 -97 -97 -97 -97 -97 -97 -97 -102                // _
102 102 102 102 102 102 102 102 102 102 102 102 102 102 102 102                // f
后面的备注是首字符,第一个是0,最后一个是f,看起来一切OK,但是实际上,rowkey会分布在前两个分区以及最后一个分区中,由于a-f有六个组合,有可能最后一个分区会成为热区;
问题的原因是16进制数字只有[a-f][0-9],9-a之间是大量的空白,因此会导致9-a之间的region不会命中任何数据;
经验1:预演表分裂是最佳实践,预演时要注意,你需要将key分配到所有的key空间中;
经验2:虽然不建议使用16进制字符作为key的开头,但是仍然有办法可以使得每个region都能够分派到key空间;
下面是一个表分裂的例子:
public static boolean createTable(HBaseAdmin admin, HTableDescriptor table, byte[][] splits)
throws IOException {
  try {
    admin.createTable( table, splits );
    return true;
  } catch (TableExistsException e) {
    logger.info("table " + table.getNameAsString() + " already exists");
    // the table already exists...
    return false;
  }
}

public static byte[][] getHexSplits(String startKey, String endKey, int numRegions) {
  byte[][] splits = new byte[numRegions-1][];
  BigInteger lowestKey = new BigInteger(startKey, 16);
  BigInteger highestKey = new BigInteger(endKey, 16);
  BigInteger range = highestKey.subtract(lowestKey);
  BigInteger regionIncrement = range.divide(BigInteger.valueOf(numRegions));
  lowestKey = lowestKey.add(regionIncrement);
  for(int i=0; i < numRegions-1;i++) {
    BigInteger key = lowestKey.add(regionIncrement.multiply(BigInteger.valueOf(i)));
    byte[] b = String.format("%016x", key).getBytes();
    splits[i] = b;
  }
  return splits;
}

版本(Version)数量

最大数量

行的每个列族最大的版本存储个数可以通过HColumnDescriptor来配置;默认值是1. 这是一个非常关键的配置,因为在HBase的数据结构中,HBase不会覆盖当前的row的值,而是根据时间和列名写入多个值,超出的数据会在major合并中删除。这个值应该根据应用的需求增减;
设置一个非常大的存储版本数量是不被推荐的,因为这会带来storeFile尺寸上的增加,除非有非常的需要。

最小数量

最小数量也可以通过HColumnDescriptor来设置;默认值为0,表示这个参数不生效。最小数量参数是与存活时间参数一起使用的,例如保存最近T分钟的数据,最多M个,最少N个。(N<M). 这个参数应该在存活时间被激活的那些列族使用,并且确定这个值小于最大版本数量;

支持的数据类型

HBase支持通过put操作和Result进行字节流输入和输出;所以任何可以被转换为byte数组的数据都可以被存储;所以输入数据可以是string,long,复杂对象,甚至是图片;

计数器

另外一个被支持的数据类型是计数器(一个自增的数字),更多查看HTable的Increment ;

二级索引

这个问题更像:rowkey被设计成a模式,但却要用b模式搜索;例如,rowkey设计为[user][timestamp],如果以user查询则非常容易,因为rowkey是以user开头的,但是如果按照timestamp搜索,则不是那么容易了,因为相同的timestamp可能已经横跨很多region了。
这个问题并没有非常好的答案,这个问题依赖于:
  • user的数量
  • 数据大小和数据写入频率
  • 查询条件的灵活性(例如随机的时间跨度vs预定义好的时间段)
  • 期望的查询速度
并且最后的解决方案也受集群大小和你投入给这个问题的计算能力的影响;你可以通过RDBMS来解决这个问题,或者新建一张HBase表存储时间戳为开头的数据,二级索引这个特性在HBase目前是不提供的。

2014年7月15日星期二

HBase架构(三) Region-分区

        分区是HBase中的Table的最基础的组成部分;每个列族会被存储到一个存储中;它的主要结构如下:
Table       (HBase table)
    Region       (Regions for the table)
         Store          (Store per ColumnFamily for each Region for the table)
              MemStore           (MemStore for each Store for each Region for the table)
              StoreFile          (StoreFiles for each Store for each Region for the table)
                    Block             (Blocks within a StoreFile within a Store for each Region for the table)
 

多分区的考虑

       实际上,HBase在设计上被设计了以较小的分区数(20-200),较大尺寸(5-20Gb)运行在每个RegionServer上;如此设计有以下原因:

  • MSLAB(A memstore-local allocation buffer)需要为每个memstore事先分配2M内存空间,如果有1000个分区存储两个列族的数据,那么还未存储任何数据时,已经在JVM中分配了3.9GB的空间;
    • 注:MSLAB,是为了避免产生过多的内存碎片,首先在memstore中分配一块内存buffer,写入对象就写入到这个连续的buffer中,当回收时一起回收,避免由于回收数据导致产生过多的内存碎片;
  • 当以一个固定速率写入数据时,过多的region会引起总体内存占比过高,从而导致过于频繁的磁盘回写操作;例如有1000个region,一个列族,假设我们设置了regionServer的整体memstore使用上限为5GB,那么当达到了5GB时会强制将内存的数据回写到磁盘,而5GB对于每个memstore来说,每个只有5MB的数据;
  • 由于master要对大量的region进行处理,要花费非常多的时间在分配、批量移动这些region,这会导致ZK的工作非常繁重,并且并不会那么实时了。
  • 旧版本的HBase中,少量的regionServer上的大量region会导致存储文件的索引的增长,会加速heap的使用量,有可能会导致内存压力货OOM Exceptions。
  • 另外一个是对Map/Reduce的任务数的影响;典型的情况是每个HBase的分区对应一个mapper,所以,如果一个RS上只有少数的region,比如5个,那么对于mapreduce任务来说可能不会有足够的任务来执行,当然,1000个region又显得太多了。

Region-RegionServer分配

       这一部分讨论分区是如何被分配到RegionServer上的。
  • 启动:当HBase启动时,分区是按照下面的规则分配的
    • Master会在启动时,会启动AssignmentManager
    • AssignmentManager会在META中查询已存在的region分配数据
    • 如果分区分配仍然有效,那么分配数据会被保留;
    • 如果分区数据无效,那么LoadBalancerFactory会被启动,他会分配region。默认的分配规则:DefaultLoadBalancer 将随机的分配region给regionServer;
    • META会更新RegionServer的分配数据,RegionServer也会开始启动对分配过来的region的服务;
  • 故障机制:如果一个RegionServer发生故障:
    • 分配到该RegionServer上的region立刻成为不可用状态;
    • Master会发现RegionServer发生故障;
    • region分配会被认为失效,将被按照启动时的顺序重新分配;
    • 正在进行的查询将被重试,不会被丢失;
    • 操作将在下述时间内被切换到新的RegionServer上:
      • zookeeper session过期时间+region分割时间+分配/重试 时间

Region-RegionServer局部性

        局部性(Locality)原理:处理器访问存储器时,无论是读取指令还是存取数据,所访问的存储单元在一段时间内都趋向于一个较小的连续区域中。
  • 时间局部性:刚刚被访问过的数据可能很短时间内还会被访问
  • 空间局部性:最近的将来被使用到的数据可能和现在使用的数据在空间地址上很近;
Region-RegionServer局部性问题是通过HDFS的block复制解决的。HDFS客户端在选择写入地址时,默认执行了下述步骤:
  • 第一份复制文件被写到了本地节点;
  • 第二份被写入到不同机架上的一个随机节点;
  • 第三份被写入到统一机架上的剩余节点中的一个随机节点;
  • 随后的复制文件被写入到集群的随机节点中;
HBase在flush或文件合并后解决了数据局部性的问题;在一个RegionServer的故障情况下,另外一个RegionServer被分配的region中可能会包括非本地存储文件(因为数据复制不在本地),可是随着新数据写入到region中,或者表被压缩,storeFile会被重写,会变成本地文件。

Region分裂

       Region在达到一个配置阈值的时候会进行分裂;
       分裂是完全在RegionServer上完成的,Master不会参与。RegionServer分裂一个分区,下线分裂的分区,然后将子分区数据写入META中,在父亲所在的RegionServer打开子分区,并报告这次分裂给Master。

  • 分裂策略
    • 默认分裂策略可以被一个自定义的RegionSplitPolicy实现(HBase0.94+)自定义的策略必须扩展HBase默认的策略:ConstantSizeRegionSplitPolicy
    • 策略可以在HBase的总配置文件中指定,或者在使用每个Table时指定

HTableDescriptor myHtd = ...;
myHtd.setValue(HTableDescriptor.SPLIT_POLICY, MyCustomSplitPolicy.class.getName());


    • 默认分裂策略(0.94.0+)IncreasingToUpperBoundRegionSplitPolicy
      • 同一个表下的region的数量的^3*MemstoreSize*2
        • Memstore的Flush size = 128M,第一次分裂就是256M,分裂为2个region
        • 第二次分裂是2^3*128*2=2G

Region在线合并

        Master和RegionServer都参与了Region的合并;客户端发送合并的RPC命令到Master,然后master根据这些region所在的RegionServer加载的情况,移动需要合并的region到加载region最多的RegionServer中。最后master发送合并请求到这个regionserver,regionserver运行分区合并程序;和分裂很像,分区合并是在regionserver内部完成的,下线这些分区,然后在文件系统合并两个分区,自动从META删除合并中的分区信息,并增加合并分区信息到META中。打开合并后的分区,并报告合并到master。
        下面是一个merge的shell例子:

$ hbase> merge_region 'ENCODED_REGIONNAME', 'ENCODED_REGIONNAME'
          hbase> merge_region 'ENCODED_REGIONNAME', 'ENCODED_REGIONNAME', true
        这是一个异步的过程,调用会立刻返回无需等待;

Store

store包括一个Memstore和0或者多个StoreFile。一个Store对应表的某个分区下的一个列族。

Memstore

        Memstore保存对这个store的数据的修改记录。修改记录是cell的KeyValue值。当一个flush请求被发起时,当前的memstore会被移到快照版本中并且被清除;HBase会使用新的memstore来处理新的请求,并且保存备份快照版本直到flush操作返回成功。备注:当Flash操作 开始时,属于同一个region的所有memstore都会进行flash操作。

Memstore Flush

满足下列的任一条件都会触发memstore的flush操作,需要注意的是,flush的最小单元是region而不是一个memstore。
  • 当一个memstore的大小达到了配置中 hbase.hregion.memstore.flush.size大小,所有同一个region下的memstore都会被清除写入磁盘;
  • 当一个region下所有的memstore的大小达到了hbase.regionserver.global.memstore.upperLimit 的大小, 一个regionServer上的多个region上的memstore会被清除写入磁盘以降低regionServer上整体memstore的大小。清除的顺序是按照region拥有的memstore的占有率降序进行的。Region上的Memstore会被清除直到regionServer上所有的Memstore大小低于hbase.regionserver.global.memstore.upperLimit这个值为止;
  • 当每个regionServer的HLog的数量大于hbase.regionserver.max.logs这个值时,regionServer上的很多Memstore会进行Flash来降低HLog的数量;Flash的顺序是按时间排序,最久存在的Memstore会首先被Flash,直到HLog的数量低于配置的hbase.regionserver.max.logs值。

scans

  • 当一个客户端发起一个scan请求,HBase会生成多个RegionScanner对象,每个region一个,来完成这个请求。
  • RegionScanner包含多个StoreScanner,每个列族一个。
  • 每个StoreScanner又包含多个StoreFileScanner对象,对应列族上的一个storeFile和HFile,另外还包含多个MemstoreScanner对象,对应于Memstore。
  • 这两个scanner扫描出来的对象列表会被合并成一个,由于是按照时间倒序排列的,因此从Memstore扫描出来的对象一直处于合并列表的最后。
  • 当一个StoreFileScanner被构建时,他被添加了一个值为当前Memstore时间戳memstoreTS的MultiVersionConsistencyControl读取检查点,过滤那些超过这个时间的数据记录。

StoreFile(HFile)

你的数据存储在StoreFile中。通过 hbase.hregion.max.filesize可以设置StoreFile的大小,当一个列族的其中一个StoreFile超过这个值,该StoreFile所在的region就要被分裂为两个。

HFile格式

参考之前的博客《[翻译]Apache HFile》

HDFS中的HFile

HBase的Table在HDFS中的目录结构:
/hbase
     /<Table>             (Tables in the cluster)
          /<Region>           (Regions for the table)
               /<ColumnFamily>      (ColumnFamilies for the Region for the table)
                    /<StoreFile>        (StoreFiles for the ColumnFamily for the Regions for the table)
            
HBase WAL
/hbase
     /.logs
          /<RegionServer>    (RegionServers)
               /<HLog>           (WAL HLog files for the RegionServer)
            

Blocks

StoreFile是有很多的block组成的。block的大小在每个列族的基础配置中。

KeyValue

KeyValue类是HBase的核心存储类,KeyValue封装了一个字节数组,并且将地址偏移和长度也封装到了数组中,以便能够判断那段内容是属于KeyValue。
KeyValue格式如下:
  • keylength
  • valuelength
  • key
  • value
其中,key又可以被进一步解析为:
  • rowlength
  • row
  • columnfamily length
  • columnfamiliy
  • columnqualifier
  • timestamp
  • keytype
KeyValue实例不会跨block。例如如果一个8MB的KeyValue,即便blocksize被设置成64KB,KeyValue仍然会被当做一个block进行读取。
下面是一个例子:
  • Put#1: rowkey=row1, cf:attr1=value1
  • Put#2: rowkey=row1, cf:attr2=value2
即使是对同一行进行put操作,每个操作都会创建一个KeyValue实例,
  • Put#1:
    • rowlenght------------->4
    • row----------------------->row1
    • columnfamilylenth-->2
    • columnfamily-------->cf
    • columnqualifier------>attr1
    • timestamp-------------->server time of put
    • keytype----------------->put
  • Put#2
    • rowlenght------------->4
    • row----------------------->row1
    • columnfamilylenth-->2
    • columnfamily-------->cf
    • columnqualifier------>attr2
    • timestamp-------------->server time of put
    • keytype----------------->put
最值得注意的是,key的大小会受row,columnfamily和columnqualifier的长度影响,这三个值越大,key的长度越大;

压缩Compaction

        压缩是为了降低StoreFile的数量来将他们合并,从而使得读操作的效率提高;压缩可能对平台来说是一个资源密集型的操作,众多因素决定它可能会帮助改善或降低性能;
       压缩有两种类型:minor小型压缩和major大型压缩
       minor:通常是操作小数量连续的小尺寸storeFile,重写他们到一个新的storeFile中。小型压缩不会删除delete记录和无效的cell内容。如果minor压缩操作了一个存储中所有的storeFile,那么就会升级成一次大型压缩;如果有大量的小文件需要被压缩,算法会趋向使用minor压缩来清理这些小型文件;
       major:目的是将一个存储中的的所有数据合并为一个storeFile。它会处理那些删除标记和多版本数据。
        压缩和删除:HBase执行删除时,数据并未被物理删除,而是一个墓碑标记会标记这个记录,墓碑标记阻止了数据在获取时被返回。当大型压缩时,数据会被实际删除,墓碑标记也会从storeFile中移除。如果删除是由于过期发生的,那么不会有墓碑标记,过期数据会被过滤而且不会被回写到压缩后的storeFile中。
        压缩和版本:当创建一个列族时,可以指定一个数据版本最大上限,通过配置HColumnDescriptor.setMaxVersions(int versions) 来实现。默认值是3.如果多余这个配置值的数据存在,超出的那些版本对应的数据会被清除。

2014年7月14日星期一

HBase架构分析(二) 目录表-Master-RegionServer

介绍

        HBase是一种NoSQL数据库。NoSQL所表示的意思是区分于传统的RDBMS-关系型数据库,它不支持关系型数据库的SQL语言。现在,有很多的NoSQL类型数据库,例如BerkeleyDB是一个本地的NoSQL数据库,而HBase则是一个分布式的NoSQL数据库。谈到数据库,其实HBase更像是一个数据存储,因为它不支持数据库的多种特点,比如SQL,比如二级索引、触发器、高级搜索语言等;
        但是HBase也有自己的很多特性,它可以进行线性和模块化扩展;HBase集群的扩展是通过扩展RegionServer来实现的,RegionServer则是运行在一组服务器上。如果一个HBase集群从10个扩展到20个,那么集群的存储空间和计算能力都会翻番;关系型数据库也能很好的扩展,但是仅仅是在 一个点上的纵向扩展,通过扩展硬件的计算、存储和内存来达到服务的升级;HBase有以下特性:

  • 强一致性读写:HBase不是最终一致性的数据库,所以一些计数器在HBase中实现非常合适;
  • 自动分片策略:HBase的表通过分区(regions)分布在集群中,分区(regions)则会自动的随着数据增长重新分割并重新分布;
  • 自动分区服务容错;
  • Hadoop/HDFS集成:HBase底层支持直接使用HDFS进行存储,因为HDFS是一个分布式存储系统;
  • MapReduce:HBase支持使用HBase作为并行的MapReduce作业的数据源;
  • Java Client API:HBase支持非常简单的Java API 接口;
  • Thrift/REST API:HBase也支持Thrift或Rest API;
  • Block Cache and Bloom Filters: HBase支持分块缓存和bloom filters来实现快速检索;
  • 运行管理:HBase提供和JMX一样的内置web页面来观察服务的情况;

什么时候使用HBase

  • 当你有数亿或者数十亿的记录需要存储时,可以选择使用HBase;当你只有几千到一百万行数据时,传统的关系型数据库可能更合适;
  • 确保你可以放弃关系型数据库提供的特性;
  • 确保你有足够的硬件,即使对于HDFS来说,少于5个节点的集群的效率也不会很好;

HDFS和HBase的区别

        HDFS是一个被设计用来存储大文件的文件系统,它无法提供单个独立记录的快速查找;HBase是建立在HDFS基础上的,提供数据在巨大的数据表中的快速查找、更新;
HBase内部将数据存储在索引后的数据文件中,这些数据文件被保存到了HDFS中。

Catalog Tables

        HBase的目录表:hbase:meta会被HBase的shell命令:list过滤,list命令不会显示出它,但是它确实是实际存在的一个独立的表;

-ROOT-

        -ROOT-表从0.96开始已经被移除了。
        -ROOT-表用来追踪.META表的的位置(.META是hbase:meta的之前的旧名字)-到0.96版本
        -ROOT-表的结构如下:
  • KEY
    • .META. region key (.META.,,1)
  • VALUE
    • info:regininfo(序列化的HRegionInfo,hbase:meta实例)
    • info:server(拥有hbase:meta的服务器server:port)
    • info:serverstartcode(拥有hbase:meta的服务器进程的启动时间)

hbase:meta

        hbase:meta保存了一个系统分区的列表,这个列表之前是通过-ROOT-来追踪定位的,现在它保存到了Zookeeper上;它的结构如下:
  • KEY
    • 分区主键([table],[region start key],[region id]
  • VALUES
    • info:regininfo(region的序列化的HRegionInfo实例)
    • info:server(regionServer的server:port)
    • info:serverstartcode(regionServer进程的启动时间)
        如果一个table正在进行分割,那么两个额外的列会被创建:info:splitA, info:splitB, 这两个列代表了两个孩子分区,他们的值仍然是序列化之后的HRegionInfo实例,当分割结束时,当前这一行也会同时被删除;
HRegionInfo说明:
        HRegionInfo中的空白key用来标识table的开始和结束;如果一个region start key为空,则标识它是一个表的开始第一个分区;如果start key和end key都为空,则该分区是该表的唯一分区;

启动排序

        首先,hbase:meta信息是从zookeeper上进行查找的,另外,hbase:meta会根据服务器地址和启动时间进行更新;

Client和ClientFilters以后的blog再总结

Master(主节点)

        HMaster是MasterServer的实现,MasterServer负责监控集群中所有RegionServer节点,也是所有元数据变化的接口;在一个分布式集群中,MasterServer实际上是运行在一个NameNode上。

启动行为

        如果运行在一个多主服务的集群上,所有的主服务都会启动完毕准备操作整个集群。当当前活动的主节点丢失时(可能是丢失到zookeeper的连接或重启),另外一个闲置的主节点就会马上接管整个集群;

运行时的影响

        一个经常被提及的问题是,当单master节点的集群中,master down机会产生什么影响?因为所有的regionServer节点都与master节点直接通信,结果是down机后整个集群会处在一个相对稳定的状态;因为保存hbase:meta表数据不存在master节点上,而是在zookeeper上,但是Master控制了所有regionServer的故障转移以及数据分割,因此Master应该尽快的被启动起来;

接口

        HMasterInterface暴露的接口主要是面向元数据的:
  • 表操作 (createTable, modifyTable, removeTable, enable, disable)
  • 列族 (addColumn, modifyColumn, removeColumn)
  • 分区Region (move, assign, unassign)
        例如,HbaseAdmin调用disableTable是通过Master Server来完成的;

进程

        Master运行了几个后台线程;
  • LoadBalancer(负载均衡)
    • 定时或当所有region都没有请求时,LoadBalancer会启动,移动Region来实现集群的负载均衡;
  • CatalogJanitor(目录看管)
    • 定时检查并清理hbase:meta表中的数据;

RegionServer

        HRegionServer是RegionServer的实现;负责服务和维护分区(regions);RegionServer运行在DataNode上。

接口

        HRegionRegionInterface暴露出的接口是面向分区维护和数据维护的:
  • 数据(get, put, delete, next, 等等.)
  • 分区 (splitRegion, compactRegion, 等.)

进程

        RegionServer运行了不同种类的后台线程:
  • CompactSplitThread
    • 检查数据分割,负责小型数据合并(minor compaction)
  • MajorCompactionChecker
    • 大型数据合并的检查
  • MemStoreFlusher
    • 定时将内存中的数据写到文件存储中;
  • LogRoller
    • 定时检查RegionServer的HLog

Coprocessors

        在0.92版本被添加进来;更多信息可以参考Blog Overview of CoProcessors 

Block Cache

        HBase提供了三种不同的块缓存,分别是使用堆实现的LruBlockCache,以及非堆实现的BucketCache, SlabCache;
  • 缓存的选择
    • LruBlockCache是块缓存的原始实现,它完全在java堆中。BucketCache和SlabCache被设计用来在堆外存储数据的,当然也可以保存到堆或者文件中;注意,SlabCache现在不被推荐,并且要在1.0.0版本移除;
    • BucketCache是更多产品化部署的首选并且有更多的配置项;相对于堆内实现的LruBlockCache,BucketCache可能会在读取数据时稍慢,但是相比较堆内的GC次数更少,BucketCache因此会更稳定;
    • 一些证据表明BucketCache比SlabCache有更少的GC次数,因此BucketCache会更稳定;之所以说SlabCache有更多的垃圾回收次数,原因是在SlabCache中,数据会从L1移动到L2,也就是激活了SlabCache配置,等于是激活了一个两层缓存系统;它的实现类是:DoubleBlockCache,当一个数据从L1中消失时,会被移动到L2层;
    • BucketCache 的实现类是:CombinedBlockCache。所有的数据块都被保存到了BucketCache,而元数据块,例如索引和Bloom数据块都被保存到了LruBlockCache;

Write Ahead Log (WAL)

        写前日志WAL记录了所有的HBase的数据变动,并存储到磁盘文件中;WAL之所以会有价值,是因为数据是定时从内存存储写入到文件中的,当写入操作未完成但是RegionServer却down机时,WAL保证这些数据可以被重新读取;如果写入WAL失败,整个写入将会返回失败;
        HBase使用HLog来实现保存WAL。一般每个RegionServer实例只有一个WAL实例,在写入MemStore之前,HBase会先将日志写入WAL。
WAL在HDFS中的目录位于每个regionserver子目录下的/hbase/WALs;

2014年7月11日星期五

HBase架构分析(一) 数据模型

HTable

向HBase存储数据,实际上是通过HBase Table来保存数据的。HBase Table是由行(rows)和列(colmums)组成的。所有的列都属于一个列族(column family),而表的单元(table cells)代表着列和行的交叉点是版本化的(versioned),而每个cell中存储的数据是一个字节数组。
表的row key也是由字节数组成的,因此它可以是string或者其他什么复杂类型对象或者二进制数据甚至是序列化的数据结构。HBase中的row是通过row key作为主键存储的,存储是字节有序(byte-ordered)的,所有的访问Table的数据请求都是通过row key完成的,它是表的主键。
图1. HTable

概念视图

下表是一个从HBase说明上拿过来的例子。Table名字是webtable,包含两个列族,分别是contents和anchor。anchor包含两个列:anchor:cssnsi.com和anchor:my.look.ca; 而contents包含一个列:contents:html

列名称

列名称是由该列的列族名作为前缀,本身的qualifier作为后缀组成,中间使用冒号分割;

Row KeyTimeStampColumnFamily contentsColumnFamily anchor
"com.cnn.www"t9anchor:cnnsi.com = "CNN"
"com.cnn.www"t8anchor:my.look.ca = "CNN.com"
"com.cnn.www"t6contents:html = "<html>..."
"com.cnn.www"t5contents:html = "<html>..."
"com.cnn.www"t3contents:html = "<html>..."

物理视图

虽然逻辑上table被看做是row的集合,但是实际在物理存储上,文件是按照列族分别存储的。新增加的列:clomunfamily:column可以直接被添加到任何的列族上,而无需提前声明。

列族anchor
Row KeyTime StampColumn Family anchor
"com.cnn.www"t9anchor:cnnsi.com = "CNN"
"com.cnn.www"t8anchor:my.look.ca = "CNN.com"
列族contents
Row KeyTime StampColumnFamily "contents:"
"com.cnn.www"t6contents:html = "<html>..."
"com.cnn.www"t5contents:html = "<html>..."
"com.cnn.www"t3contents:html = "<html>..."
需要说明的是,概念视图中的空白cells并未被存储,因为一个面向列族存储格式无需存储。因此,当请求contents:html在时间点t8时的值不会返回任何值;如果没提供时间戳,则最新被写入的值会被返回,因为存储时是按照时间倒序存储的。所以当请求com.cnn.www时没有提供时间戳参数,则contents:html在t6的值,anchor:my.look.ca在t8的值和anchor:cnnsi.com在t9的值被返回。更多有关Apache HBase如何存储数据请关注之后的博客:Region

Namespace命名空间

命名空间有三个特点:
  • 配额管理:限定一个命名空间可以消费的资源(regions, tables)上限;
  • 安全管理
  • RegionServer分组

命名空间管理

<table namespace>:<table qualifier>

#Create a namespace
create_namespace 'my_ns'
            
#create my_table in my_ns namespace
create 'my_ns:my_table', 'fam'
          
#drop namespace
drop_namespace 'my_ns'
          
#alter namespace
alter_namespace 'my_ns', {METHOD => 'set', 'PROPERTY_NAME' => 'PROPERTY_VALUE'}

定义好的命名空间

有两个定义好的命名空间
  • hbase-系统命名空间,用于hbase内部表的存储;
  • default-默认空间,如果表没有指定的命名空间则会自动被计入default;

Row

row keys是byte数组类型。Row存储时按照顺序存储,最小的row key存储在第一个。

Column Family

列在hbase中按照列族进行分组。一个列族的所有成员都以该列族名称作为前缀。
在物理存储上,所有同一个列族的成员都会被存储在同一个文件系统里。由于协调存储细节都在列族一级完成,建议所有的列族成员有着相同的访问模式和大小特征;

Cells

{row,column,version}元组指定了表中的一个cell,cell中的数据是按字节存储的。

Versions

由于{row,column,version}元组指定了表中的一个cell,所以一个可能性是,有大量的cell具有相同的row和column值,但是仅仅是version不同。
rows和columns的关键字都是按字节存储的,但是version确实一个长整型数据。这个版本标识包括当前的时间戳,例如java.util.Date.getTime()或Sytem.currentTimeMillis()方法返回的值,距离1970年1月1日 UTC时间的毫秒数。
Hbase的version是按照递减排序的,所以当读取一个存储文件时,最新的数据会先被发现。
关于versions,有两个非常常见的问题:
  • 如果对同一cell的多个写入操作具有相同的版本号,那么哪个值会被写入?
    • 答案是:最后一个被写入的值
  • 使用同一个version对cell进行写入操作是否OK?
    • 答案是:OK

versions和HBase的操作

get/Scan
默认情况下,如果不指定version信息,那么当执行一个get操作时,具有最大的version值的cell会被返回(有可能不是最后被写入的那个cell,见后续),默认情况可以通过下面两个方式改变:
default get实例,下面的Get会返回当前行的最新数据
public static final byte[] CF = "cf".getBytes();
public static final byte[] ATTR = "attr".getBytes();
...
Get get = new Get(Bytes.toBytes("row1"));
Result r = htable.get(get);
byte[] b = r.getValue(CF, ATTR);  // returns current version of value
而下面的例子会返回至少3个版本的数据
public static final byte[] CF = "cf".getBytes();
public static final byte[] ATTR = "attr".getBytes();
...
Get get = new Get(Bytes.toBytes("row1"));
get.setMaxVersions(3);  // will return last 3 versions of row
Result r = htable.get(get);
byte[] b = r.getValue(CF, ATTR);  // returns current version of value
List kv = r.getColumn(CF, ATTR);  // returns all versions of this column

Put操作
执行一个put操作,cell总会产生一个新的version信息。默认情况下,系统会使用当前的时间戳currentTimeMillis,不过你也可以指定一个version值(长整型),这意味着可以指定一个将来的或过去的时间戳,或随意一个没有时间含义的数字。
覆盖现在的某一个值,执行一个put操作,根据相同的row,column和version。
使用内部机制设置version
public static final byte[] CF = "cf".getBytes();
public static final byte[] ATTR = "attr".getBytes();
...
Put put = new Put(Bytes.toBytes(row));
put.add(CF, ATTR, Bytes.toBytes( data));
htable.put(put);
指定一个version
public static final byte[] CF = "cf".getBytes();
public static final byte[] ATTR = "attr".getBytes();
...
Put put = new Put( Bytes.toBytes(row));
long explicitTimeInMs = 555;  // just an example
put.add(CF, ATTR, explicitTimeInMs, Bytes.toBytes(data));
htable.put(put);

Delete操作
有三种不同的删除类型,分别是

  • 删除一列指定版本的值
  • 删除一列所有版本的值
  • 删除一个列族中所有列的值
当删除一个行时,HBase内部会给每个列族创建一个删除标记,而不是每一个列。
删除通过创建删除标记来实现的,例如,你打算删除一行数据,执行操作时可以制定一个version或默认使用currentTimeMillis作为version,这意味着,每一个cell中等于或小于这个version的cell值都会被删除。HBase不会立刻对delete操作修改存储文件中的数据,而是写入了一个删除标记,这个标记会标记那些被删除的值,如果你指定的version值大于已存在的最大的version,你可以认为这行数据已经被删除了。
删除标记在major数据合并时被执行清除,除非列族设置了KEEP_DELETED_CELLS值。某些场景下,用户可能需要对delete的数据保存一段时间,此时可以通过在配置文件中设置TTL: hbase.hstore.time.to.purge.deletes来实现。如果没有设置或设置为0,所有的删除标记的数据都会在major文件合并时被清除,换句话说,删除标记的持续时间:(major文件合并时间-标记时间)+TTL

Delete操作会覆盖Put操作
即使put发生在delete之后,依然会被delete覆盖。如果已经删除了version<=T的数据,那么当对version<=T的数据进行put操作时,该数据仍旧会被标记为删除,但是put操作虽然不会失败,单当执行get操作时,会发现该数据没有被改变。它会在major文件合并之后执行,如果你一直在执行一个递增的version,这并不是一个问题。

主要文件合并会改变搜索结果
当设置一个cell的三个值的版本为t1,t2,t3时,如果搜索两个版本的数据,会返回t2,t3的值,但是当标记t2被删除时,执行主文件合并之后,t1,t3会被返回了。

排序

所有对HBase的操作,HBase都会返回有序的数据。首先是按照row排序,然后是列族,后面是column qualifier,最后是时间戳(反序),因此新数据会首先返回;

Column元数据

列族内部的key-value实例之外不会存储任何元数据。因此,HBase不光能支持每行中大量的列,而且支持不同的列中不同的行集合;
唯一获取行中完整的列集合的方法是处理所有的行数据,关于Key-Value存储结构,简要介绍一下:
Key-Value在字节数组中的格式如下:
  • key-length
  • value-length
  • key
  • value
其中,key可以被解析成如下结构:
  • rowlength
  • row(例如row key)
  • columnfamiliy lenght
  • columnqualifier
  • timestamp
  • keytype

JOIN操作

在HBase中,join操作是不支持的。
但是开发者可以在自己的应用中自己实现。两个主要的策略是:向HBase写入非规范化的数据,或者通过在应用或MapReduce代码中查询HBase多张表的数据之后再做处理。

2014年7月1日星期二

Mysql中查询相关优化分析

无意中看到一个SQL的对比分析,关于独立子查询和相关子查询对最终结果集的耗时对比。主要的SQL集中在in (select ...)下面分析一下此类SQL优化的原理和方法;

子查询

当一个查询是另外一个查询的条件时,此时称之为子查询;
子查询又分为独立子查询和相关子查询,顾名思义,独立子查询表示该子查询可以独立完成,不依赖于外部变量;而相关子查询则依赖于外部数据传入;

慢查询日志

Mysql可以通过修改/etc/my.cnf来设定慢查询日志。修改my.cnf文件:
slow_query_log = 1
slow_query_log_file = /tmp/mysqlslow.log
long_query_time = 0.0001
log-queries-not-using-indexes
log-slow-admin-statements
log-slow-slave-statements
然后重启mysql
#>service mysqld restart
我们设定的超时时间是0.0001秒,当查询超过这个时间就会打印慢查询日志。

实例

查询一张表,查询出每月最后一天的数据并展示;
我们可能第一时间写出的sql类似于这样:
SELECT * FROM orders where date in (select max(date) from orders group by substring(orders.date, -10, 7));
注意,这个sql语句用到了in,按照我们自然的想法,我们会认为这个sql将会先执行in后面的子查询,再执行外部查询。但实际上是这样吗?
我们用explain解释一下这个sql,返回:
mysql> explain SELECT * FROM orders where date in (select max(date) from orders group by substring(orders.date, -10, 7));
+----+--------------------+----------+------+---------------+------+---------+------+------+---------------------------------+
| id | select_type        | table    | type | possible_keys | key  | key_len | ref  | rows | Extra                           |
+----+--------------------+----------+------+---------------+------+---------+------+------+---------------------------------+
|  1 | PRIMARY            | orders   | ALL  | NULL          | NULL | NULL    | NULL |   93 | Using where                     |
|  2 | DEPENDENT SUBQUERY | orders   | ALL  | NULL          | NULL | NULL    | NULL |   93 | Using temporary; Using filesort |
+----+--------------------+----------+------+---------------+------+---------+------+------+---------------------------------+
我们看到后面的子查询是一个DEPENDENT SUBQUERY,也就是一个相关子查询。
原来,在mysql执行这个sql语句时,该语句被翻译为下面的语句执行:
SELECT * FROM orders where date exists (select max(date) from orders group by substring(orders.date, -10, 7) having `max(date)`=date);
解释之后是作为一个相关子查询来执行,假设外部查询的时间复杂度是O(M), 相关子查询的时间复杂度是O(N), 则此查询的时间复杂度不是O(M+N)而是O(M+M*N)
通过慢查询日志我们也可以看到该子查询的逻辑查询次数:
# Time: 140701 15:28:05
# User@Host: root[root] @ localhost []
# Query_time: 0.015520  Lock_time: 0.000162 Rows_sent: 4  Rows_examined: 9480
SET timestamp=1404199685;
SELECT * FROM orders where date in (select max(date) from orders group by substring(orders.date, -10, 7));
我们当前的表行数只有93行,但是这个sql执行时逻辑查询次数竟然达到了9480次。因此in的效率很低,最好尽量避免使用in语句。

优化

可以通过降低相关子查询表的条数来降低整体逻辑查询次数,例如这个sql可以优化为:
SELECT * FROM orders A where exists (select * from (select max(date) from orders group by substring(orders.date, -10, 7)) B where B.`max(date)`=A.date);
解释这个sql可以看到:
+----+--------------------+------------+------+---------------+------+---------+------+------+---------------------------------+
| id | select_type        | table      | type | possible_keys | key  | key_len | ref  | rows | Extra                           |
+----+--------------------+------------+------+---------------+------+---------+------+------+---------------------------------+
|  1 | PRIMARY            | A          | ALL  | NULL          | NULL | NULL    | NULL |   93 | Using where                     |
|  2 | DEPENDENT SUBQUERY |  | ALL  | NULL          | NULL | NULL    | NULL |    4 | Using where                     |
|  3 | DERIVED            | youku_tj   | ALL  | NULL          | NULL | NULL    | NULL |   93 | Using temporary; Using filesort |
+----+--------------------+------------+------+---------------+------+---------+------+------+---------------------------------+
相关子查询的行数只有4行了。大大降低了逻辑查询次数,从慢查询日志也反映了这个改变:
# Time: 140701 15:27:27
# User@Host: root[root] @ localhost []
# Query_time: 0.000915  Lock_time: 0.000201 Rows_sent: 4  Rows_examined: 560
SET timestamp=1404199647;
SELECT * FROM orders A where exists (select * from (select max(date) from orders group by substring(orders.date, -10, 7)) B where B.`max(date)`=A.date);
因此当通过explain查看sql的执行效率时,要注意DEPENDENT SUBQUERY对效率的影响,从两个SQL的执行效率看,改进后的执行之间是0.00915s,改进之前是0.015520s 还是影响比较大的,尤其对行数很大的表影响尤其明显;