欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  数据库

深入分析HBase RPC(Protobuf)实现机制

程序员文章站 2022-06-03 20:58:05
...

背景 在HMaster、RegionServer内部,创建了RpcServer实例,并与Client三者之间实现了Rpc调用,HBase0.95内部引入了Google-Protobuf作为中间数据组织方式,并在Protobuf提供的Rpc接口之上,实现了基于服务的Rpc实现,本文详细阐述了HBase-Rpc实现细节。 HBase

背景

在HMaster、RegionServer内部,创建了RpcServer实例,并与Client三者之间实现了Rpc调用,HBase0.95内部引入了Google-Protobuf作为中间数据组织方式,并在Protobuf提供的Rpc接口之上,实现了基于服务的Rpc实现,本文详细阐述了HBase-Rpc实现细节。

HBase的RPC Protocol

?在HMaster、RegionServer内部,实现了rpc 多个protocol来完成管理和应用逻辑,具体如下protocol如下:

HMaster支持的Rpc协议:
MasterMonitorProtocol,Client与Master之间的通信,Master是RpcServer端,主要实现HBase集群监控的目的。

MasterAdminProtocol,Client与Master之间的通信,Master是RpcServer端,主要实现HBase表格的管理。例如TableSchema的更改,Table-Region的迁移、合并、下线(Offline)、上线(Online)以及负载平衡,以及Table的删除、快照等相关功能。

RegionServerStatusProtoco,RegionServer与Master之间的通信,Master是RpcServer端,负责提供RegionServer向HMaster状态汇报的服务。

RegionServer支持的Rpc协议:

ClientProtocol,Client与RegionServer之间的通信,RegionServer是RpcServer端,主要实现用户的读写请求。例如get、multiGet、mutate、scan、bulkLoadHFile、执行Coprocessor等。

AdminProtocols,Client与RegionServer之间的通信,RegionServer是RpcServer端,主要实现Region、服务、文件的管理。例如storefile信息、Region的操作、WAL操作、Server的开关等。

(备注:以上提到的Client可以是用户Api、也可以是RegionServer或者HMaster)

深入分析HBase RPC(Protobuf)实现机制


?HBase-RPC实现机制分析

RpcServer配置三个队列:

1)普通队列callQueue,绝大部分Call请求存在该队列中:callQueue上maxQueueLength为${ipc.server.max.callqueue.length},默认是${hbase.master.handler.count}*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER,目前0.95.1中,每个Handler上CallQueue的最大个数默认值(DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER)为10。

2)优先级队列: PriorityQueue。如果设置priorityHandlerCount的个数,会创建与callQueue相当容量的queue存储Call,该优先级队列对应的Handler的个数由rpcServer实例化时传入。

3)拷贝队列:replicationQueue。由于RpcServer由HMaster和RegionServer共用,该功能仅为RegionServer提供,queue的大小为${ipc.server.max.callqueue.size}指定,默认为1024*1024*1024,handler的个数为hbase.regionserver.replication.handler.count。

RpcServer由三个模块组成:

Listener ===Queue=== Responder

?深入分析HBase RPC(Protobuf)实现机制

这里以HBaseAdmin.listTables为例,分析一个Rpc请求的函数调用过程:

1) RpcClient创建一个BlockingRpcChannel。

2)以channel为参数创建执行RPC请求需要的stub,此时的stub已经被封装在具体Service下,stub下定义了可执行的rpc接口。

3)stub调用对应的接口,实际内部channel调用callBlockingMethod方法。

RpcClient内实现了protobuf提供的BlockingRpcChannel接口方法callBlockingMethod,

? @Override
public Message callBlockingMethod(MethodDescriptor md, RpcController controller,
Message param, Message returnType)
throws ServiceException {
return this.rpcClient.callBlockingMethod(md, controller, param, returnType, this.ticket,
this.isa, this.rpcTimeout);
}

通过以上的实现细节,最终转换成rpcClient的调用,使用MethodDescriptor封装了不同rpc函数,使用Message基类可以接收基于Message的不同的Request和Response对象。

4)RpcClient创建Call对象,查找或者创建合适的Connection,并唤醒Connection。

5)Connection等待Call的Response,同时rpcClient调用函数中,会使用connection.writeRequest(Call call)将请求写入到RpcServer网络流中。

6)等待Call的Response,然后层层返回给更上层接口,从而完成此次RPC调用。

RPCServer收到的Rpc报文的内部组织如下:

Magic

(4Byte)

Version

(1Byte)

AuthMethod

(1Byte)

Connection

HeaderLength

(4Byte)

ConnectionHeader

Request

“HBas”

验证RpcServer的CURRENT_VERSION

与RPC报文一致

目前支持三类:

AuthMethod.SIMPLE

AuthMethod.KERBEROS

AuthMethod.DIGEST

RPC.proto定义
RPCProtos.ConnectionHeader
message ConnectionHeader {
optional UserInformation userInfo = 1;
optional string serviceName = 2;
// Cell block codec we will use sending over optional cell blocks.? Server throws exception
// if cannot deal.
optional string cellBlockCodecClass = 3 [default = "org.apache.hadoop.hbase.codec.KeyValueCodec"];
// Compressor we will use if cell block is compressed.? Server will throw exception if not supported.
// Class must implement hadoop’s CompressionCodec Interface
optional string cellBlockCompressorClass = 4;
}
序列化之后的数据

