欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

hbase-高级API-协处理器(endpoint)

程序员文章站 2022-07-15 10:09:32
...

endpoint就像是RDBMS中的存储过程一样
行键决定了哪一个 region处理这个请求
协处理器提供了以endpoint概念为代表的动态调用,实现将计算转移到服务器的功能(如计算某个多列数据,最坏可能会传递多有列到客户端)

CoprocessorProtocol接口
为给客户端提供自定义的RPC协议,系统提供一个扩展CoprocessorProtocol协议的协处理器 。 通过这个接口可以定义协处理器希望暴露给用户的任意方法。通过以下 HTable提供的调用方法 , 这个协议实现协处理器实例之间通信(怎么使用协处理器,)

//单
<T extends CoprocessorProtocol> T coprocessorProxyl(Class<T> protoco1,byte[]  row)

<T extends CoprocessorProtoco1,R> Map<byte[],R> coprocessorExec(Class<T> protoco1,byte[]  startKey,byte[]  endKey,Batch.Ca11<T,R> ca11able)
//范围region
<T extends CoprocessorProtoco1,R> void coprocessorExec(Class<T> protocol,bytel]  startKey,byte[] endKey,Batch.Ca11<T,R> callable,Batch.Callback<R> ca11back) 

CoprocessorProtocol实例和表中单个 region联系在一起,所以客户端的RPC调用必须定义region, 这个 region会在 CoprocessorProtocol方法的调用中被使用到(region之间的联系使)

客户端代码很少直接对 region进行操作,而且 region的名字经常变化,然而协处理器 RPC调用会通过行键来查找涉及的 region
单个region
此方法使用单个行键调用coprocessorProxy () 返回一个CoprocessorProtocol接口的动态代理,它使用包含给定行键的region作为RPC endpoint,即使给空行键对应的行不存在也不影响。
一段范围的region
此方法通过使用起始行键和终止行键来调用coprocessorExec () 表中包含在 起始行键到终止行键(不包含终止行健)范围内的所有region都将作为PRC endpoint

被传入到HTable的方法中的行键作为参数
这个参数不会传入CoprocessorProtocol 的实现中,而仅仅被用于确定远端调用的endpoint的region

Batch类为CoprocessorProtocol为设计region的方法实现了两个接口: Batch.Call方法(客户端方法)来调用CoprocessorProtocol实例的方法,每个选中的 region将会调用一次这个接口的call ()方法,并将CoprocessorProtocol实例作为 region的参数

在调用完成时,客户端可选择实现Batch.Callback来传回每次region调用的结果。

//使用函数R call (T instance)返回的值作为参数,并且每个 region都会调用并返回
void update(byte[] region,byte[] row,R result)

BaseEndpointCoprocessor 类
实现一个endpoint需要两步
扩展 CoprocessorProtocol 接口。
这将设定与新endpoint的通信细节,即定义了客户端和服务器端的PRC协议
扩展 BaseEndpointCoprocessor 类。
!!必须实现endpoint涉及的方法,包括抽象类BaseEndpointCoprocessor, 以及之前定义的endpoint协议接口(region需要用到的方法在这里面)

//1.实现了CoprocessorProtocol,并为HBase添加了自定义的方法。客户端可远程调用该方法来统计每个region中的行数目和KeyValue数目
//例:实现endpoint协议,添加一个行和KeyValue的计数方法
//这将设定与新endpoint的通信细节,即定义了客户端和服务器端的PRC协议
public interface RowCountProtocol extends CoprocessorProtocol { 
    long getRowCount() throws lOException;

    long getRowCount(Filter filter)throws 10Exception;

    long getKeyValueCount() throws lOException;
}
//2.将新协议的接口和继承自BaseEndpointCoprocessor的类结合起来。使用环境提供InternalScanner实例来访问数据

public class RowCountEndpoint extends BaseEndpointCoprocessor implements RowCountProtocol 

//配置endpoint写处理器
//客户端调用

Batch类提供Batch.forMethod()来跟便捷的访问远程endpoint,它配置完成的实例会发往所有region服务器

