0%

HBase连接

HBase 连接

以 alihbase-client 2.8.6 jar 为例

1. 连接的定义

1
2
3
4
5
A cluster connection encapsulating lower level individual connections to actual servers and a connection to zookeeper. Connections are instantiated through the ConnectionFactory class. The lifecycle of the connection is managed by the caller, who has to close() the connection to release the resources.

The connection object contains logic to find the master, locate regions out on the cluster, keeps a cache of locations and then knows how to re-calibrate after they move. The individual connections to servers, meta cache, zookeeper connection, etc are all shared by the Table and Admin instances obtained from this connection.

Connection creation is a heavy-weight operation. Connection implementations are thread-safe, so that the client can create a connection once, and share it with different threads. Table and Admin instances, on the other hand, are light-weight and are not thread-safe. Typically, a single connection per client application is instantiated and every thread will obtain its own Table instance. Caching or pooling of Table and Admin is not recommended.

这是 Connection 类的注解,总结来说连接是一个很重的操作,因为这个连接要关联 zk、server、缓存等等。

client 端进行读写操作,第一步要连接 zk 获取元数据所在的 region server;第二步去元数据所在的 region server 里读取元数据,根据 rowkey 确定数据所在的 region server;第三步发送读写请求到具体的 region server 中

Connection 是线程安全的,所以在使用的时候,一个进程共享一个连接即可,Table、Admin 操作是线程不安全的,即用即弃。

2. 连接的实现

主要通过 ConnectionFactory 类进行创建管理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* Create a new Connection instance using the passed <code>conf</code> instance. Connection
* encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
* created from returned connection share zookeeper connection, meta cache, and connections to
* region servers and masters. <br>
* The caller is responsible for calling {@link Connection#close()} on the returned connection
* instance. Typical usage:
*
* Connection connection = ConnectionFactory.createConnection(conf);
* Table table = connection.getTable(TableName.valueOf("table1"));
* try {
* table.get(...);
* ...
* } finally {
* table.close();
* connection.close();
* }
*/
public static Connection createConnection(Configuration conf, ExecutorService pool, User user)
throws IOException {
setupHBaseUEParamsIfNeeded(conf);
setupHBaseMultiParamsIfNeeded(conf);
if (user == null) {
UserProvider provider = UserProvider.instantiate(conf);
user = provider.getCurrent();
}

String className = conf.get(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
ConnectionImplementation.class.getName());
Class<?> clazz;
try {
clazz = Class.forName(className);
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
try {
Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
ExecutorService.class, User.class);
constructor.setAccessible(true);
return (Connection) constructor.newInstance(conf, pool, user);
} catch (Exception e) {
throw new IOException(e);
}
}

上面代码摘自 ConnectionFactory 类下面 createConnection 方法,通过反射,创建 ConnectionImplementation 类实例(这里被阿里云封装过,可以全局搜索 ConnectionImpl 应该可以搜到类似的方法)

根据参数,可以看到这个连接是有默认的连接池的

3. 连接池

在 ConnectionImplementation 类下面有一个变量 rpcClient,往下看源码可以看到一个类 AbstractRpcClient,下面有一个变量

1
this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf))

这里 connections 变量保存的就是具体的连接信息,可以看到 PoolMap<ConnectionId, T> connections 这里保存的是一个 map,同时这里的 ConnectionId 保存的是地址、ticket等等多个信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ConnectionId(User ticket, String serviceName, InetSocketAddress address) {
this.address = address;
this.ticket = ticket;
this.serviceName = serviceName;
}
/**
* Return the pool type specified in the configuration, which must be set to either
* {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
* {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, otherwise default to the
* former. For applications with many user threads, use a small round-robin pool. For applications
* with few user threads, you may want to try using a thread-local pool. In any case, the number
* of {@link org.apache.hadoop.hbase.ipc.RpcClient} instances should not exceed the operating
* system's hard limit on the number of connections.
* @param config configuration
* @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
* {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
*/
private static PoolMap.PoolType getPoolType(Configuration config) {
return PoolMap.PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
PoolMap.PoolType.RoundRobin, PoolMap.PoolType.ThreadLocal);
}

首先是连接池类型,hbase.client.ipc.pool.type 根据这个配置项可以去选择,默认是 RoundRobin。如果是有许多线程的话,建议用 RoundRobin 这样的类型。

1
2
3
private static int getPoolSize(Configuration config) {
return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
}

其次是连接池大小,hbase.client.ipc.pool.size 根据这个配置项可以调整大小,默认是 1,并且这一项只有当 RoundRobin 类型时生效

4. 实现

由于 Connection 是线程安全的,可以在程序初始化的时候创建一个连接,并且配置连接池大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 所有进程共用一个Connection对象
private static void initHBaseConnection() {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "url");
// 设置用户名密码,默认root:root,可根据实际情况调整
conf.set("hbase.client.username", "xxx");
conf.set("hbase.client.password", "xxx");
conf.set("hbase.client.ipc.pool.type", "RoundRobin");
conf.set("hbase.client.ipc.pool.size", "256");
Connection connection = null;
try {
connection = ConnectionFactory.createConnection(conf);
} catch (Exception e) {
e.printStackTrace();
}
hbaseConnection = connection;
}
// 每个线程使用单独的Table对象
Table table = hbaseConnection.getTable(TableName.valueOf("test"));
try {
...
} finally {
table.close();
}