整个Request存储是经过编码之后的byte数组,包括如下几个部分:

RequestHeaderLength(RawVarint32)

RequestHeader

ParamSize(RawVarint32)

Param

CellScanner

RPC.proto定义:
message RequestHeader {
// Monotonically increasing callId to keep track of RPC requests and their response
optional uint32 callId = 1;
optional RPCTInfo traceInfo = 2;
optional string methodName = 3;
// If true, then a pb Message param follows.
optional bool requestParam = 4;
// If present, then an encoded data block follows.
optional CellBlockMeta cellBlockMeta = 5;
// TODO: Have client specify priority
}
序列化之后的数据
并从Header中确认是否存在Param和CellScanner,如果确认存在的情况下,会继续访问。

Protobuf的基本类型Message,
Request的Param继承了Message,
这个需要获取的Method类型决定。

从功能上讲,RpcServer上包含了三个模块,

1)Listener。包含了多个Reader线程,通过Selector获取ServerSocketChannel接收来自RpcClient发送来的Connection,并从中重构Call实例,添加到CallQueue队列中。

?”IPC Server listener on 60021″ daemon prio=10 tid=0x00007f7210a97800 nid=0x14c6 runnable [0x00007f720e8d0000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
- locked (a sun.nio.ch.Util$2)
- locked (a java.util.Collections$UnmodifiableSet)
- locked (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:84)
at org.apache.hadoop.hbase.ipc.RpcServer$Listener.run(RpcServer.java:646)

2)Handler。负责执行Call,调用Service的方法,然后返回Pair

“IPC Server handler 0 on 60021″ daemon prio=10 tid=0x00007f7210eab000 nid=0x14c7 waiting on condition [0x00007f720e7cf000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for? (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
at org.apache.hadoop.hbase.ipc.RpcServer$Handler.run(RpcServer.java:1804)

3) Responder。负责把Call的结果返回给RpcClient。

?”IPC Server Responder” daemon prio=10 tid=0x00007f7210a97000 nid=0x14c5 runnable [0x00007f720e9d1000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
- locked (a sun.nio.ch.Util$2)
- locked (a java.util.Collections$UnmodifiableSet)
- locked (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
at org.apache.hadoop.hbase.ipc.RpcServer$Responder.doRunLoop(RpcServer.java:833)
at org.apache.hadoop.hbase.ipc.RpcServer$Responder.run(RpcServer.java:816)

RpcClient为Rpc请求建立Connection,通过Connection将Call发送RpcServer,然后RpcClient等待结果的返回。

?思考

1)为什么HBase新版本使用了Protobuf,并实现RPC接口?

HBase是Hadoop生态系统内重要的分布式数据库,Hadoop2.0广泛采用Protobuf作为中间数据组织方式,整个系统内Wire-Compatible的统一需求。

2)HBase内部实现的Rpc框架对于服务性能的影响?

目前使用Protobuf作为用户请求和内部数据交换的数据格式,采用更为紧缩编码格式,能够提高传输数据的效率。但是,有些优化仍然可以在该框架内探索:

实现多个Request复用Connection(把多个短连接合并成一个长连接);

在RpcServer内创建多个CallQueue,分别处理不同的Service,分离管理逻辑与应用逻辑的队列,保证互不干扰;

Responder单线程的模式,是否高并发应用的瓶颈所在?

是否可以分离Read/Write请求占用的队列,以及处理的handler,从而使得读写性能能够更加平衡?

针对读写应用的特点,在RpcServer层次内对应用进行分级,建立不同优先级的CallQueue,按照Hadoop-FairScheduler的模式,然后配置中心调度(类似OMega或者Spallow轻量化调度方案),保证实时应用的低延迟和非实时应用的高吞吐。优先级更好的Call会优先被调度给Handler,而非实时应用可以实现多个Call的合并操作,从而提高吞吐。

3)Protobuf内置编码与传统压缩技术是否可以配合使用?

使用tcpdump获取了一段HMaster得到的RegionServer上报来的信息:

深入分析HBase RPC(Protobuf)实现机制

以上的信息几乎是明文出现在tcp-ip连接中,因此,是否在Protobuf-RPC数据格式采取一定的压缩策略,会给scan、multiGet等数据交互较为密集的应用提供一种优化的思路。

参考文献:

[1] HBase Rpc Protocols: ?http://blog.zahoor.in/2012/08/protocol-buffers-in-hbase/

[2] HBase project 0.95.1
本系列文章属于Binos_ICT在Binospace个人技术博客原创,原文链接为http://www.binospace.com/index.php/in-depth-analysis-hbase-rpc-0-95-version-implementation-mechanism/,未经允许,不得转载。

From Binospace, post 深入分析HBase RPC(Protobuf)实现机制

文章的脚注信息由WordPress的wp-posturl插件自动生成


Copyright © 2008
This feed is for personal, non-commercial use only.
The use of this feed on other websites breaches copyright. If this content is not in your news reader, it makes the page you are viewing an infringement of the copyright. (Digital Fingerprint:
)