Batch.Call call = Batch.forMethod(RowCountProtocal.class,"getKeyValueCount");
Map<byte[],Long> results = table.coprocessorExec(RowCountProtocol.class,null,null,call);

forMethod()方法使用Java的反射机制来获取给定的方法,返回的Batch.Call实例将会 执行endpoint的功能,并且返回此方法的协议(RowCountProtocal.class)定义的应返回的数据类型

通过直接扩展Batch.Call实例,可以对结果执行额外的处理,这样处理会更加方便和灵活

//扩展批量调用来执行多个endpoint的调用
Map<byte[], Pair<Long,Long» results = table.coprocessorExec(
RowCountProtocol.class,null,null,
new Batch.Call<RowCountProtocol,Pair<Long,Long>> () { 
public Pair<Long,Long> call(RowCountProtocol counter) throws IOException {      

    return new Pair(counter.getRowCount(), 
        counter.getKeyValueCount());
    }
});

使用 coprocessorProxy ()获取了一个endpoint的本地客户端代理
由于行键被指定了,不管这个行键在region中是否存在, 只要这个行健在region的起始行键和终止行健之间,客户端API都会通过行键路由该代理调用到包含这个行键的region

RowCountProtocol protocol = table.coprocessorProxy( RowCountProtocol.class,Bytes.toBytes("row4")); 
long rowsInRegion = protocol.getRowCount(); 
System.out.println("Region Row Count: " + rowsInRegion);

通过使用代理引用,客户端代码可以调用任何CoprocessorProtocol中描述的服务器端 函数,同时可以返回对应region的处理结果,图为两种方法的差异
hbase-高级API-协处理器(endpoint)

HTablePool
与其为客户端的每个请求创建一个HTable实例,不如创建一个实例,然后不断地复用 这个实例。创建HTable实例是非常耗时的操作。在资源高度紧张的情况下,应该创建一个实例,多次复用它
在多线程环境中重用HTable实例会出现其他问题。 HTable类不是线程安全的,本地的写缓冲区并不能保证一致性()。必须为每个线程创建一个HTable实例
客户端可以通过HTablePool类来解决这个问题。它的目的是为HBase集群提供客户端连接池

//通过以下构造器创建池
HTablePool()  默认构造器使用classpath的配置创建表实例池,设定大小为无限
//等价
Configuration conf = HBaseConfiguration.create() 
HTablePool pool = new HTablePool(conf,Integer.MAX_VALUE)

HTablePool(Configuration config,int maxSize) 
HTablePool(Configuration config,int maxSize, HTablelnterfaceFactory tableFactory)

maxSize是连接池中允许的最大HTable实例数目
可选参数tableFactory将会被用endpoint

使用表池

//用这个方法来获得表
HTablelnterface getTable(String tableName) 
HTablelnterface getTable(byte[] tableName) 
void putTable(HTablelnterface table)

getTable方法从表实例池中获取HTable实例,使用之后通过putTable()方法放回。以上方法把一些工作迁移到了表实例池配置的HTablelnterfaceFactory接口,maxSize参数并不强行限制用户所能得到的HTablelnterface实例的上界, 用户可以使用getTable方法访问尽可能多得Table实例。 这个参数仅仅设置表实例池中能够存放的HTablelnterface实例的数目
例如,当用户将这个值设置为5 时,调用10次 getTable ()会创建 10个 HTable实例不过之后只有5 次 putTable ()方法发挥作用,后面 5 次会被直接忽略。更重要的是,工厂的释放(release) 机制也不会被调用

关闭表实例池中特定表实例的方法:

void closeTablePool(String tableName) 
void closeTablePool(byte[] tableName)

很明显,这两个方法的功能是相同的
close方法会遍历所有保存在列表中与参数对应的表引用,然后使用工厂的释放机制。 这对于释放一张表的所有资源,并重头再来非常有用。
所有的资源都需要释放,用户需要对自己使用过的所有表都调用这个方法。

//使用htablepool共享htable实例
Configuration conf = HBaseConfiguration.create(); 
HTablePool pool = new HTablePool(conf,5);

