欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Hbase(二)Client客户端

程序员文章站 2022-07-14 23:49:05
...

对于使用HBase的业务方来说,从HBase客户端到HBase服务端,再到HDFS客户端,最后到HDFS服务端,这是一整条路径,其中任何一个环节出现问题,都会影响业务的可用性并造成延迟。因此,HBase的业务方需要对HBase客户端有较好地理解,以便优化服务体验。而事实上,由于HBase本身功能的复杂性以及Region定位功能设计在客户端上,导致HBase客户端并不足够轻量级。

1、 HBase客户端实现

HBase提供了面向Java、C/C++、Python等多种语言的客户端。由于HBase本身是Java开发的,所以非Java语言的客户端需要先访问ThriftServer,然后通过ThriftServer的Java HBase客户端来请求HBase集群。对其他语言的客户端,推荐使用ThriftServer的方式来访问HBase服务。

这里主要探讨HBase社区Java客户端。

下面通过一个访问HBase集群的典型示例代码,阐述HBase客户端的用法和设计,代码如下所示:

public class TestDemo {
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  public static final TableName tableName = TableName.valueOf("testTable");
  public static final byte[] ROW_KEY0 = Bytes.toBytes("rowkey0");
public static final byte[] ROW_KEY1 = Bytes.toBytes("rowkey1");
  public static final byte[] FAMILY = Bytes.toBytes("family");
  public static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
  public static final byte[] VALUE = Bytes.toBytes("value");

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Test
  public void test() throws IOException {
    Configuration conf = TEST_UTIL.getConfiguration();
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      try (Table table = conn.getTable(tableName)) {
        for (byte[] rowkey : new byte[][] { ROW_KEY0, ROW_KEY1 }) {
Put put = new Put(rowkey).addColumn(FAMILY, QUALIFIER, VALUE);
          table.put(put);
        }

        Scan scan = new Scan().withStartRow(ROW_KEY1).setLimit(1);
        try (ResultScanner scanner = table.getScanner(scan)) {
          List<Cell> cells = new ArrayList<>();
          for (Result result : scanner) {
            cells.addAll(result.listCells());
          }
          Assert.assertEquals(cells.size(), 1);
          Cell firstCell = cells.get(0);
          Assert.assertArrayEquals(CellUtil.cloneRow(firstCell), ROW_KEY1);
          Assert.assertArrayEquals(CellUtil.cloneFamily(firstCell), FAMILY);
          Assert.assertArrayEquals(CellUtil.cloneQualifier(firstCell), QUALIFIER);
          Assert.assertArrayEquals(CellUtil.cloneValue(firstCell), VALUE);
        }
      }
}
}
}

这个示例是一个访问HBase的单元测试代码。我们在类TestDemo初始化前,通过HBase的HBaseTestingUtility工具启动一个运行在本地的Mini HBase集群,最后跑完所有的单元测试样例之后,同样通过HBaseTestingUtility工具清理相关资源,并关闭集群。

下面重点讲解TestDemo#test方法的实现。主要步骤如下。

步骤1:获取集群的Configuration对象。

对访问HBase集群的客户端来说,一般需要3个配置文件:hbase-site.xml、core-site.xml、hdfs-site.xml。只需把这3个配置文件放到JVM能加载的classpath下即可,然后通过如下代码即可加载到Configuration对象:

Configuration conf = HBaseConfiguration.create();

在示例中,由于HBaseTestingUtility拥有API可以方便地获取到Configuration对象,所以省去了加载Configuration对象的步骤。

步骤2:通过Configuration初始化集群Connection。

Connection是HBase客户端进行一切操作的基础,它维持了客户端到整个HBase集群的连接,例如一个HBase集群中有2个Master、5个RegionServer,那么一般来说,这个Connection会维持一个到Active Master的TCP连接和5个到RegionServer的TCP连接。

通常,一个进程只需要为一个独立的集群建立一个Connection即可,并不需要建立连接池。建立多个连接,是为了提高客户端的吞吐量,连接池是为了减少建立和销毁连接的开销,而HBase的Connection本质上是由连接多个节点的TCP链接组成,客户端的请求分发到各个不同的物理节点,因此吞吐量并不存在问题;另外,客户端主要负责收发请求,而大部分请求的响应耗时都花在服务端,所以使用连接池也不一定能带来更高的效益。

Connection还缓存了访问的Meta信息,这样后续的大部分请求都可以通过缓存的Meta信息定位到对应的RegionServer。

步骤3:通过Connection初始化Table。

