|
目錄 正文 協(xié)處理器—Coprocessor
1、 起源
Hbase 作為列族數(shù)據(jù)庫最經(jīng)常被人詬病的特性包括:無法輕易建立“二級索引”,難以執(zhí) 行求和、計數(shù)、排序等操作。比如,在舊版本的(<0.92)Hbase 中,統(tǒng)計數(shù)據(jù)表的總行數(shù),需 要使用 Counter 方法,執(zhí)行一次 MapReduce Job 才能得到。雖然 HBase 在數(shù)據(jù)存儲層中集成 了 MapReduce,能夠有效用于數(shù)據(jù)表的分布式計算。然而在很多情況下,做一些簡單的相 加或者聚合計算的時候,如果直接將計算過程放置在 server 端,能夠減少通訊開銷,從而獲 得很好的性能提升。于是,HBase 在 0.92 之后引入了協(xié)處理器(coprocessors),實現(xiàn)一些激動 人心的新特性:能夠輕易建立二次索引、復(fù)雜過濾器(謂詞下推)以及訪問控制等。
2、介紹
協(xié)處理器有兩種:observer 和 endpoint
Observer 類似于傳統(tǒng)數(shù)據(jù)庫中的觸發(fā)器,當發(fā)生某些事件的時候這類協(xié)處理器會被 Server 端調(diào)用。Observer Coprocessor 就是一些散布在 HBase Server 端代碼中的 hook 鉤子, 在固定的事件發(fā)生時被調(diào)用。比如:put 操作之前有鉤子函數(shù) prePut,該函數(shù)在 put 操作執(zhí) 行前會被 Region Server 調(diào)用;在 put 操作之后則有 postPut 鉤子函數(shù)
以 HBase0.92 版本為例,它提供了三種觀察者接口:
RegionObserver:提供客戶端的數(shù)據(jù)操縱事件鉤子:Get、Put、Delete、Scan 等。
WALObserver:提供 WAL 相關(guān)操作鉤子。
MasterObserver:提供 DDL-類型的操作鉤子。如創(chuàng)建、刪除、修改數(shù)據(jù)表等。
到 0.96 版本又新增一個 RegionServerObserver
下圖是以 RegionObserver 為例子講解 Observer 這種協(xié)處理器的原理:

1、客戶端發(fā)出 put 請求
2、該請求被分派給合適的 RegionServer 和 region
3、coprocessorHost 攔截該請求,然后在該表上登記的每個 RegionObserver 上調(diào)用 prePut()
4、如果沒有被 prePut()攔截,該請求繼續(xù)送到 region,然后進行處理
5、region 產(chǎn)生的結(jié)果再次被 CoprocessorHost 攔截,調(diào)用 postPut()
6、假如沒有 postPut()攔截該響應(yīng),最終結(jié)果被返回給客戶端
Endpoint 協(xié)處理器類似傳統(tǒng)數(shù)據(jù)庫中的存儲過程,客戶端可以調(diào)用這些 Endpoint 協(xié)處 理器執(zhí)行一段 Server 端代碼,并將 Server 端代碼的結(jié)果返回給客戶端進一步處理,最常見 的用法就是進行聚集操作。如果沒有協(xié)處理器,當用戶需要找出一張表中的最大數(shù)據(jù),即 max 聚合操作,就必須進行全表掃描,在客戶端代碼內(nèi)遍歷掃描結(jié)果,并執(zhí)行求最大值的 操作。這樣的方法無法利用底層集群的并發(fā)能力,而將所有計算都集中到 Client 端統(tǒng)一執(zhí)行, 勢必效率低下。利用 Coprocessor,用戶可以將求最大值的代碼部署到 HBase Server 端,HBase 將利用底層 cluster 的多個節(jié)點并發(fā)執(zhí)行求最大值的操作。即在每個 Region 范圍內(nèi)執(zhí)行求最 大值的代碼,將每個 Region 的最大值在 Region Server 端計算出,僅僅將該 max 值返回給客 戶端。在客戶端進一步將多個 Region 的最大值進一步處理而找到其中的最大值。這樣整體 的執(zhí)行效率就會提高很多
下圖是 EndPoint 的工作原理:

3、總結(jié)
Observer 允許集群在正常的客戶端操作過程中可以有不同的行為表現(xiàn)
Endpoint 允許擴展集群的能力,對客戶端應(yīng)用開放新的運算命令
Observer 類似于 RDBMS 中的觸發(fā)器,主要在服務(wù)端工作
Endpoint 類似于 RDBMS 中的存儲過程,主要在服務(wù)端工作
Observer 可以實現(xiàn)權(quán)限管理、優(yōu)先級設(shè)置、監(jiān)控、ddl 控制、二級索引等功能
Endpoint 可以實現(xiàn) min、max、avg、sum、distinct、group by 等功能
協(xié)處理加載方式
協(xié)處理器的加載方式有兩種,我們稱之為靜態(tài)加載方式(Static Load)和動態(tài)加載方式 (Dynamic Load)。靜態(tài)加載的協(xié)處理器稱之為 System Coprocessor,動態(tài)加載的協(xié)處理器稱 之為 Table Coprocessor。
1、 靜態(tài)加載
通過修改 hbase-site.xml 這個文件來實現(xiàn),啟動全局 aggregation,能過操縱所有的表上 的數(shù)據(jù)。只需要添加如下代碼:
<property>
<name>hbase.coprocessor.user.region.classes</name>
<value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
</property>
為所有 table 加載了一個 cp class,可以用”,”分割加載多個 class
2、 動態(tài)加載
啟用表 aggregation,只對特定的表生效。通過 HBase Shell 來實現(xiàn)。
(1)停用表 disable 'guanzhu'
(2)添加協(xié)處理器 alter 'guanzhu', METHOD => 'table_att', 'coprocessor' => 'hdfs://myha01/hbase/guanzhu.jar|com.study.hbase.cp.HbaseCoprocessorTest|1001|'
(3)啟用表 enable 'guanzhu'
3、 協(xié)處理器卸載
同樣是3步
disable 'mytable'
alter 'mytable',METHOD=>'table_att_unset',NAME=>'coprocessor$1'
enable 'mytable'
案例(二級索引)
說明:二狗子是王寶強的粉絲
關(guān)注表:二狗子關(guān)注了王寶強 rowKey='ergouzi' cell="star:wangbaoqiang"
put 'guanzhu', 'ergouzi', 'cf:star', 'wangbaoqiang'
粉絲表:二狗子是王寶強的粉絲 rowKey="wangbaoqiang" cell="fensi:ergouzi"
put 'fans', 'wangbaoqiang', 'cf:fensi', 'ergouzi'
java實現(xiàn)代碼

public class HbaseCoprocessorTest extends BaseRegionObserver{
static Configuration conf = HBaseConfiguration.create();
static Connection conn = null;
static Table table = null;
static {
conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
try {
conn = ConnectionFactory.createConnection(conf);
table = conn.getTable(TableName.valueOf("fans"));
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 此方法是在真正的put方法調(diào)用之前進行調(diào)用
* 參數(shù)put為table.put(put)里面的參數(shù)put對象,是要進行插入的那條數(shù)據(jù)
*
* 例如:要向關(guān)注表里面插入一條數(shù)據(jù) 姓名:二狗子 關(guān)注的明星:王寶強
* shell語句:put 'guanzhu','ergouzi', 'cf:star', 'wangbaoqiang'
*
* */
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
throws IOException {
//獲取put對象里面的rowkey'ergouzi'
byte[] row = put.getRow();
//獲取put對象里面的cell
List<Cell> list = put.get("cf".getBytes(), "star".getBytes());
Cell cell = list.get(0);
//創(chuàng)建一個新的put對象
Put new_put = new Put(cell.getValueArray());
new_put.addColumn("cf".getBytes(), "fensi".getBytes(), row);
table.put(new_put);
conn.close();
}
}

打成jar包,命名為guanzhu.jar,將其上傳到HDFS目錄/hbase下面
[hadoop@hadoop1 ~]$ hadoop fs -put guanzhu.jar /hbase
打開hbase shell命令,按順序呢執(zhí)行(提前已經(jīng)創(chuàng)建好guanzhu和fans表)

hbase(main):001:0> disable 'guanzhu'
0 row(s) in 2.8850 seconds
hbase(main):002:0> alter 'guanzhu', METHOD => 'table_att', 'coprocessor' => 'hdfs://myha01/hbase/guanzhu.jar|com.study.hbase.cp.HbaseCoprocessorTest|1001|'
Updating all regions with the new schema...
1/1 regions updated.
Done.
0 row(s) in 2.7570 seconds
hbase(main):003:0> enable 'guanzhu'
0 row(s) in 2.3400 seconds
hbase(main):004:0> desc 'guanzhu'
Table guanzhu is ENABLED
guanzhu, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://myha01/hbase/guanzhu.jar|com.study.hbase.cp.HbaseCoproce
ssorTest|1001|'}
COLUMN FAMILIES DESCRIPTION
{NAME => 'cf', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_
BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BL
OCKSIZE => '65536', REPLICATION_SCOPE => '0'}
1 row(s) in 0.0500 seconds
hbase(main):005:0> put 'guanzhu', 'ergouzi', 'cf:star', 'wangbaoqiang'
0 row(s) in 0.3050 seconds
hbase(main):006:0> scan 'guanzhu'
ROW COLUMN+CELL
ergouzi column=cf:star, timestamp=1522759023001, value=wangbaoqiang
1 row(s) in 0.0790 seconds
hbase(main):007:0> scan 'fans'
ROW COLUMN+CELL
\x00\x00\x00\x19\x00\x00\x00 column=cf:fensi, timestamp=1522759022996, value=ergouzi
\x0C\x00\x07ergouzi\x02cfsta
r\x7F\xFF\xFF\xFF\xFF\xFF\xF
F\xFF\x04wangbaoqiang
1 row(s) in 0.0330 seconds
hbase(main):008:0>

|