HTablelnterface[] tables = new HTablelnterface[10]; 
for(int n = 0;n < 10;n++){ 
    tables[n] = pool.getTable("testtable");
    System.out.println(Bytes.toString(tables[n].getTableName()));
}

for(int n = 0;n < 5;n++){ 
    pool.putTable(tables[n]);
}
pool.closeTablePool ( "testtable" );

创建表实例池,并允许保留5 个实例。
获取10个实例,超出容量5 个。
向表实例池返还HTable实例,其中的5 个会被保留,多余的会被丢弃。 
关闭整个表实例池,释放其中保留的表实例引用。

连接管理
HTable实例都需要与远程主机建立连接。连接在内部用HConnection 类表示,重要的是其被 HConnectionManager类管理并共享。用户只需要创建一个Configuration实例,然后利用客户端API使用这些类。
HBase内部用键值映射来存储连接,使用 Configuration实例作为键值映射的键。换句话说,当你创建很多HTable实例时,如果你提供了相同的Configuration引用,那么它们都共享同一个底层的HConnection实例。细节如下

共享 ZooKeeper连接
因为每个客户端最终都需要ZooKeeper连接来完成表的region地址初始寻址。连接 —旦建立后,共亭就变得很有意义,这使得之后的客户端实例可以共用。
缓存通用资源
通 过 ZooKeeper查询到的-ROOT-和 .META.的地址,以及region的地址定位都需要网络传输开销。这些地址将被缓存在客户端来减少网络的调用次数,因此达到加速寻址的目的

对于每个连接到远程集群的本地客户端来说,它们的地址表都是相同的,因此运行相同进程的客户端共享连接非常有用,这是通过共享HConnection实例来实现的。 另外,当寻址失败时(如 region拆分时),连接有内置的重试机制来刷新缓存,对于 其他所有共享相同连接引用的客户端来说,这项更改立即生效,因此这更加减少了客户端初始化连接的开销
另一个受益的类是HTablePool,所有连接池中的HTable实例都自动共用一个提供的 Configuration实例,因此它们也共享连接。因为当用户想创建多个HTable实例时,最好先创建一个共用的Configuration实例。

HTable tablel = new HTable(“table1”);
HTable table2 = new HTable(“table2”);
上述代码不如以下代码有效:
Configuration conf = HBaseConfiguration.create();
HTable tablel = new HTable(conf,”tablel”);
//…
HTable table2 = new HTable(conf,”table2”);
后者隐式共享HBase客户端API类提供的连接。 目前没有明显的证据表明共享连接会带来性能问题,即使在繁忙的多线程环境下也是如此。

共享连接的缺点在于释放,如果用户不显式关闭连接,它将一直存在,直到客户端退
出。这样可能导致很多ZooKeeper连接都保持打开状态,尤其是在大型分布式环境下, 比如执行MapReduce作业的HBase程序,这样可能会产生一些问题。最坏的情况是耗尽所有的连接句柄或内存,并导致I/O异常。用户可通过显式关闭连接来避免这种情况。

每次用户重用Configuration实例时,连接管理器都会增加引用计数。因此用户必须调 用close来触发清除工作。以下是用显式的方法来清理一个连接或所有连接。
static void deleteConnection(Configuration conf,boolean stopProxy)
static void deleteAllConnections (boolean stopProxy)

所有的共享连接都是按照Configuration实例作为键,因此用户需要提供这个实例来关闭相应的连接。布尔类型参数stopProxy保证强制清除整个客户端的RPC栈,因此不再需要远程连接服务器时,应该将这个参数设置为true。deleteAllConnections ()函数只需要stopProxy参数,它将遍历整个连接管理器注册过的共享连接列表,然后逐一释放连接。

如果用户需要显式地使用某个连接,可以通过如下方式使用getConnection方法。
Configuration newConfig = new Configuration(originalConf);
HConnection connection = HConnectionManager.getConnection(newConfig);
// Use the connection to your hearts’ delight and then when done…
HConnectionManager.deleteConnection(newConfig,true);
这样操作的好处是可以保证这个连接的用户唯一,但是,切记必须要在调用结束后关
闭它。

相关标签: endpoint hbase