Table是一个非常轻量级的对象,它实现了用户访问表的所有API操作,例如Put、Get、Delete、Scan等。本质上,它所使用的连接资源、配置信息、线程池、Meta缓存等,都来自步骤2创建的Connection对象。因此,由同一个Connection创建的多个Table,都会共享连接、配置信息、线程池、Meta缓存这些资源。在branch-1以及之前的版本中,Table并不是线程安全的类,所以并不建议在多个线程之间共享使用同一个Table实例。在HBase 2.0.0及之后的版本中,Table已经实现为线程安全类。总体上,由于Table是一个非常轻量级的对象,所以可以通过同一个Connection为每个请求创建一个Table,但要记住,在该请求执行完之后需关闭Table对象。步骤4:通过Table执行Put和Scan操作。

从示例代码中可以明显看出,HBase操作的rowkey、family、column、value等都需要先序列化成byte[],同样读取的每一个cell也是用byte[]来表示的。

以上就是访问HBase表数据的全过程。

2、 定位Meta表

HBase一张表的数据是由多个Region构成,而这些Region是分布在整个集群的RegionServer上的。那么客户端在做任何数据操作时,都要先确定数据在哪个Region上,然后再根据Region的RegionServer信息,去对应的RegionServer上读取数据。因此,HBase系统内部设计了一张特殊的表——hbase:meta表,专门用来存放整个集群所有的Region信息。hbase:meta中的hbase指的是namespace,HBase容许针对不同的业务设计不同的namespace,系统表采用统一的namespace,即hbase;meta指的是hbase这个namespace下的表名。

首先,我们来介绍一下hbase:meta表的基本结构,打开HBase Shell,我们可以看到hbase:meta表的结构如下:

./bin/hbase shell
hbase(main):001:0> describe 'hbase:meta'
hbase:meta, {TABLE_ATTRIBUTES => {IS_META => 'true', REGION_REPLICATION => '1', 
  coprocessor$1 => '|org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoi
  nt|536870911|'}}
COLUMN FAMILIES DESCRIPTION
{NAME => 'info', BLOOMFILTER => 'NONE', VERSIONS => '10',
IN_MEMORY => 'true', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING =>'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', CACHE_DATA_IN_L1 => 'true',
 MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '8192',
 REPLICATION_SCOPE => '0'}
1 row(s) in 0.2350 seconds

hbase:meta表的结构非常简单,整个表只有一个名为info的ColumnFamily。而且HBase保证hbase:meta表始终只有一个Region,这是为了确保meta表多次操作的原子性,因为HBase本质上只支持Region级别的事务(注意表结构中用到了MultiRowMutationEndpoint这个coprocessor,就是为了实现Region级别事务)。

那么,hbase:meta表内具体存放的是哪些信息呢?下图较为清晰地描述了hbase:meta表内存储的信息。

总体来说,hbase:meta的一个rowkey就对应一个Region,rowkey主要由TableName(业务表名)、StartRow(业务表Region区间的起始rowkey)、Timestamp(Region创建的时间戳)、EncodedName(上面3个字段的MD5Hex值)4个字段拼接而成。每一行数据又分为4列,分别是info:regioninfo、info:seqnumDuringOpen、info:server、info:serverstartcode。

info:regioninfo:该列对应的Value主要存储4个信息,即EncodedName、RegionName、Region的StartRow、Region的StopRow。

info:seqnumDuringOpen:该列对应的Value主要存储Region打开时的sequenceId。

info:server:该列对应的Value主要存储Region落在哪个RegionServer上。

info:serverstartcode:该列对应的Value主要存储所在RegionServer的启动Timestamp。

Hbase(二)Client客户端

理解了hbase:meta表的基本信息后,就可以根据rowkey来查找业务的Region了。例如,现在需要查找micloud:note表中rowkey=userid334452'所在的Region,可以设计如下查询语句:

scan 'hbase:meta', {STARTROW=>'micloud:note,userid334452,9999999999999',REVERSED=>true,LIMIT=>1}

这里,读者可能会感到奇怪:为什么需要用一个9999999999999的timestamp,以及为什么要用反向查询Reversed Scan呢?

首先,9999999999999是13位时间戳中最大值。其次因为HBase在设计hbase:meta表的rowkey时,把业务表的StartRow(而不是StopRow)放在hbase:meta表的rowkey。这样,如果某个Region对应的区间是[bbb,ccc),为了定位rowkey=bc的Region,通过正向Scan只会找到[bbb,ccc)这个区间的下一个区间,但是,即使我们找到了[bbb,ccc)的下一个区间,也没法快速找到[bbb,ccc)这个Region的信息。所以,采用Reversed Scan是比较合理的方案。

在理解了如何根据rowkey去hbase:meta表中定位业务表的Region之后,试着思考另外一个问题:HBase作为一个分布式数据库系统,一个大的集群可能承担数千万的查询写入请求,而hbase:meta表只有一个Region,如果所有的流量都先请求hbase:meta表找到Region,再请求Region所在的RegionServer,那么hbase:meta表的将承载巨大的压力,这个Region将马上成为热点Region,且根本无法承担数千万的流量。那么,如何解决这个问题呢?

事实上,解决思路很简单:把hbase:meta表的Region信息缓存在HBase客户端,如图所示。

Hbase(二)Client客户端

 

HBase客户端有一个叫做MetaCache的缓存,在调用HBase API时,客户端会先去MetaCache中找到业务rowkey所在的Region,这个Region可能有以下三种情况:

·Region信息为空,说明MetaCache中没有这个rowkey所在Region的任何Cache。此时直接用上述查询语句去hbase:meta表中Reversed Scan即可,注意首次查找时,需要先读取ZooKeeper的/hbase/meta-region-server这个ZNode,以便确定hbase:meta表所在的RegionServer。在hbase:meta表中找到业务rowkey所在的Region之后,将(regionStartRow,region)这样的二元组信息存放在一个MetaCache中。这种情况极少出现,一般发生在HBase客户端到服务端连接第一次建立后的少数几个请求内,所以并不会对HBase服务端造成巨大压力。

·Region信息不为空,但是调用RPC请求对应RegionServer后发现Region并不在这个RegionServer上。这说明MetaCache信息过期了,同样直接Reversed Scan hbase:meta表,找到正确的Region并缓存。通常,某些Region在两个RegionServer之间移动后会发生这种情况。但事实上,无论是RegionServer宕机导致Region移动,还是由于Balance导致Region移动,发生的几率都极小。而且,也只会对Region移动后的极少数请求产生影响,这些请求只需要通过HBase客户端自动重试locate meta即可成功。

·Region信息不为空,且调用RPC请求到对应RegionSsrver后,发现是正确的RegionServer。绝大部分的请求都属于这种情况,也是代价极小的方案。

由于MetaCache的设计,客户端分摊了几乎所有定位Region的流量压力,避免出现所有流量都打在hbase:meta的情况,这也是HBase具备良好拓展性的基础。所谓Region级别事务,就是当多个操作落在同一个Region内时,HBase能保证这一批操作执行的原子性。如果多个操作分散在不同的Region,则无法保证这批操作的原子性。

3 Scan的复杂之处

HBase客户端的Scan操作应该是比较复杂的RPC操作。为了满足客户端多样化的数据库查询需求,Scan必须能设置众多维度的属性。常用的有startRow、endRow、Filter、caching、batch、reversed、maxResultSize、version、timeRange等。

为便于理解,我们先来看一下客户端Scan的核心流程。在上面的代码示例中,我们已经知道table.getScanner(scan)可以拿到一个scanner,然后只要不断地执行scanner.next()就能拿到一个Result,用户每次执行scanner.next(),都会尝试去名为cache的队列中拿result。如果cache队列已经为空,则会发起一次RPC向服务端请求当前scanner的后续result数据。客户端收到result列表之后,通过scanResultCache把这些results内的多个cell进行重组,最终组成用户需要的result放入到Cache中。为什么需要对RPC response中的result进行重组呢?这是因为RegionServer为了避免被当前RPC请求耗尽资源,实现了多个维度的资源限制(例如timeout、单次RPC响应最大字节数等),一旦某个维度资源达到阈值,就马上把当前拿到的cell返回给客户端。这样客户端拿到的result可能就不是一行完整的数据,因此需要对result进行重组。

理解了scanner的执行流程之后,再来理解Scan的几个重要的概念。

·caching:每次loadCache操作最多放caching个result到cache队列中。控制caching,也就能控制每次loadCache向服务端请求的数据量,避免出现某一次scanner.next()操作耗时极长的情况。

·batch:用户拿到的result中最多含有一行数据中的batch个cell。如果某一行有5个cell,Scan设的batch为2,那么用户会拿到3个result,每个result中cell个数依次为2,2,1。

·allowPartial:用户能容忍拿到一行部分cell的result。设置了这个属性,将跳过重组流程,直接把服务端收到的result返回给用户。

·maxResultSize:loadCache时单次RPC操作最多拿到maxResultSize字节的结果集。

4 HBase客户端其他要点

4.1   RPC重试配置要点        

在HBase客户端到服务端的通信过程中,可能会碰到各种各样的异常。例如有以下几种导致重试的常见异常:

·待访问Region所在的RegionServer发生宕机,此时Region已经被移到一个新的RegionServer上,但由于客户端存在meta缓存,首次RPC请求仍然访问到了旧的RegionServer。后续将重试发起RPC。

·服务端负载较大,导致单次RPC响应超时。客户端后续将继续重试,直到RPC成功或者超过客户容忍最大延迟。

·访问meta表或者ZooKeeper异常。

下面我们了解一下HBase常见的几个超时参数。

1)hbase.rpc.timeout:表示单次RPC请求的超时时间,一旦单次RPC超过该时间,上层将收到TimeoutException。默认为60000ms。

2)hbase.client.retries.number:表示调用API时最多容许发生多少次RPC重试操作。默认为35次。

3)hbase.client.pause:表示连续两次RPC重试之间的休眠时间,默认为100ms。注意,HBase的重试休眠时间是按照随机退避算法计算的,若hbase.client.pause=100,则第一次RPC重试前将休眠100ms左右,第二次RPC重试前将休眠200ms左右,第三次RPC重试前将休眠300ms左右,第四次重试前将休眠500ms左右,第五次重试前将休眠1000ms左右,第六次重试则将休眠2000ms左右……也就是重试次数越多,则休眠的时间会越长。因此,若按照默认的hbase.client.retries.number=35,则可能长期卡在休眠和重试两个步骤中。

4)hbase.client.operation.timeout:表示单次API的超时时间,默认值为1200000ms。注意,get/put/delete等表操作称为一次API操作,一次API可能会有多次RPC重试,这个operation.timeout限制的是API操作的总超时。

假设某业务要求单次HBase的读请求延迟不超过1s,那么该如何设置上述4个超时参数呢?

首先,hbase.client.operation.timeout应该设成1s。其次,在SSD集群上,如果集群参数设置合适且集群服务正常,则基本可以保证p99延迟在100ms以内,因此hbase.rpc.timeout设成100ms。这里,hbase.client.pause用默认的100ms。

最后,在1s之内,第一次RPC耗时100ms,休眠100ms;第二次RPC耗时100ms,休眠200ms;第三次RPC耗时100ms,休眠300ms;第四次RPC耗时100ms,休眠500ms(不是完全线性递增的)。因此,在hbase.client.operation.timeout内,至少可执行4次RPC重试,实际中单次RPC耗时可能更短(因为有hbase.rpc.timeout保证了单次RPC最长耗时),所以hbase.client.retries.number可以稍微设大一点(保证在1s内有更多的重试,从而提高请求成功的概率),比如设成6次。

4.2 CAS接口是Region级别串行执行的,吞吐受限

HBase客户端提供一些重要的CAS(Compare And Swap)接口,例如:

boolean checkAndPut(byte[] row, byte[] family,byte[] qualifier,byte[] value, Put put)

long incrementColumnValue(byte[] row,byte[] family,byte[] qualifier,long amount)

这些接口在高并发场景下,能很好地保证读取与写入操作的原子性。例如,有多个分布式的客户端同时更新一个计数器count,可以通过increment接口来保证任意时刻只有一个客户端能成功原子地执行count++操作。

需要特别注意的是,这些CAS接口在RegionServer上是Region级别串行执行的,也就是说,同一个Region内部的多个CAS操作是严格串行执行的,不同Region间的多个CAS操作可以并行执行。

以checkAndPut为例,简要说明一下CAS的运行步骤:

1)服务端拿到Region的行锁(row lock),避免出现两个线程同时修改一行数据,从而破坏了行级别原子性的情况。

2)等待该Region内的所有写入事务都已经成功提交并在mvcc上可见。

3)通过Get操作拿到需要check的行数据,进行条件检查。若条件不符合,则终止CAS。

4)将checkAndPut的put数据持久化。

5)释放第1)步拿到的行锁。

关键在于第2)步,必须要等所有正在写入的事务成功提交并在mvcc上可见。由于branch-1的HBase是写入完成时,即先释放行锁,再sync WAL,最后推mvcc(写入吞吐更高)。所以,第1)步拿到行锁之后,若跳过第2)步则可能未读取到最新的版本。例如:两个客户端并发对x=100这行数据进行increment操作:

·客户端A读取到x=100,开始进行increment操作,将x设成101。

·注意此时客户端A行锁已释放,但A的put操作mvcc仍不可见。客户端B依旧读到老版本x=100,进行increment操作,又将x设成101。

这样,客户端认为成功执行了两次increment操作,但是服务端却只increment了一次,导致语义矛盾。

因此,对那些依赖CAS(Compare-And-Swap:指increment/append这样的读后写原子操作)接口的服务,需要意识到这个操作的吞吐是受限的,因为CAS操作本质上是Region级别串行执行的。当然,在HBase 2.x版已经调整设计,对同一个Region内的不同行可以并行执行CAS,这大大提高了Region内的CAS吞吐。

4.3 Scan Filter设置

HBase作为一个数据库系统,提供了多样化的查询过滤手段。最常用的就是Filter,例如一个表有很多个列簇,用户想找到那些列簇不为C的数据。那么,可设计一个如下的Scan:

Scan scan = new Scan();

scan.setFilter(new FamilyFilter(CompareOp.NOT_EQUAL,

    new BinaryComparator(Bytes.toBytes("C"))));

如果想查询列簇不为C且Qualifier在[a,z]区间的数据,可以设计一个如下的Scan:

Scan scan = new Scan();

FamilyFilter ff =

    new FamilyFilter(CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes("C")));

ColumnRangeFilter qf =

    new ColumnRangeFilter(Bytes.toBytes("a"), true, Bytes.toBytes("b"), true);

filterList ffilterList = new filterList(Operator.MUST_PASS_ALL, ff, qf);

scan.setFilter(ffilterList);

上面代码使用了一个带AND的FilterList来连接FamilyFilter和ColumnRangeFilter。

有了Filter,大量无效数据可以在服务端内部过滤,相比直接返回全表数据到客户端然后在客户端过滤,要高效很多。但是,HBase的Filter本身也有不少局限,如果使用不当,仍然可能出现极其低效的查询,甚至对线上集群造成很大负担。

4.4  少量写和批量写

HBase是一种对写入操作非常友好的系统,但是当业务有大批量的数据要写入HBase中时,仍会碰到写入瓶颈。为了适应不同数据量的写入场景,HBase提供了3种常见的数据写入API,如下所示。

·table.put(put):这是最常见的单行数据写入API,在服务端先写WAL,然后写MemStore,一旦MemStore写满就flush到磁盘上。这种写入方式的特点是,默认每次写入都需要执行一次RPC和磁盘持久化。因此,写入吞吐量受限于磁盘带宽、网络带宽以及flush的速度。但是,它能保证每次写入操作都持久化到磁盘,不会有任何数据丢失。最重要的是,它能保证put操作的原子性。

·table.put(List<Put>puts):HBase还提供了批量写入的接口,即在客户端缓存put,等凑足了一批put,就将这些数据打包成一次RPC发送到服务端,一次性写WAL,并写MemStore。相比第一种方式,此方法省去了多次往返RPC以及多次刷盘的开销,吞吐量大大提升。不过,这个RPC操作耗时一般都会长一点,因为一次写入了多行数据。另外,如果List内的put分布在多个Region内,则不能保证这一批put的原子性,因为HBase并不提供跨Region的多行事务,换句话说,这些put中,可能有一部分失败,一部分成功,失败的那些put操作会经历若干次重试。

·bulk load:通过HBase提供的工具直接将待写入数据生成HFile,将这些HFile直接加载到对应的Region下的CF内。在生成HFile时,在HBase服务端没有任何RPC调用,只在load HFile时会调用RPC,这是一种完全离线的快速写入方式。bulk load应该是最快的批量写手段,同时不会对线上的集群产生巨大压力。当然,在load完HFile之后,CF内部会进行Compaction,但是Compaction是异步的且可以限速,所以产生的IO压力是可控的。因此,bulk load对线上集群非常友好。

例如,我们之前碰到过一种情况,有两个集群,互为主备,其中一个集群由于工具bug导致数据缺失,想通过另一个备份集群的数据来修复异常集群。最快的方式就是,把备份集群的数据导一个快照拷贝到异常集群,然后通过CopyTable工具扫快照生成HFile,最后bulk load到异常集群,完成数据的修复。

另外的一种场景是,用户在写入大量数据后,发现选择的split keys不合适,想重新选择split keys建表。这时,也可以通过Snapshot生成HFile再bulk load的方式生成新表。